HADOOP-12060. Fix ByteBuffer usage for raw erasure coders. Contributed by Kai Zheng.

This commit is contained in:
Jing Zhao 2015-07-20 10:15:14 -07:00
parent 06394e3760
commit 29495cb8f6
17 changed files with 268 additions and 129 deletions

View File

@ -69,3 +69,6 @@
HADOOP-12065. Using more meaningful keys in EC schema. (Kai Zheng) HADOOP-12065. Using more meaningful keys in EC schema. (Kai Zheng)
HDFS-8557. Allow to configure RS and XOR raw coders (Kai Zheng) HDFS-8557. Allow to configure RS and XOR raw coders (Kai Zheng)
HADOOP-12060. Fix ByteBuffer usage for raw erasure coders. (Kai Zheng via
jing9)

View File

@ -22,17 +22,17 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.erasurecode.rawcoder.*; import org.apache.hadoop.io.erasurecode.rawcoder.*;
/** /**
* A codec utility. * A codec & coder utility to help create raw coders conveniently.
*/ */
public final class CodecUtil { public final class CodecUtil {
private CodecUtil() {} private CodecUtil() { }
/** /**
* Create RS raw encoder according to configuration. * Create RS raw encoder according to configuration.
* @param conf * @param conf configuration possibly with some items to configure the coder
* @param numDataUnits * @param numDataUnits number of data units in a coding group
* @param numParityUnits * @param numParityUnits number of parity units in a coding group
* @return raw encoder * @return raw encoder
*/ */
public static RawErasureEncoder createRSRawEncoder( public static RawErasureEncoder createRSRawEncoder(
@ -49,9 +49,9 @@ public final class CodecUtil {
/** /**
* Create RS raw decoder according to configuration. * Create RS raw decoder according to configuration.
* @param conf * @param conf configuration possibly with some items to configure the coder
* @param numDataUnits * @param numDataUnits number of data units in a coding group
* @param numParityUnits * @param numParityUnits number of parity units in a coding group
* @return raw decoder * @return raw decoder
*/ */
public static RawErasureDecoder createRSRawDecoder( public static RawErasureDecoder createRSRawDecoder(
@ -68,9 +68,9 @@ public final class CodecUtil {
/** /**
* Create XOR raw encoder according to configuration. * Create XOR raw encoder according to configuration.
* @param conf * @param conf configuration possibly with some items to configure the coder
* @param numDataUnits * @param numDataUnits number of data units in a coding group
* @param numParityUnits * @param numParityUnits number of parity units in a coding group
* @return raw encoder * @return raw encoder
*/ */
public static RawErasureEncoder createXORRawEncoder( public static RawErasureEncoder createXORRawEncoder(
@ -87,9 +87,9 @@ public final class CodecUtil {
/** /**
* Create XOR raw decoder according to configuration. * Create XOR raw decoder according to configuration.
* @param conf * @param conf configuration possibly with some items to configure the coder
* @param numDataUnits * @param numDataUnits number of data units in a coding group
* @param numParityUnits * @param numParityUnits number of parity units in a coding group
* @return raw decoder * @return raw decoder
*/ */
public static RawErasureDecoder createXORRawDecoder( public static RawErasureDecoder createXORRawDecoder(
@ -106,11 +106,11 @@ public final class CodecUtil {
/** /**
* Create raw coder using specified conf and raw coder factory key. * Create raw coder using specified conf and raw coder factory key.
* @param conf * @param conf configuration possibly with some items to configure the coder
* @param rawCoderFactoryKey * @param rawCoderFactoryKey configuration key to find the raw coder factory
* @param isEncoder * @param isEncoder is encoder or not we're going to create
* @param numDataUnits * @param numDataUnits number of data units in a coding group
* @param numParityUnits * @param numParityUnits number of parity units in a coding group
* @return raw coder * @return raw coder
*/ */
public static RawErasureCoder createRawCoder(Configuration conf, public static RawErasureCoder createRawCoder(Configuration conf,

View File

@ -37,8 +37,8 @@ public class ECBlock {
/** /**
* A constructor specifying isParity and isErased. * A constructor specifying isParity and isErased.
* @param isParity * @param isParity is a parity block
* @param isErased * @param isErased is erased or not
*/ */
public ECBlock(boolean isParity, boolean isErased) { public ECBlock(boolean isParity, boolean isErased) {
this.isParity = isParity; this.isParity = isParity;
@ -47,7 +47,7 @@ public class ECBlock {
/** /**
* Set true if it's for a parity block. * Set true if it's for a parity block.
* @param isParity * @param isParity is parity or not
*/ */
public void setParity(boolean isParity) { public void setParity(boolean isParity) {
this.isParity = isParity; this.isParity = isParity;
@ -55,10 +55,10 @@ public class ECBlock {
/** /**
* Set true if the block is missing. * Set true if the block is missing.
* @param isMissing * @param isErased is erased or not
*/ */
public void setErased(boolean isMissing) { public void setErased(boolean isErased) {
this.isErased = isMissing; this.isErased = isErased;
} }
/** /**
@ -71,7 +71,7 @@ public class ECBlock {
/** /**
* *
* @return true if it's missing or corrupt due to erasure, otherwise false * @return true if it's erased due to erasure, otherwise false
*/ */
public boolean isErased() { public boolean isErased() {
return isErased; return isErased;

View File

@ -27,8 +27,8 @@ public class ECBlockGroup {
/** /**
* A constructor specifying data blocks and parity blocks. * A constructor specifying data blocks and parity blocks.
* @param dataBlocks * @param dataBlocks data blocks in the group
* @param parityBlocks * @param parityBlocks parity blocks in the group
*/ */
public ECBlockGroup(ECBlock[] dataBlocks, ECBlock[] parityBlocks) { public ECBlockGroup(ECBlock[] dataBlocks, ECBlock[] parityBlocks) {
this.dataBlocks = dataBlocks; this.dataBlocks = dataBlocks;
@ -81,7 +81,7 @@ public class ECBlockGroup {
/** /**
* Get erased blocks count * Get erased blocks count
* @return * @return erased count of blocks
*/ */
public int getErasedCount() { public int getErasedCount() {
int erasedCount = 0; int erasedCount = 0;

View File

@ -28,7 +28,7 @@ public class ECChunk {
/** /**
* Wrapping a ByteBuffer * Wrapping a ByteBuffer
* @param buffer * @param buffer buffer to be wrapped by the chunk
*/ */
public ECChunk(ByteBuffer buffer) { public ECChunk(ByteBuffer buffer) {
this.chunkBuffer = buffer; this.chunkBuffer = buffer;
@ -36,7 +36,7 @@ public class ECChunk {
/** /**
* Wrapping a bytes array * Wrapping a bytes array
* @param buffer * @param buffer buffer to be wrapped by the chunk
*/ */
public ECChunk(byte[] buffer) { public ECChunk(byte[] buffer) {
this.chunkBuffer = ByteBuffer.wrap(buffer); this.chunkBuffer = ByteBuffer.wrap(buffer);
@ -52,7 +52,7 @@ public class ECChunk {
/** /**
* Convert an array of this chunks to an array of ByteBuffers * Convert an array of this chunks to an array of ByteBuffers
* @param chunks * @param chunks chunks to convert into buffers
* @return an array of ByteBuffers * @return an array of ByteBuffers
*/ */
public static ByteBuffer[] toBuffers(ECChunk[] chunks) { public static ByteBuffer[] toBuffers(ECChunk[] chunks) {

View File

@ -94,10 +94,10 @@ public final class ECSchema {
/** /**
* Constructor with key parameters provided. * Constructor with key parameters provided.
* @param schemaName * @param schemaName schema name
* @param codecName * @param codecName codec name
* @param numDataUnits * @param numDataUnits number of data units used in the schema
* @param numParityUnits * @param numParityUnits number os parity units used in the schema
*/ */
public ECSchema(String schemaName, String codecName, public ECSchema(String schemaName, String codecName,
int numDataUnits, int numParityUnits) { int numDataUnits, int numParityUnits) {
@ -107,11 +107,11 @@ public final class ECSchema {
/** /**
* Constructor with key parameters provided. Note the extraOptions may contain * Constructor with key parameters provided. Note the extraOptions may contain
* additional information for the erasure codec to interpret further. * additional information for the erasure codec to interpret further.
* @param schemaName * @param schemaName schema name
* @param codecName * @param codecName codec name
* @param numDataUnits * @param numDataUnits number of data units used in the schema
* @param numParityUnits * @param numParityUnits number os parity units used in the schema
* @param extraOptions * @param extraOptions extra options to configure the codec
*/ */
public ECSchema(String schemaName, String codecName, int numDataUnits, public ECSchema(String schemaName, String codecName, int numDataUnits,
int numParityUnits, Map<String, String> extraOptions) { int numParityUnits, Map<String, String> extraOptions) {

View File

@ -43,7 +43,8 @@ import org.xml.sax.SAXException;
* A EC schema loading utility that loads predefined EC schemas from XML file * A EC schema loading utility that loads predefined EC schemas from XML file
*/ */
public class SchemaLoader { public class SchemaLoader {
private static final Logger LOG = LoggerFactory.getLogger(SchemaLoader.class.getName()); private static final Logger LOG = LoggerFactory.getLogger(
SchemaLoader.class.getName());
/** /**
* Load predefined ec schemas from configuration file. This file is * Load predefined ec schemas from configuration file. This file is

View File

@ -91,37 +91,45 @@ public abstract class AbstractRawErasureCoder
} }
/** /**
* Check and ensure the buffers are of the length specified by dataLen. * Check and ensure the buffers are of the length specified by dataLen, also
* @param buffers * ensure the buffers are direct buffers or not according to isDirectBuffer.
* @param allowNull * @param buffers the buffers to check
* @param dataLen * @param allowNull whether to allow any element to be null or not
* @param dataLen the length of data available in the buffer to ensure with
* @param isDirectBuffer is direct buffer or not to ensure with
*/ */
protected void ensureLength(ByteBuffer[] buffers, protected void ensureLengthAndType(ByteBuffer[] buffers, boolean allowNull,
boolean allowNull, int dataLen) { int dataLen, boolean isDirectBuffer) {
for (int i = 0; i < buffers.length; ++i) { for (ByteBuffer buffer : buffers) {
if (buffers[i] == null && !allowNull) { if (buffer == null && !allowNull) {
throw new HadoopIllegalArgumentException( throw new HadoopIllegalArgumentException(
"Invalid buffer found, not allowing null"); "Invalid buffer found, not allowing null");
} else if (buffers[i] != null && buffers[i].remaining() != dataLen) { } else if (buffer != null) {
throw new HadoopIllegalArgumentException( if (buffer.remaining() != dataLen) {
"Invalid buffer, not of length " + dataLen); throw new HadoopIllegalArgumentException(
"Invalid buffer, not of length " + dataLen);
}
if (buffer.isDirect() != isDirectBuffer) {
throw new HadoopIllegalArgumentException(
"Invalid buffer, isDirect should be " + isDirectBuffer);
}
} }
} }
} }
/** /**
* Check and ensure the buffers are of the length specified by dataLen. * Check and ensure the buffers are of the length specified by dataLen.
* @param buffers * @param buffers the buffers to check
* @param allowNull * @param allowNull whether to allow any element to be null or not
* @param dataLen * @param dataLen the length of data available in the buffer to ensure with
*/ */
protected void ensureLength(byte[][] buffers, protected void ensureLength(byte[][] buffers,
boolean allowNull, int dataLen) { boolean allowNull, int dataLen) {
for (int i = 0; i < buffers.length; ++i) { for (byte[] buffer : buffers) {
if (buffers[i] == null && !allowNull) { if (buffer == null && !allowNull) {
throw new HadoopIllegalArgumentException( throw new HadoopIllegalArgumentException(
"Invalid buffer found, not allowing null"); "Invalid buffer found, not allowing null");
} else if (buffers[i] != null && buffers[i].length != dataLen) { } else if (buffer != null && buffer.length != dataLen) {
throw new HadoopIllegalArgumentException( throw new HadoopIllegalArgumentException(
"Invalid buffer not of length " + dataLen); "Invalid buffer not of length " + dataLen);
} }

View File

@ -41,14 +41,14 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
checkParameters(inputs, erasedIndexes, outputs); checkParameters(inputs, erasedIndexes, outputs);
ByteBuffer validInput = findFirstValidInput(inputs); ByteBuffer validInput = findFirstValidInput(inputs);
boolean usingDirectBuffer = validInput.isDirect();
int dataLen = validInput.remaining(); int dataLen = validInput.remaining();
if (dataLen == 0) { if (dataLen == 0) {
return; return;
} }
ensureLength(inputs, true, dataLen); ensureLengthAndType(inputs, true, dataLen, usingDirectBuffer);
ensureLength(outputs, false, dataLen); ensureLengthAndType(outputs, false, dataLen, usingDirectBuffer);
boolean usingDirectBuffer = validInput.isDirect();
if (usingDirectBuffer) { if (usingDirectBuffer) {
doDecode(inputs, erasedIndexes, outputs); doDecode(inputs, erasedIndexes, outputs);
return; return;
@ -63,14 +63,14 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
for (int i = 0; i < inputs.length; ++i) { for (int i = 0; i < inputs.length; ++i) {
buffer = inputs[i]; buffer = inputs[i];
if (buffer != null) { if (buffer != null) {
inputOffsets[i] = buffer.position(); inputOffsets[i] = buffer.arrayOffset() + buffer.position();
newInputs[i] = buffer.array(); newInputs[i] = buffer.array();
} }
} }
for (int i = 0; i < outputs.length; ++i) { for (int i = 0; i < outputs.length; ++i) {
buffer = outputs[i]; buffer = outputs[i];
outputOffsets[i] = buffer.position(); outputOffsets[i] = buffer.arrayOffset() + buffer.position();
newOutputs[i] = buffer.array(); newOutputs[i] = buffer.array();
} }
@ -81,7 +81,7 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
buffer = inputs[i]; buffer = inputs[i];
if (buffer != null) { if (buffer != null) {
// dataLen bytes consumed // dataLen bytes consumed
buffer.position(inputOffsets[i] + dataLen); buffer.position(buffer.position() + dataLen);
} }
} }
} }
@ -89,7 +89,7 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
/** /**
* Perform the real decoding using Direct ByteBuffer. * Perform the real decoding using Direct ByteBuffer.
* @param inputs Direct ByteBuffers expected * @param inputs Direct ByteBuffers expected
* @param erasedIndexes * @param erasedIndexes indexes of erased units in the inputs array
* @param outputs Direct ByteBuffers expected * @param outputs Direct ByteBuffers expected
*/ */
protected abstract void doDecode(ByteBuffer[] inputs, int[] erasedIndexes, protected abstract void doDecode(ByteBuffer[] inputs, int[] erasedIndexes,
@ -117,12 +117,12 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
/** /**
* Perform the real decoding using bytes array, supporting offsets and * Perform the real decoding using bytes array, supporting offsets and
* lengths. * lengths.
* @param inputs * @param inputs the input byte arrays to read data from
* @param inputOffsets * @param inputOffsets offsets for the input byte arrays to read data from
* @param dataLen * @param dataLen how much data are to be read from
* @param erasedIndexes * @param erasedIndexes indexes of erased units in the inputs array
* @param outputs * @param outputs the output byte arrays to write resultant data into
* @param outputOffsets * @param outputOffsets offsets from which to write resultant data into
*/ */
protected abstract void doDecode(byte[][] inputs, int[] inputOffsets, protected abstract void doDecode(byte[][] inputs, int[] inputOffsets,
int dataLen, int[] erasedIndexes, int dataLen, int[] erasedIndexes,
@ -139,12 +139,12 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
/** /**
* Check and validate decoding parameters, throw exception accordingly. The * Check and validate decoding parameters, throw exception accordingly. The
* checking assumes it's a MDS code. Other code can override this. * checking assumes it's a MDS code. Other code can override this.
* @param inputs * @param inputs input buffers to check
* @param erasedIndexes * @param erasedIndexes indexes of erased units in the inputs array
* @param outputs * @param outputs output buffers to check
*/ */
protected void checkParameters(Object[] inputs, int[] erasedIndexes, protected <T> void checkParameters(T[] inputs, int[] erasedIndexes,
Object[] outputs) { T[] outputs) {
if (inputs.length != getNumParityUnits() + getNumDataUnits()) { if (inputs.length != getNumParityUnits() + getNumDataUnits()) {
throw new IllegalArgumentException("Invalid inputs length"); throw new IllegalArgumentException("Invalid inputs length");
} }
@ -160,8 +160,8 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
} }
int validInputs = 0; int validInputs = 0;
for (int i = 0; i < inputs.length; ++i) { for (T input : inputs) {
if (inputs[i] != null) { if (input != null) {
validInputs += 1; validInputs += 1;
} }
} }
@ -177,7 +177,7 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
* not to read. * not to read.
* @return indexes into inputs array * @return indexes into inputs array
*/ */
protected int[] getErasedOrNotToReadIndexes(Object[] inputs) { protected <T> int[] getErasedOrNotToReadIndexes(T[] inputs) {
int[] invalidIndexes = new int[inputs.length]; int[] invalidIndexes = new int[inputs.length];
int idx = 0; int idx = 0;
for (int i = 0; i < inputs.length; i++) { for (int i = 0; i < inputs.length; i++) {
@ -191,13 +191,13 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
/** /**
* Find the valid input from all the inputs. * Find the valid input from all the inputs.
* @param inputs * @param inputs input buffers to look for valid input
* @return the first valid input * @return the first valid input
*/ */
protected static <T> T findFirstValidInput(T[] inputs) { protected static <T> T findFirstValidInput(T[] inputs) {
for (int i = 0; i < inputs.length; i++) { for (T input : inputs) {
if (inputs[i] != null) { if (input != null) {
return inputs[i]; return input;
} }
} }

View File

@ -37,14 +37,15 @@ public abstract class AbstractRawErasureEncoder extends AbstractRawErasureCoder
@Override @Override
public void encode(ByteBuffer[] inputs, ByteBuffer[] outputs) { public void encode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
checkParameters(inputs, outputs); checkParameters(inputs, outputs);
boolean usingDirectBuffer = inputs[0].isDirect();
int dataLen = inputs[0].remaining(); int dataLen = inputs[0].remaining();
if (dataLen == 0) { if (dataLen == 0) {
return; return;
} }
ensureLength(inputs, false, dataLen); ensureLengthAndType(inputs, false, dataLen, usingDirectBuffer);
ensureLength(outputs, false, dataLen); ensureLengthAndType(outputs, false, dataLen, usingDirectBuffer);
boolean usingDirectBuffer = inputs[0].isDirect();
if (usingDirectBuffer) { if (usingDirectBuffer) {
doEncode(inputs, outputs); doEncode(inputs, outputs);
return; return;
@ -58,13 +59,13 @@ public abstract class AbstractRawErasureEncoder extends AbstractRawErasureCoder
ByteBuffer buffer; ByteBuffer buffer;
for (int i = 0; i < inputs.length; ++i) { for (int i = 0; i < inputs.length; ++i) {
buffer = inputs[i]; buffer = inputs[i];
inputOffsets[i] = buffer.position(); inputOffsets[i] = buffer.arrayOffset() + buffer.position();
newInputs[i] = buffer.array(); newInputs[i] = buffer.array();
} }
for (int i = 0; i < outputs.length; ++i) { for (int i = 0; i < outputs.length; ++i) {
buffer = outputs[i]; buffer = outputs[i];
outputOffsets[i] = buffer.position(); outputOffsets[i] = buffer.arrayOffset() + buffer.position();
newOutputs[i] = buffer.array(); newOutputs[i] = buffer.array();
} }
@ -102,11 +103,11 @@ public abstract class AbstractRawErasureEncoder extends AbstractRawErasureCoder
/** /**
* Perform the real encoding work using bytes array, supporting offsets * Perform the real encoding work using bytes array, supporting offsets
* and lengths. * and lengths.
* @param inputs * @param inputs the input byte arrays to read data from
* @param inputOffsets * @param inputOffsets offsets for the input byte arrays to read data from
* @param dataLen * @param dataLen how much data are to be read from
* @param outputs * @param outputs the output byte arrays to write resultant data into
* @param outputOffsets * @param outputOffsets offsets from which to write resultant data into
*/ */
protected abstract void doEncode(byte[][] inputs, int[] inputOffsets, protected abstract void doEncode(byte[][] inputs, int[] inputOffsets,
int dataLen, byte[][] outputs, int dataLen, byte[][] outputs,
@ -121,10 +122,10 @@ public abstract class AbstractRawErasureEncoder extends AbstractRawErasureCoder
/** /**
* Check and validate decoding parameters, throw exception accordingly. * Check and validate decoding parameters, throw exception accordingly.
* @param inputs * @param inputs input buffers to check
* @param outputs * @param outputs output buffers to check
*/ */
protected void checkParameters(Object[] inputs, Object[] outputs) { protected <T> void checkParameters(T[] inputs, T[] outputs) {
if (inputs.length != getNumDataUnits()) { if (inputs.length != getNumDataUnits()) {
throw new HadoopIllegalArgumentException("Invalid inputs length"); throw new HadoopIllegalArgumentException("Invalid inputs length");
} }

View File

@ -26,16 +26,16 @@ public interface RawErasureCoderFactory {
/** /**
* Create raw erasure encoder. * Create raw erasure encoder.
* @param numDataUnits * @param numDataUnits number of data units in a coding group
* @param numParityUnits * @param numParityUnits number of parity units in a coding group
* @return raw erasure encoder * @return raw erasure encoder
*/ */
public RawErasureEncoder createEncoder(int numDataUnits, int numParityUnits); public RawErasureEncoder createEncoder(int numDataUnits, int numParityUnits);
/** /**
* Create raw erasure decoder. * Create raw erasure decoder.
* @param numDataUnits * @param numDataUnits number of data units in a coding group
* @param numParityUnits * @param numParityUnits number of parity units in a coding group
* @return raw erasure decoder * @return raw erasure decoder
*/ */
public RawErasureDecoder createDecoder(int numDataUnits, int numParityUnits); public RawErasureDecoder createDecoder(int numDataUnits, int numParityUnits);

View File

@ -33,7 +33,8 @@ public interface RawErasureDecoder extends RawErasureCoder {
/** /**
* Decode with inputs and erasedIndexes, generates outputs. * Decode with inputs and erasedIndexes, generates outputs.
* How to prepare for inputs: * How to prepare for inputs:
* 1. Create an array containing parity units + data units; * 1. Create an array containing parity units + data units. Please note the
* parity units should be first or before the data units.
* 2. Set null in the array locations specified via erasedIndexes to indicate * 2. Set null in the array locations specified via erasedIndexes to indicate
* they're erased and no data are to read from; * they're erased and no data are to read from;
* 3. Set null in the array locations for extra redundant items, as they're * 3. Set null in the array locations for extra redundant items, as they're
@ -48,29 +49,39 @@ public interface RawErasureDecoder extends RawErasureCoder {
* erasedIndexes = [5] // index of d2 into inputs array * erasedIndexes = [5] // index of d2 into inputs array
* outputs = [a-writable-buffer] * outputs = [a-writable-buffer]
* *
* @param inputs inputs to read data from * Note, for both inputs and outputs, no mixing of on-heap buffers and direct
* buffers are allowed.
*
* @param inputs inputs to read data from, contents may change after the call
* @param erasedIndexes indexes of erased units in the inputs array * @param erasedIndexes indexes of erased units in the inputs array
* @param outputs outputs to write into for data generated according to * @param outputs outputs to write into for data generated according to
* erasedIndexes * erasedIndexes, ready for reading the result data from after
* the call
*/ */
public void decode(ByteBuffer[] inputs, int[] erasedIndexes, public void decode(ByteBuffer[] inputs, int[] erasedIndexes,
ByteBuffer[] outputs); ByteBuffer[] outputs);
/** /**
* Decode with inputs and erasedIndexes, generates outputs. More see above. * Decode with inputs and erasedIndexes, generates outputs. More see above.
* @param inputs inputs to read data from * @param inputs inputs to read data from, contents may change after the call
* @param erasedIndexes indexes of erased units in the inputs array * @param erasedIndexes indexes of erased units in the inputs array
* @param outputs outputs to write into for data generated according to * @param outputs outputs to write into for data generated according to
* erasedIndexes * erasedIndexes, ready for reading the result data from after
* the call
*/ */
public void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs); public void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs);
/** /**
* Decode with inputs and erasedIndexes, generates outputs. More see above. * Decode with inputs and erasedIndexes, generates outputs. More see above.
* @param inputs inputs to read data from *
* Note, for both input and output ECChunks, no mixing of on-heap buffers and
* direct buffers are allowed.
*
* @param inputs inputs to read data from, contents may change after the call
* @param erasedIndexes indexes of erased units in the inputs array * @param erasedIndexes indexes of erased units in the inputs array
* @param outputs outputs to write into for data generated according to * @param outputs outputs to write into for data generated according to
* erasedIndexes * erasedIndexes, ready for reading the result data from after
* the call
*/ */
public void decode(ECChunk[] inputs, int[] erasedIndexes, ECChunk[] outputs); public void decode(ECChunk[] inputs, int[] erasedIndexes, ECChunk[] outputs);

View File

@ -31,23 +31,33 @@ import java.nio.ByteBuffer;
public interface RawErasureEncoder extends RawErasureCoder { public interface RawErasureEncoder extends RawErasureCoder {
/** /**
* Encode with inputs and generates outputs * Encode with inputs and generates outputs.
* @param inputs *
* Note, for both inputs and outputs, no mixing of on-heap buffers and direct
* buffers are allowed.
*
* @param inputs inputs to read data from, contents may change after the call
* @param outputs * @param outputs
*/ */
public void encode(ByteBuffer[] inputs, ByteBuffer[] outputs); public void encode(ByteBuffer[] inputs, ByteBuffer[] outputs);
/** /**
* Encode with inputs and generates outputs * Encode with inputs and generates outputs
* @param inputs * @param inputs inputs to read data from, contents may change after the call
* @param outputs * @param outputs outputs to write into for data generated, ready for reading
* the result data from after the call
*/ */
public void encode(byte[][] inputs, byte[][] outputs); public void encode(byte[][] inputs, byte[][] outputs);
/** /**
* Encode with inputs and generates outputs * Encode with inputs and generates outputs.
* @param inputs *
* @param outputs * Note, for both input and output ECChunks, no mixing of on-heap buffers and
* direct buffers are allowed.
*
* @param inputs inputs to read data from, contents may change after the call
* @param outputs outputs to write into for data generated, ready for reading
* the result data from after the call
*/ */
public void encode(ECChunk[] inputs, ECChunk[] outputs); public void encode(ECChunk[] inputs, ECChunk[] outputs);

View File

@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode;
import java.nio.ByteBuffer;
/**
* An abstract buffer allocator used for test.
*/
public abstract class BufferAllocator {
private boolean usingDirect = false;
public BufferAllocator(boolean usingDirect) {
this.usingDirect = usingDirect;
}
protected boolean isUsingDirect() {
return usingDirect;
}
/**
* Allocate and return a ByteBuffer of specified length.
* @param bufferLen
* @return
*/
public abstract ByteBuffer allocate(int bufferLen);
/**
* A simple buffer allocator that just uses ByteBuffer's
* allocate/allocateDirect API.
*/
public static class SimpleBufferAllocator extends BufferAllocator {
public SimpleBufferAllocator(boolean usingDirect) {
super(usingDirect);
}
@Override
public ByteBuffer allocate(int bufferLen) {
return isUsingDirect() ? ByteBuffer.allocateDirect(bufferLen) :
ByteBuffer.allocate(bufferLen);
}
}
/**
* A buffer allocator that allocates a buffer from an existing large buffer by
* slice calling, but if no available space just degrades as
* SimpleBufferAllocator. So please ensure enough space for it.
*/
public static class SlicedBufferAllocator extends BufferAllocator {
private ByteBuffer overallBuffer;
public SlicedBufferAllocator(boolean usingDirect, int totalBufferLen) {
super(usingDirect);
overallBuffer = isUsingDirect() ?
ByteBuffer.allocateDirect(totalBufferLen) :
ByteBuffer.allocate(totalBufferLen);
}
@Override
public ByteBuffer allocate(int bufferLen) {
if (bufferLen > overallBuffer.capacity() - overallBuffer.position()) {
// If no available space for the requested length, then allocate new
return isUsingDirect() ? ByteBuffer.allocateDirect(bufferLen) :
ByteBuffer.allocate(bufferLen);
}
overallBuffer.limit(overallBuffer.position() + bufferLen);
ByteBuffer result = overallBuffer.slice();
overallBuffer.position(overallBuffer.position() + bufferLen);
return result;
}
}
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.io.erasurecode; package org.apache.hadoop.io.erasurecode;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.erasurecode.BufferAllocator.SimpleBufferAllocator;
import org.apache.hadoop.io.erasurecode.BufferAllocator.SlicedBufferAllocator;
import org.apache.hadoop.io.erasurecode.rawcoder.util.DumpUtil; import org.apache.hadoop.io.erasurecode.rawcoder.util.DumpUtil;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -40,6 +42,7 @@ public abstract class TestCoderBase {
protected int numParityUnits; protected int numParityUnits;
protected int baseChunkSize = 513; protected int baseChunkSize = 513;
private int chunkSize = baseChunkSize; private int chunkSize = baseChunkSize;
private BufferAllocator allocator;
private byte[] zeroChunkBytes; private byte[] zeroChunkBytes;
@ -70,6 +73,17 @@ public abstract class TestCoderBase {
this.zeroChunkBytes = new byte[chunkSize]; // With ZERO by default this.zeroChunkBytes = new byte[chunkSize]; // With ZERO by default
} }
protected void prepareBufferAllocator(boolean usingSlicedBuffer) {
if (usingSlicedBuffer) {
int roughEstimationSpace =
chunkSize * (numDataUnits + numParityUnits) * 10;
allocator = new SlicedBufferAllocator(usingDirectBuffer,
roughEstimationSpace);
} else {
allocator = new SimpleBufferAllocator(usingDirectBuffer);
}
}
/** /**
* Set true during setup if want to dump test settings and coding data, * Set true during setup if want to dump test settings and coding data,
* useful in debugging. * useful in debugging.
@ -299,8 +313,7 @@ public abstract class TestCoderBase {
*/ */
int startOffset = startBufferWithZero ? 0 : 11; // 11 is arbitrary int startOffset = startBufferWithZero ? 0 : 11; // 11 is arbitrary
int allocLen = startOffset + bufferLen + startOffset; int allocLen = startOffset + bufferLen + startOffset;
ByteBuffer buffer = usingDirectBuffer ? ByteBuffer buffer = allocator.allocate(allocLen);
ByteBuffer.allocateDirect(allocLen) : ByteBuffer.allocate(allocLen);
buffer.limit(startOffset + bufferLen); buffer.limit(startOffset + bufferLen);
fillDummyData(buffer, startOffset); fillDummyData(buffer, startOffset);
startBufferWithZero = ! startBufferWithZero; startBufferWithZero = ! startBufferWithZero;

View File

@ -65,14 +65,14 @@ public abstract class TestErasureCoderBase extends TestCoderBase {
* The following runs will use 3 different chunkSize for inputs and outputs, * The following runs will use 3 different chunkSize for inputs and outputs,
* to verify the same encoder/decoder can process variable width of data. * to verify the same encoder/decoder can process variable width of data.
*/ */
performTestCoding(baseChunkSize); performTestCoding(baseChunkSize, true);
performTestCoding(baseChunkSize - 17); performTestCoding(baseChunkSize - 17, false);
performTestCoding(baseChunkSize + 16); performTestCoding(baseChunkSize + 16, true);
} }
private void performTestCoding(int chunkSize) { private void performTestCoding(int chunkSize, boolean usingSlicedBuffer) {
setChunkSize(chunkSize); setChunkSize(chunkSize);
prepareBufferAllocator(usingSlicedBuffer);
// Generate data and encode // Generate data and encode
ECBlockGroup blockGroup = prepareBlockGroupForEncoding(); ECBlockGroup blockGroup = prepareBlockGroupForEncoding();

View File

@ -68,9 +68,9 @@ public abstract class TestRawCoderBase extends TestCoderBase {
* The following runs will use 3 different chunkSize for inputs and outputs, * The following runs will use 3 different chunkSize for inputs and outputs,
* to verify the same encoder/decoder can process variable width of data. * to verify the same encoder/decoder can process variable width of data.
*/ */
performTestCoding(baseChunkSize, false, false); performTestCoding(baseChunkSize, true, false, false);
performTestCoding(baseChunkSize - 17, false, false); performTestCoding(baseChunkSize - 17, false, false, false);
performTestCoding(baseChunkSize + 16, false, false); performTestCoding(baseChunkSize + 16, true, false, false);
} }
/** /**
@ -82,7 +82,7 @@ public abstract class TestRawCoderBase extends TestCoderBase {
prepareCoders(); prepareCoders();
try { try {
performTestCoding(baseChunkSize, true, false); performTestCoding(baseChunkSize, false, true, false);
Assert.fail("Encoding test with bad input should fail"); Assert.fail("Encoding test with bad input should fail");
} catch (Exception e) { } catch (Exception e) {
// Expected // Expected
@ -98,7 +98,7 @@ public abstract class TestRawCoderBase extends TestCoderBase {
prepareCoders(); prepareCoders();
try { try {
performTestCoding(baseChunkSize, false, true); performTestCoding(baseChunkSize, false, false, true);
Assert.fail("Decoding test with bad output should fail"); Assert.fail("Decoding test with bad output should fail");
} catch (Exception e) { } catch (Exception e) {
// Expected // Expected
@ -122,9 +122,10 @@ public abstract class TestRawCoderBase extends TestCoderBase {
} }
} }
private void performTestCoding(int chunkSize, private void performTestCoding(int chunkSize, boolean usingSlicedBuffer,
boolean useBadInput, boolean useBadOutput) { boolean useBadInput, boolean useBadOutput) {
setChunkSize(chunkSize); setChunkSize(chunkSize);
prepareBufferAllocator(usingSlicedBuffer);
dumpSetting(); dumpSetting();