HADOOP-11920. Refactor some codes for erasure coders. Contributed by Kai Zheng.

Authored by Zhe Zhang on 2015-05-18 10:09:57 -07:00; committed by Zhe Zhang
parent 9c7a78c874
commit 09c3a375ba
14 changed files with 156 additions and 139 deletions

View File

@@ -46,3 +46,5 @@
 HADOOP-11841. Remove unused ecschema-def.xml files. (szetszwo)
 HADOOP-11921. Enhance tests for erasure coders. (Kai Zheng via Zhe Zhang)
+HADOOP-11920. Refactor some codes for erasure coders. (Kai Zheng via Zhe Zhang)

View File

@@ -143,10 +143,6 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
   /** Supported erasure codec classes */
   public static final String IO_ERASURECODE_CODECS_KEY = "io.erasurecode.codecs";

-  /** Use XOR raw coder when possible for the RS codec */
-  public static final String IO_ERASURECODE_CODEC_RS_USEXOR_KEY =
-      "io.erasurecode.codec.rs.usexor";
-
   /** Raw coder factory for the RS codec */
   public static final String IO_ERASURECODE_CODEC_RS_RAWCODER_KEY =
       "io.erasurecode.codec.rs.rawcoder";

View File

@@ -71,7 +71,7 @@ public class ECChunk {
    * @param chunks
    * @return an array of byte array
    */
-  public static byte[][] toArray(ECChunk[] chunks) {
+  public static byte[][] toArrays(ECChunk[] chunks) {
     byte[][] bytesArr = new byte[chunks.length][];
     ByteBuffer buffer;

View File

@@ -90,11 +90,7 @@ public abstract class AbstractErasureCoder
       throw new RuntimeException("Failed to create raw coder", e);
     }

-    if (fact != null) {
-      return isEncoder ? fact.createEncoder() : fact.createDecoder();
-    }
-
-    return null;
+    return isEncoder ? fact.createEncoder() : fact.createDecoder();
   }

   @Override
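
With the null check gone, the raw coder creation helper above expects the configured factory to always return a usable encoder or decoder. For illustration only, a pluggable factory might look like the following sketch; the RawErasureCoderFactory interface shape (createEncoder/createDecoder) is inferred from the calls above, and the class name is hypothetical.

    // Hypothetical factory, selected through the io.erasurecode.codec.rs.rawcoder key.
    package org.apache.hadoop.io.erasurecode.rawcoder;

    public class MyRSRawErasureCoderFactory implements RawErasureCoderFactory {
      @Override
      public RawErasureEncoder createEncoder() {
        return new RSRawEncoder();   // pure-Java RS encoder, assumed available
      }

      @Override
      public RawErasureDecoder createDecoder() {
        return new RSRawDecoder();   // the decoder imported in RSErasureDecoder below
      }
    }

A caller would then set conf.set(CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY, MyRSRawErasureCoderFactory.class.getCanonicalName()), the same pattern the test change at the bottom of this commit uses.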

View File

@@ -17,13 +17,11 @@
  */
 package org.apache.hadoop.io.erasurecode.coder;

-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.erasurecode.ECBlock;
 import org.apache.hadoop.io.erasurecode.ECBlockGroup;
 import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
-import org.apache.hadoop.io.erasurecode.rawcoder.XORRawDecoder;

 /**
  * Reed-Solomon erasure decoder that decodes a block group.
@@ -32,38 +30,14 @@ import org.apache.hadoop.io.erasurecode.rawcoder.XORRawDecoder;
  */
 public class RSErasureDecoder extends AbstractErasureDecoder {
   private RawErasureDecoder rsRawDecoder;
-  private RawErasureDecoder xorRawDecoder;
-  private boolean useXorWhenPossible = true;

   @Override
-  public void setConf(Configuration conf) {
-    super.setConf(conf);
-
-    if (conf != null) {
-      this.useXorWhenPossible = conf.getBoolean(
-          CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_USEXOR_KEY, true);
-    }
-  }
-
-  @Override
   protected ErasureCodingStep prepareDecodingStep(final ECBlockGroup blockGroup) {
-    RawErasureDecoder rawDecoder;
-
     ECBlock[] inputBlocks = getInputBlocks(blockGroup);
     ECBlock[] outputBlocks = getOutputBlocks(blockGroup);

-    /**
-     * Optimization: according to some benchmark, when only one block is erased
-     * and to be recovering, the most simple XOR scheme can be much efficient.
-     * We will have benchmark tests to verify this opt is effect or not.
-     */
-    if (outputBlocks.length == 1 && useXorWhenPossible) {
-      rawDecoder = checkCreateXorRawDecoder();
-    } else {
-      rawDecoder = checkCreateRSRawDecoder();
-    }
+    RawErasureDecoder rawDecoder = checkCreateRSRawDecoder();

     return new ErasureDecodingStep(inputBlocks,
         getErasedIndexes(inputBlocks), outputBlocks, rawDecoder);
   }
@@ -81,19 +55,9 @@ public class RSErasureDecoder extends AbstractErasureDecoder {
     return rsRawDecoder;
   }

-  private RawErasureDecoder checkCreateXorRawDecoder() {
-    if (xorRawDecoder == null) {
-      xorRawDecoder = new XORRawDecoder();
-      xorRawDecoder.initialize(getNumDataUnits(), 1, getChunkSize());
-    }
-    return xorRawDecoder;
-  }
-
   @Override
   public void release() {
-    if (xorRawDecoder != null) {
-      xorRawDecoder.release();
-    } else if (rsRawDecoder != null) {
+    if (rsRawDecoder != null) {
       rsRawDecoder.release();
     }
   }

View File

@@ -19,6 +19,9 @@ package org.apache.hadoop.io.erasurecode.rawcoder;

 import org.apache.hadoop.conf.Configured;

+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
 /**
  * A common class of basic facilities to be shared by encoder and decoder
  *
@@ -27,6 +30,9 @@ import org.apache.hadoop.conf.Configured;
 public abstract class AbstractRawErasureCoder
     extends Configured implements RawErasureCoder {

+  // Hope to reset coding buffers a little faster using it
+  private byte[] zeroChunkBytes;
+
   private int numDataUnits;
   private int numParityUnits;
   private int chunkSize;
@@ -37,6 +43,8 @@ public abstract class AbstractRawErasureCoder
     this.numDataUnits = numDataUnits;
     this.numParityUnits = numParityUnits;
     this.chunkSize = chunkSize;
+
+    zeroChunkBytes = new byte[chunkSize]; // With ZERO by default
   }

   @Override
@@ -55,7 +63,7 @@ public abstract class AbstractRawErasureCoder
   }

   @Override
-  public boolean preferNativeBuffer() {
+  public boolean preferDirectBuffer() {
     return false;
   }
@@ -63,4 +71,57 @@ public abstract class AbstractRawErasureCoder
   public void release() {
     // Nothing to do by default
   }
+
+  /**
+   * Convert an array of heap ByteBuffers to an array of byte array.
+   * @param buffers
+   * @return an array of byte array
+   */
+  protected static byte[][] toArrays(ByteBuffer[] buffers) {
+    byte[][] bytesArr = new byte[buffers.length][];
+
+    ByteBuffer buffer;
+    for (int i = 0; i < buffers.length; i++) {
+      buffer = buffers[i];
+      if (buffer == null) {
+        bytesArr[i] = null;
+        continue;
+      }
+
+      if (buffer.hasArray()) {
+        bytesArr[i] = buffer.array();
+      } else {
+        throw new IllegalArgumentException("Invalid ByteBuffer passed, " +
+            "expecting heap buffer");
+      }
+    }
+
+    return bytesArr;
+  }
+
+  /**
+   * Ensure the buffer (either input or output) ready to read or write with ZERO
+   * bytes fully in chunkSize.
+   * @param buffer
+   * @return the buffer itself
+   */
+  protected ByteBuffer resetBuffer(ByteBuffer buffer) {
+    buffer.clear();
+    buffer.put(zeroChunkBytes);
+    buffer.position(0);
+
+    return buffer;
+  }
+
+  /**
+   * Ensure the buffer (either input or output) ready to read or write with ZERO
+   * bytes fully in chunkSize.
+   * @param buffer bytes array buffer
+   * @return the buffer itself
+   */
+  protected byte[] resetBuffer(byte[] buffer) {
+    System.arraycopy(zeroChunkBytes, 0, buffer, 0, buffer.length);
+
+    return buffer;
+  }
 }
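
The zeroChunkBytes array added above lets resetBuffer clear an output chunk with one bulk put or System.arraycopy instead of a per-byte loop, and toArrays unwraps heap ByteBuffers once per call. A minimal sketch of a subclass using these helpers follows; the class is hypothetical and simply mirrors the first input into a single parity output, assuming the coder was initialized with one parity unit.

    import java.nio.ByteBuffer;

    // Hypothetical raw encoder, for illustration of the new helpers only.
    public class MirrorRawEncoderSketch extends AbstractRawErasureEncoder {
      @Override
      protected void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
        resetBuffer(outputs[0]);                  // bulk zero via zeroChunkBytes
        for (int j = 0; j < getChunkSize(); j++) {
          outputs[0].put(j, inputs[0].get(j));
        }
      }

      @Override
      protected void doEncode(byte[][] inputs, byte[][] outputs) {
        resetBuffer(outputs[0]);                  // one arraycopy, no loop
        System.arraycopy(inputs[0], 0, outputs[0], 0, getChunkSize());
      }
    }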

View File

@@ -32,27 +32,30 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
   @Override
   public void decode(ByteBuffer[] inputs, int[] erasedIndexes,
                      ByteBuffer[] outputs) {
-    if (erasedIndexes.length == 0) {
-      return;
-    }
+    checkParameters(inputs, erasedIndexes, outputs);

-    doDecode(inputs, erasedIndexes, outputs);
+    boolean hasArray = inputs[0].hasArray();
+    if (hasArray) {
+      byte[][] newInputs = toArrays(inputs);
+      byte[][] newOutputs = toArrays(outputs);
+      doDecode(newInputs, erasedIndexes, newOutputs);
+    } else {
+      doDecode(inputs, erasedIndexes, outputs);
+    }
   }

   /**
-   * Perform the real decoding using ByteBuffer
-   * @param inputs
+   * Perform the real decoding using Direct ByteBuffer.
+   * @param inputs Direct ByteBuffers expected
    * @param erasedIndexes
-   * @param outputs
+   * @param outputs Direct ByteBuffers expected
    */
   protected abstract void doDecode(ByteBuffer[] inputs, int[] erasedIndexes,
                                    ByteBuffer[] outputs);

   @Override
   public void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs) {
-    if (erasedIndexes.length == 0) {
-      return;
-    }
+    checkParameters(inputs, erasedIndexes, outputs);

     doDecode(inputs, erasedIndexes, outputs);
   }
@@ -69,25 +72,32 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
   @Override
   public void decode(ECChunk[] inputs, int[] erasedIndexes,
                      ECChunk[] outputs) {
-    doDecode(inputs, erasedIndexes, outputs);
+    ByteBuffer[] newInputs = ECChunk.toBuffers(inputs);
+    ByteBuffer[] newOutputs = ECChunk.toBuffers(outputs);
+    decode(newInputs, erasedIndexes, newOutputs);
   }

   /**
-   * Perform the real decoding using chunks
+   * Check and validate decoding parameters, throw exception accordingly. The
+   * checking assumes it's a MDS code. Other code can override this.
    * @param inputs
    * @param erasedIndexes
    * @param outputs
    */
-  protected void doDecode(ECChunk[] inputs, int[] erasedIndexes,
-                          ECChunk[] outputs) {
-    if (inputs[0].getBuffer().hasArray()) {
-      byte[][] inputBytesArr = ECChunk.toArray(inputs);
-      byte[][] outputBytesArr = ECChunk.toArray(outputs);
-      doDecode(inputBytesArr, erasedIndexes, outputBytesArr);
-    } else {
-      ByteBuffer[] inputBuffers = ECChunk.toBuffers(inputs);
-      ByteBuffer[] outputBuffers = ECChunk.toBuffers(outputs);
-      doDecode(inputBuffers, erasedIndexes, outputBuffers);
+  protected void checkParameters(Object[] inputs, int[] erasedIndexes,
+                                 Object[] outputs) {
+    if (inputs.length != getNumParityUnits() + getNumDataUnits()) {
+      throw new IllegalArgumentException("Invalid inputs length");
+    }
+
+    if (erasedIndexes.length != outputs.length) {
+      throw new IllegalArgumentException(
+          "erasedIndexes and outputs mismatch in length");
+    }
+
+    if (erasedIndexes.length > getNumParityUnits()) {
+      throw new IllegalArgumentException(
+          "Too many erased, not recoverable");
     }
   }
 }
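
To make the checks concrete: for an assumed Reed-Solomon raw decoder with 6 data and 3 parity units and a 1024-byte chunk size, a byte-array decode call has to be shaped as in this sketch, otherwise checkParameters throws IllegalArgumentException before doDecode is reached.

    // Assumed setup; RSRawDecoder and initialize(numData, numParity, chunkSize)
    // are used the same way the removed XOR path above used them.
    RSRawDecoder decoder = new RSRawDecoder();
    decoder.initialize(6, 3, 1024);

    byte[][] inputs  = new byte[9][1024];  // numDataUnits + numParityUnits = 9
    int[] erased     = {2, 7};             // at most numParityUnits (3) erasures
    byte[][] outputs = new byte[2][1024];  // one output chunk per erased index
    decoder.decode(inputs, erased, outputs);   // shape is valid, the checks pass

    // Each of these would fail checkParameters:
    // decoder.decode(new byte[6][1024], erased, outputs);               // wrong inputs length
    // decoder.decode(inputs, erased, new byte[3][1024]);                // erasedIndexes/outputs mismatch
    // decoder.decode(inputs, new int[]{0, 1, 2, 3}, new byte[4][1024]); // more erasures than parity units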

View File

@@ -31,23 +31,28 @@ public abstract class AbstractRawErasureEncoder extends AbstractRawErasureCoder
   @Override
   public void encode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
-    assert (inputs.length == getNumDataUnits());
-    assert (outputs.length == getNumParityUnits());
+    checkParameters(inputs, outputs);

-    doEncode(inputs, outputs);
+    boolean hasArray = inputs[0].hasArray();
+    if (hasArray) {
+      byte[][] newInputs = toArrays(inputs);
+      byte[][] newOutputs = toArrays(outputs);
+      doEncode(newInputs, newOutputs);
+    } else {
+      doEncode(inputs, outputs);
+    }
   }

   /**
-   * Perform the real encoding work using ByteBuffer
-   * @param inputs
-   * @param outputs
+   * Perform the real encoding work using direct ByteBuffer
+   * @param inputs Direct ByteBuffers expected
+   * @param outputs Direct ByteBuffers expected
    */
   protected abstract void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs);

   @Override
   public void encode(byte[][] inputs, byte[][] outputs) {
-    assert (inputs.length == getNumDataUnits());
-    assert (outputs.length == getNumParityUnits());
+    checkParameters(inputs, outputs);

     doEncode(inputs, outputs);
   }
@@ -61,33 +66,22 @@ public abstract class AbstractRawErasureEncoder extends AbstractRawErasureCoder
   @Override
   public void encode(ECChunk[] inputs, ECChunk[] outputs) {
-    assert (inputs.length == getNumDataUnits());
-    assert (outputs.length == getNumParityUnits());
-
-    doEncode(inputs, outputs);
+    ByteBuffer[] newInputs = ECChunk.toBuffers(inputs);
+    ByteBuffer[] newOutputs = ECChunk.toBuffers(outputs);
+    encode(newInputs, newOutputs);
   }

   /**
-   * Perform the real encoding work using chunks.
+   * Check and validate decoding parameters, throw exception accordingly.
    * @param inputs
    * @param outputs
    */
-  protected void doEncode(ECChunk[] inputs, ECChunk[] outputs) {
-    /**
-     * Note callers may pass byte array, or ByteBuffer via ECChunk according
-     * to how ECChunk is created. Some implementations of coder use byte array
-     * (ex: pure Java), some use native ByteBuffer (ex: ISA-L), all for the
-     * better performance.
-     */
-    if (inputs[0].getBuffer().hasArray()) {
-      byte[][] inputBytesArr = ECChunk.toArray(inputs);
-      byte[][] outputBytesArr = ECChunk.toArray(outputs);
-      doEncode(inputBytesArr, outputBytesArr);
-    } else {
-      ByteBuffer[] inputBuffers = ECChunk.toBuffers(inputs);
-      ByteBuffer[] outputBuffers = ECChunk.toBuffers(outputs);
-      doEncode(inputBuffers, outputBuffers);
+  protected void checkParameters(Object[] inputs, Object[] outputs) {
+    if (inputs.length != getNumDataUnits()) {
+      throw new IllegalArgumentException("Invalid inputs length");
+    }
+    if (outputs.length != getNumParityUnits()) {
+      throw new IllegalArgumentException("Invalid outputs length");
     }
   }
 }
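
The encode entry point now branches on inputs[0].hasArray(): heap buffers are unwrapped once into byte[][] and sent to the byte-array doEncode, while direct buffers stay on the ByteBuffer path. A caller-side sketch, assuming an encoder already initialized with 6 data units, 3 parity units and a 4096-byte chunk size:

    int chunkSize = 4096;
    ByteBuffer[] inputs = new ByteBuffer[6];
    ByteBuffer[] outputs = new ByteBuffer[3];
    for (int i = 0; i < inputs.length; i++) {
      inputs[i] = ByteBuffer.allocate(chunkSize);   // heap buffer, hasArray() == true
      // ... fill inputs[i] with chunkSize bytes of application data ...
    }
    for (int i = 0; i < outputs.length; i++) {
      outputs[i] = ByteBuffer.allocate(chunkSize);
    }
    encoder.encode(inputs, outputs);   // passes checkParameters, then the byte[][] doEncode path

Mixing heap and direct buffers in one call is not handled specially; only inputs[0] is inspected, so callers are expected to allocate all chunks the same way.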

View File

@@ -64,13 +64,13 @@ public interface RawErasureCoder extends Configurable {
   public int getChunkSize();

   /**
-   * Tell if native or off-heap buffer is preferred or not. It's for callers to
-   * decide how to allocate coding chunk buffers, either on heap or off heap.
-   * It will return false by default.
+   * Tell if direct buffer is preferred or not. It's for callers to
+   * decide how to allocate coding chunk buffers, using DirectByteBuffer or
+   * bytes array. It will return false by default.
    * @return true if native buffer is preferred for performance consideration,
    *         otherwise false.
    */
-  public boolean preferNativeBuffer();
+  public boolean preferDirectBuffer();

   /**
    * Should be called when release this coder. Good chance to release encoding
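
Since preferDirectBuffer only advises callers, a small allocation helper is one natural way to honor it. This is an illustrative sketch, not part of the commit; the class name is made up.

    import java.nio.ByteBuffer;

    import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoder;

    // Illustrative helper: allocate coding chunk buffers the way the coder prefers.
    final class ChunkBufferAllocator {
      static ByteBuffer[] allocate(RawErasureCoder coder, int count) {
        ByteBuffer[] buffers = new ByteBuffer[count];
        for (int i = 0; i < count; i++) {
          buffers[i] = coder.preferDirectBuffer()
              ? ByteBuffer.allocateDirect(coder.getChunkSize())  // off-heap, hasArray() == false
              : ByteBuffer.allocate(coder.getChunkSize());       // on-heap, hasArray() == true
        }
        return buffers;
      }
    }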

View File

@@ -31,24 +31,30 @@ import java.nio.ByteBuffer;
 public interface RawErasureDecoder extends RawErasureCoder {

   /**
-   * Decode with inputs and erasedIndexes, generates outputs
-   * @param inputs
-   * @param outputs
+   * Decode with inputs and erasedIndexes, generates outputs.
+   * @param inputs inputs to read data from
+   * @param erasedIndexes indexes of erased units in the inputs array
+   * @param outputs outputs to write into for data generated according to
+   *                erasedIndexes
    */
   public void decode(ByteBuffer[] inputs, int[] erasedIndexes,
                      ByteBuffer[] outputs);

   /**
-   * Decode with inputs and erasedIndexes, generates outputs
-   * @param inputs
-   * @param outputs
+   * Decode with inputs and erasedIndexes, generates outputs.
+   * @param inputs inputs to read data from
+   * @param erasedIndexes indexes of erased units in the inputs array
+   * @param outputs outputs to write into for data generated according to
+   *                erasedIndexes
    */
   public void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs);

   /**
-   * Decode with inputs and erasedIndexes, generates outputs
-   * @param inputs
-   * @param outputs
+   * Decode with inputs and erasedIndexes, generates outputs.
+   * @param inputs inputs to read data from
+   * @param erasedIndexes indexes of erased units in the inputs array
+   * @param outputs outputs to write into for data generated according to
+   *                erasedIndexes
    */
   public void decode(ECChunk[] inputs, int[] erasedIndexes, ECChunk[] outputs);
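
To see those three parameters working together, here is a round-trip sketch using the XOR coder from the files further down: three data units, one parity unit, one unit erased and rebuilt. The chunk size and payloads are arbitrary choices for the example.

    import java.util.Random;

    import org.apache.hadoop.io.erasurecode.rawcoder.XORRawDecoder;
    import org.apache.hadoop.io.erasurecode.rawcoder.XORRawEncoder;

    public class XorRoundTripSketch {
      public static void main(String[] args) {
        int numData = 3, chunkSize = 8;
        XORRawEncoder encoder = new XORRawEncoder();
        encoder.initialize(numData, 1, chunkSize);
        XORRawDecoder decoder = new XORRawDecoder();
        decoder.initialize(numData, 1, chunkSize);

        byte[][] data = new byte[numData][chunkSize];
        new Random(0).nextBytes(data[0]);       // arbitrary payloads
        new Random(1).nextBytes(data[1]);
        new Random(2).nextBytes(data[2]);

        byte[][] parity = new byte[1][chunkSize];
        encoder.encode(data, parity);           // parity[0] = data[0] ^ data[1] ^ data[2]

        // Lose data unit 1. The decode inputs carry every unit, data then parity;
        // the erased slot's content is ignored, so a zero chunk stands in for it.
        byte[][] inputs = {data[0], new byte[chunkSize], data[2], parity[0]};
        byte[][] recovered = new byte[1][chunkSize];
        decoder.decode(inputs, new int[]{1}, recovered);
        // recovered[0] is now identical to the original data[1].
      }
    }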

View File

@@ -27,17 +27,11 @@ public class XORRawDecoder extends AbstractRawErasureDecoder {
   @Override
   protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes,
                           ByteBuffer[] outputs) {
-    assert(erasedIndexes.length == outputs.length);
-    assert(erasedIndexes.length <= 1);
+    resetBuffer(outputs[0]);

-    int bufSize = inputs[0].remaining();
+    int bufSize = getChunkSize();
     int erasedIdx = erasedIndexes[0];

-    // Set the output to zeros.
-    for (int j = 0; j < bufSize; j++) {
-      outputs[0].put(j, (byte) 0);
-    }
-
     // Process the inputs.
     for (int i = 0; i < inputs.length; i++) {
       // Skip the erased location.
@@ -52,19 +46,13 @@ public class XORRawDecoder extends AbstractRawErasureDecoder {
   }

   @Override
-  protected void doDecode(byte[][] inputs, int[] erasedIndexes,
-                          byte[][] outputs) {
-    assert(erasedIndexes.length == outputs.length);
-    assert(erasedIndexes.length <= 1);
+  protected void doDecode(byte[][] inputs,
+                          int[] erasedIndexes, byte[][] outputs) {
+    resetBuffer(outputs[0]);

-    int bufSize = inputs[0].length;
+    int bufSize = getChunkSize();
     int erasedIdx = erasedIndexes[0];

-    // Set the output to zeros.
-    for (int j = 0; j < bufSize; j++) {
-      outputs[0][j] = 0;
-    }
-
     // Process the inputs.
     for (int i = 0; i < inputs.length; i++) {
       // Skip the erased location.

View File

@@ -26,8 +26,9 @@ public class XORRawEncoder extends AbstractRawErasureEncoder {
   @Override
   protected void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
-    int bufSize = inputs[0].remaining();
+    resetBuffer(outputs[0]);

+    int bufSize = getChunkSize();
     // Get the first buffer's data.
     for (int j = 0; j < bufSize; j++) {
       outputs[0].put(j, inputs[0].get(j));
@@ -43,8 +44,9 @@ public class XORRawEncoder extends AbstractRawErasureEncoder {
   @Override
   protected void doEncode(byte[][] inputs, byte[][] outputs) {
-    int bufSize = inputs[0].length;
+    resetBuffer(outputs[0]);

+    int bufSize = getChunkSize();
     // Get the first buffer's data.
     for (int j = 0; j < bufSize; j++) {
       outputs[0][j] = inputs[0][j];

View File

@@ -75,8 +75,8 @@ public abstract class TestCoderBase {
    */
   protected void compareAndVerify(ECChunk[] erasedChunks,
                                   ECChunk[] recoveredChunks) {
-    byte[][] erased = ECChunk.toArray(erasedChunks);
-    byte[][] recovered = ECChunk.toArray(recoveredChunks);
+    byte[][] erased = ECChunk.toArrays(erasedChunks);
+    byte[][] recovered = ECChunk.toArrays(recoveredChunks);
     boolean result = Arrays.deepEquals(erased, recovered);
     assertTrue("Decoding and comparing failed.", result);
   }

View File

@@ -56,14 +56,12 @@ public class TestRSErasureCoder extends TestErasureCoderBase {
      * This tests if the two configuration items work or not.
      */
     Configuration conf = new Configuration();
-    conf.set(CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY,
-        RSRawErasureCoderFactory.class.getCanonicalName());
-    conf.setBoolean(
-        CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_USEXOR_KEY, false);
+    conf.set(CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY, RSRawErasureCoderFactory.class.getCanonicalName());
     prepare(conf, 10, 4, new int[]{0});

     testCoding(true);
+    testCoding(true);
   }

   @Test