HDFS-8382 Remove chunkSize and initialize from erasure coder. Contributed by Kai Zheng
commit b30e96bfb4
parent 0ed92e5b13
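The substance of the change, before the per-file diffs: a coder used to be built with a no-argument constructor and then configured through initialize(numDataUnits, numParityUnits, chunkSize); after this commit the unit counts are final fields set by the constructor, and chunkSize is gone, since a raw coder can work with whatever buffer sizes the caller passes to encode/decode. A minimal before/after sketch (the RS 6+3 layout and the 64 KB chunk size are illustrative only):

    import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
    import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;

    public class ApiMigrationSketch {
      public static void main(String[] args) {
        // Before HDFS-8382 (two-phase setup, removed by this commit):
        //   RawErasureEncoder encoder = new RSRawEncoder();
        //   encoder.initialize(6, 3, 64 * 1024);
        // After HDFS-8382: fully constructed in one step, no chunkSize.
        RawErasureEncoder encoder = new RSRawEncoder(6, 3);
        System.out.println(encoder.getNumDataUnits() + " data + "
            + encoder.getNumParityUnits() + " parity units");
      }
    }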
AbstractErasureCodec.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.io.erasurecode.codec;
 
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.erasurecode.ECSchema;
-import org.apache.hadoop.io.erasurecode.coder.*;
 import org.apache.hadoop.io.erasurecode.grouper.BlockGrouper;
 
 /**
@@ -28,10 +27,9 @@ import org.apache.hadoop.io.erasurecode.grouper.BlockGrouper;
 public abstract class AbstractErasureCodec extends Configured
     implements ErasureCodec {
 
-  private ECSchema schema;
+  private final ECSchema schema;
 
-  @Override
-  public void setSchema(ECSchema schema) {
+  public AbstractErasureCodec(ECSchema schema) {
    this.schema = schema;
   }
 
@@ -39,7 +37,7 @@ public abstract class AbstractErasureCodec extends Configured
     return schema.getCodecName();
   }
 
-  protected ECSchema getSchema() {
+  public ECSchema getSchema() {
     return schema;
   }
 
@@ -50,39 +48,4 @@ public abstract class AbstractErasureCodec extends Configured
 
     return blockGrouper;
   }
-
-  @Override
-  public ErasureCoder createEncoder() {
-    ErasureCoder encoder = doCreateEncoder();
-    prepareErasureCoder(encoder);
-    return encoder;
-  }
-
-  /**
-   * Create a new encoder instance to be initialized afterwards.
-   * @return encoder
-   */
-  protected abstract ErasureCoder doCreateEncoder();
-
-  @Override
-  public ErasureCoder createDecoder() {
-    ErasureCoder decoder = doCreateDecoder();
-    prepareErasureCoder(decoder);
-    return decoder;
-  }
-
-  /**
-   * Create a new decoder instance to be initialized afterwards.
-   * @return decoder
-   */
-  protected abstract ErasureCoder doCreateDecoder();
-
-  private void prepareErasureCoder(ErasureCoder erasureCoder) {
-    if (getSchema() == null) {
-      throw new RuntimeException("No schema been set yet");
-    }
-
-    erasureCoder.setConf(getConf());
-    erasureCoder.initialize(getSchema());
-  }
 }
ErasureCodec.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.io.erasurecode.codec;
 
 import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.coder.ErasureCoder;
 import org.apache.hadoop.io.erasurecode.grouper.BlockGrouper;
 
@@ -29,12 +28,6 @@ import org.apache.hadoop.io.erasurecode.grouper.BlockGrouper;
  */
 public interface ErasureCodec extends Configurable {
 
-  /**
-   * Set EC schema to be used by this codec.
-   * @param schema
-   */
-  public void setSchema(ECSchema schema);
-
   /**
    * Create block grouper
    * @return block grouper
RSErasureCodec.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.io.erasurecode.codec;
 
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.coder.ErasureCoder;
 import org.apache.hadoop.io.erasurecode.coder.RSErasureDecoder;
 import org.apache.hadoop.io.erasurecode.coder.RSErasureEncoder;
@@ -26,13 +27,17 @@ import org.apache.hadoop.io.erasurecode.coder.RSErasureEncoder;
  */
 public class RSErasureCodec extends AbstractErasureCodec {
 
+  public RSErasureCodec(ECSchema schema) {
+    super(schema);
+  }
+
   @Override
-  protected ErasureCoder doCreateEncoder() {
-    return new RSErasureEncoder();
+  public ErasureCoder createEncoder() {
+    return new RSErasureEncoder(getSchema());
   }
 
   @Override
-  protected ErasureCoder doCreateDecoder() {
-    return new RSErasureDecoder();
+  public ErasureCoder createDecoder() {
+    return new RSErasureDecoder(getSchema());
   }
 }
XORErasureCodec.java
@@ -27,19 +27,18 @@ import org.apache.hadoop.io.erasurecode.coder.XORErasureEncoder;
  */
 public class XORErasureCodec extends AbstractErasureCodec {
 
-  @Override
-  public void setSchema(ECSchema schema) {
-    super.setSchema(schema);
+  public XORErasureCodec(ECSchema schema) {
+    super(schema);
     assert(schema.getNumParityUnits() == 1);
   }
 
   @Override
-  protected ErasureCoder doCreateEncoder() {
-    return new XORErasureEncoder();
+  public ErasureCoder createEncoder() {
+    return new XORErasureEncoder(getSchema());
   }
 
   @Override
-  protected ErasureCoder doCreateDecoder() {
-    return new XORErasureDecoder();
+  public ErasureCoder createDecoder() {
+    return new XORErasureDecoder(getSchema());
   }
 }
AbstractErasureCoder.java
@@ -33,18 +33,18 @@ import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
 public abstract class AbstractErasureCoder
     extends Configured implements ErasureCoder {
 
-  private int numDataUnits;
-  private int numParityUnits;
-  private int chunkSize;
+  private final int numDataUnits;
+  private final int numParityUnits;
 
   /**
    * Create raw decoder using the factory specified by rawCoderFactoryKey
    * @param rawCoderFactoryKey
    * @return raw decoder
   */
-  protected RawErasureDecoder createRawDecoder(String rawCoderFactoryKey) {
+  protected RawErasureDecoder createRawDecoder(
+      String rawCoderFactoryKey, int dataUnitsCount, int parityUnitsCount) {
     RawErasureCoder rawCoder = createRawCoder(getConf(),
-        rawCoderFactoryKey, false);
+        rawCoderFactoryKey, false, dataUnitsCount, parityUnitsCount);
     return (RawErasureDecoder) rawCoder;
   }
 
@@ -53,9 +53,10 @@ public abstract class AbstractErasureCoder
    * @param rawCoderFactoryKey
    * @return raw encoder
    */
-  protected RawErasureEncoder createRawEncoder(String rawCoderFactoryKey) {
+  protected RawErasureEncoder createRawEncoder(
+      String rawCoderFactoryKey, int dataUnitsCount, int parityUnitsCount) {
     RawErasureCoder rawCoder = createRawCoder(getConf(),
-        rawCoderFactoryKey, true);
+        rawCoderFactoryKey, true, dataUnitsCount, parityUnitsCount);
     return (RawErasureEncoder) rawCoder;
   }
 
@@ -67,7 +68,8 @@ public abstract class AbstractErasureCoder
    * @return raw coder
    */
   public static RawErasureCoder createRawCoder(Configuration conf,
-      String rawCoderFactoryKey, boolean isEncoder) {
+      String rawCoderFactoryKey, boolean isEncoder, int numDataUnits,
+      int numParityUnits) {
 
     if (conf == null) {
       return null;
@@ -90,21 +92,17 @@ public abstract class AbstractErasureCoder
       throw new RuntimeException("Failed to create raw coder", e);
     }
 
-    return isEncoder ? fact.createEncoder() : fact.createDecoder();
+    return isEncoder ? fact.createEncoder(numDataUnits, numParityUnits) :
+        fact.createDecoder(numDataUnits, numParityUnits);
   }
 
-  @Override
-  public void initialize(int numDataUnits, int numParityUnits,
-                         int chunkSize) {
+  public AbstractErasureCoder(int numDataUnits, int numParityUnits) {
     this.numDataUnits = numDataUnits;
     this.numParityUnits = numParityUnits;
-    this.chunkSize = chunkSize;
   }
 
-  @Override
-  public void initialize(ECSchema schema) {
-    initialize(schema.getNumDataUnits(), schema.getNumParityUnits(),
-        schema.getChunkSize());
+  public AbstractErasureCoder(ECSchema schema) {
+    this(schema.getNumDataUnits(), schema.getNumParityUnits());
   }
 
   @Override
@@ -118,12 +116,7 @@ public abstract class AbstractErasureCoder
   }
 
   @Override
-  public int getChunkSize() {
-    return chunkSize;
-  }
-
-  @Override
-  public boolean preferNativeBuffer() {
+  public boolean preferDirectBuffer() {
     return false;
   }
 
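For context on the widened static helper above, a hedged sketch of how it is driven from configuration: the factory named under the raw-coder key is instantiated and its two-argument create methods are called (the 6+3 layout is illustrative; the factory class string matches the RSRawErasureCoderFactory changed later in this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonConfigurationKeys;
    import org.apache.hadoop.io.erasurecode.coder.AbstractErasureCoder;
    import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoder;

    public class ConfiguredCoderSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Select the RS raw coder implementation through configuration.
        conf.set(CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY,
            "org.apache.hadoop.io.erasurecode.rawcoder.RSRawErasureCoderFactory");

        // false selects a decoder; the unit counts now flow straight to the
        // factory instead of a later initialize() call.
        RawErasureCoder decoder = AbstractErasureCoder.createRawCoder(conf,
            CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY,
            false, 6, 3);
        System.out.println("created " + decoder.getClass().getSimpleName());
      }
    }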
AbstractErasureDecoder.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.io.erasurecode.coder;
 
 import org.apache.hadoop.io.erasurecode.ECBlock;
 import org.apache.hadoop.io.erasurecode.ECBlockGroup;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 
 /**
  * An abstract erasure decoder that's to be inherited by new decoders.
@@ -27,6 +28,14 @@ import org.apache.hadoop.io.erasurecode.ECBlockGroup;
  */
 public abstract class AbstractErasureDecoder extends AbstractErasureCoder {
 
+  public AbstractErasureDecoder(int numDataUnits, int numParityUnits) {
+    super(numDataUnits, numParityUnits);
+  }
+
+  public AbstractErasureDecoder(ECSchema schema) {
+    super(schema);
+  }
+
   @Override
   public ErasureCodingStep calculateCoding(ECBlockGroup blockGroup) {
     // We may have more than this when considering complicate cases. HADOOP-11550
AbstractErasureEncoder.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.io.erasurecode.coder;
 
 import org.apache.hadoop.io.erasurecode.ECBlock;
 import org.apache.hadoop.io.erasurecode.ECBlockGroup;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 
 /**
  * An abstract erasure encoder that's to be inherited by new encoders.
@@ -27,6 +28,14 @@ import org.apache.hadoop.io.erasurecode.ECBlockGroup;
  */
 public abstract class AbstractErasureEncoder extends AbstractErasureCoder {
 
+  public AbstractErasureEncoder(int numDataUnits, int numParityUnits) {
+    super(numDataUnits, numParityUnits);
+  }
+
+  public AbstractErasureEncoder(ECSchema schema) {
+    super(schema);
+  }
+
   @Override
   public ErasureCodingStep calculateCoding(ECBlockGroup blockGroup) {
     // We may have more than this when considering complicate cases. HADOOP-11550
ErasureCoder.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.io.erasurecode.coder;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.io.erasurecode.ECBlockGroup;
-import org.apache.hadoop.io.erasurecode.ECSchema;
 
 /**
  * An erasure coder to perform encoding or decoding given a group. Generally it
@@ -37,20 +36,6 @@ import org.apache.hadoop.io.erasurecode.ECSchema;
  */
 public interface ErasureCoder extends Configurable {
 
-  /**
-   * Initialize with the important parameters for the code.
-   * @param numDataUnits how many data inputs for the coding
-   * @param numParityUnits how many parity outputs the coding generates
-   * @param chunkSize the size of the input/output buffer
-   */
-  public void initialize(int numDataUnits, int numParityUnits, int chunkSize);
-
-  /**
-   * Initialize with an EC schema.
-   * @param schema
-   */
-  public void initialize(ECSchema schema);
-
   /**
    * The number of data input units for the coding. A unit can be a byte,
    * chunk or buffer or even a block.
@@ -65,12 +50,6 @@ public interface ErasureCoder extends Configurable {
    */
   public int getNumParityUnits();
 
-  /**
-   * Chunk buffer size for the input/output
-   * @return chunk buffer size
-   */
-  public int getChunkSize();
-
   /**
    * Calculate the encoding or decoding steps given a block blockGroup.
    *
@@ -83,13 +62,13 @@ public interface ErasureCoder extends Configurable {
   public ErasureCodingStep calculateCoding(ECBlockGroup blockGroup);
 
   /**
-   * Tell if native or off-heap buffer is preferred or not. It's for callers to
+   * Tell if direct or off-heap buffer is preferred or not. It's for callers to
    * decide how to allocate coding chunk buffers, either on heap or off heap.
    * It will return false by default.
-   * @return true if native buffer is preferred for performance consideration,
+   * @return true if direct buffer is preferred for performance consideration,
    * otherwise false.
    */
-  public boolean preferNativeBuffer();
+  public boolean preferDirectBuffer();
 
   /**
    * Release the resources if any. Good chance to invoke RawErasureCoder#release.
RSErasureDecoder.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.io.erasurecode.coder;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.erasurecode.ECBlock;
 import org.apache.hadoop.io.erasurecode.ECBlockGroup;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
 
@@ -31,6 +32,14 @@ import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
 public class RSErasureDecoder extends AbstractErasureDecoder {
   private RawErasureDecoder rsRawDecoder;
 
+  public RSErasureDecoder(int numDataUnits, int numParityUnits) {
+    super(numDataUnits, numParityUnits);
+  }
+
+  public RSErasureDecoder(ECSchema schema) {
+    super(schema);
+  }
+
   @Override
   protected ErasureCodingStep prepareDecodingStep(final ECBlockGroup blockGroup) {
 
@@ -45,12 +54,11 @@ public class RSErasureDecoder extends AbstractErasureDecoder {
   private RawErasureDecoder checkCreateRSRawDecoder() {
     if (rsRawDecoder == null) {
       rsRawDecoder = createRawDecoder(
-          CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY);
+          CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY,
+          getNumDataUnits(), getNumParityUnits());
       if (rsRawDecoder == null) {
-        rsRawDecoder = new RSRawDecoder();
+        rsRawDecoder = new RSRawDecoder(getNumDataUnits(), getNumParityUnits());
       }
-      rsRawDecoder.initialize(getNumDataUnits(),
-          getNumParityUnits(), getChunkSize());
     }
     return rsRawDecoder;
   }
RSErasureEncoder.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.io.erasurecode.coder;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.erasurecode.ECBlock;
 import org.apache.hadoop.io.erasurecode.ECBlockGroup;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
 
@@ -31,6 +32,14 @@ import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
 public class RSErasureEncoder extends AbstractErasureEncoder {
   private RawErasureEncoder rawEncoder;
 
+  public RSErasureEncoder(int numDataUnits, int numParityUnits) {
+    super(numDataUnits, numParityUnits);
+  }
+
+  public RSErasureEncoder(ECSchema schema) {
+    super(schema);
+  }
+
   @Override
   protected ErasureCodingStep prepareEncodingStep(final ECBlockGroup blockGroup) {
 
@@ -45,12 +54,11 @@ public class RSErasureEncoder extends AbstractErasureEncoder {
   private RawErasureEncoder checkCreateRSRawEncoder() {
     if (rawEncoder == null) {
      rawEncoder = createRawEncoder(
-          CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY);
+          CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY,
+          getNumDataUnits(), getNumParityUnits());
       if (rawEncoder == null) {
-        rawEncoder = new RSRawEncoder();
+        rawEncoder = new RSRawEncoder(getNumDataUnits(), getNumParityUnits());
       }
-      rawEncoder.initialize(getNumDataUnits(),
-          getNumParityUnits(), getChunkSize());
     }
     return rawEncoder;
   }
XORErasureDecoder.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.io.erasurecode.coder;
 
 import org.apache.hadoop.io.erasurecode.ECBlock;
 import org.apache.hadoop.io.erasurecode.ECBlockGroup;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
 import org.apache.hadoop.io.erasurecode.rawcoder.XORRawDecoder;
 
@@ -29,12 +30,19 @@ import org.apache.hadoop.io.erasurecode.rawcoder.XORRawDecoder;
  */
 public class XORErasureDecoder extends AbstractErasureDecoder {
 
+  public XORErasureDecoder(int numDataUnits, int numParityUnits) {
+    super(numDataUnits, numParityUnits);
+  }
+
+  public XORErasureDecoder(ECSchema schema) {
+    super(schema);
+  }
+
   @Override
   protected ErasureCodingStep prepareDecodingStep(final ECBlockGroup blockGroup) {
     // May be configured
-    RawErasureDecoder rawDecoder = new XORRawDecoder();
-    rawDecoder.initialize(getNumDataUnits(),
-        getNumParityUnits(), getChunkSize());
+    RawErasureDecoder rawDecoder = new XORRawDecoder(
+        getNumDataUnits(), getNumParityUnits());
 
     ECBlock[] inputBlocks = getInputBlocks(blockGroup);
 
XORErasureEncoder.java
@@ -19,22 +19,30 @@ package org.apache.hadoop.io.erasurecode.coder;
 
 import org.apache.hadoop.io.erasurecode.ECBlock;
 import org.apache.hadoop.io.erasurecode.ECBlockGroup;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
 import org.apache.hadoop.io.erasurecode.rawcoder.XORRawEncoder;
 
 /**
  * Xor erasure encoder that encodes a block group.
  *
- * It implements {@link ErasureEncoder}.
+ * It implements {@link ErasureCoder}.
  */
 public class XORErasureEncoder extends AbstractErasureEncoder {
 
+  public XORErasureEncoder(int numDataUnits, int numParityUnits) {
+    super(numDataUnits, numParityUnits);
+  }
+
+  public XORErasureEncoder(ECSchema schema) {
+    super(schema);
+  }
+
   @Override
   protected ErasureCodingStep prepareEncodingStep(final ECBlockGroup blockGroup) {
     // May be configured
-    RawErasureEncoder rawEncoder = new XORRawEncoder();
-    rawEncoder.initialize(getNumDataUnits(),
-        getNumParityUnits(), getChunkSize());
+    RawErasureEncoder rawEncoder = new XORRawEncoder(
+        getNumDataUnits(), getNumParityUnits());
 
     ECBlock[] inputBlocks = getInputBlocks(blockGroup);
 
AbstractRawErasureCoder.java
@@ -31,16 +31,12 @@ import java.util.Arrays;
 public abstract class AbstractRawErasureCoder
     extends Configured implements RawErasureCoder {
 
-  private int numDataUnits;
-  private int numParityUnits;
-  private int chunkSize;
+  private final int numDataUnits;
+  private final int numParityUnits;
 
-  @Override
-  public void initialize(int numDataUnits, int numParityUnits,
-                         int chunkSize) {
+  public AbstractRawErasureCoder(int numDataUnits, int numParityUnits) {
     this.numDataUnits = numDataUnits;
     this.numParityUnits = numParityUnits;
-    this.chunkSize = chunkSize;
   }
 
   @Override
@@ -53,11 +49,6 @@ public abstract class AbstractRawErasureCoder
     return numParityUnits;
   }
 
-  @Override
-  public int getChunkSize() {
-    return chunkSize;
-  }
-
   @Override
   public boolean preferDirectBuffer() {
     return false;
AbstractRawErasureDecoder.java
@@ -30,6 +30,10 @@ import java.nio.ByteBuffer;
 public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
     implements RawErasureDecoder {
 
+  public AbstractRawErasureDecoder(int numDataUnits, int numParityUnits) {
+    super(numDataUnits, numParityUnits);
+  }
+
   @Override
   public void decode(ByteBuffer[] inputs, int[] erasedIndexes,
                      ByteBuffer[] outputs) {
AbstractRawErasureEncoder.java
@@ -30,6 +30,10 @@ import java.nio.ByteBuffer;
 public abstract class AbstractRawErasureEncoder extends AbstractRawErasureCoder
     implements RawErasureEncoder {
 
+  public AbstractRawErasureEncoder(int numDataUnits, int numParityUnits) {
+    super(numDataUnits, numParityUnits);
+  }
+
   @Override
   public void encode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
     checkParameters(inputs, outputs);
RSRawDecoder.java
@@ -31,9 +31,8 @@ public class RSRawDecoder extends AbstractRawErasureDecoder {
   private int[] errSignature;
   private int[] primitivePower;
 
-  @Override
-  public void initialize(int numDataUnits, int numParityUnits, int chunkSize) {
-    super.initialize(numDataUnits, numParityUnits, chunkSize);
+  public RSRawDecoder(int numDataUnits, int numParityUnits) {
+    super(numDataUnits, numParityUnits);
     assert (getNumDataUnits() + getNumParityUnits() < RSUtil.GF.getFieldSize());
 
     this.errSignature = new int[numParityUnits];
RSRawEncoder.java
@@ -29,9 +29,9 @@ import java.nio.ByteBuffer;
 public class RSRawEncoder extends AbstractRawErasureEncoder {
   private int[] generatingPolynomial;
 
-  @Override
-  public void initialize(int numDataUnits, int numParityUnits, int chunkSize) {
-    super.initialize(numDataUnits, numParityUnits, chunkSize);
+  public RSRawEncoder(int numDataUnits, int numParityUnits) {
+    super(numDataUnits, numParityUnits);
+
     assert (getNumDataUnits() + getNumParityUnits() < RSUtil.GF.getFieldSize());
 
     int[] primitivePower = RSUtil.getPrimitivePower(numDataUnits,
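One behavioral consequence of moving setup into the constructors: the Reed-Solomon coders now run their assert (getNumDataUnits() + getNumParityUnits() < RSUtil.GF.getFieldSize()) at construction time, so an impossible layout fails as soon as the coder is built rather than at a later initialize() call. A small sketch (assuming the default GF(2^8) field, i.e. a field size of 256, and JVM assertions enabled with -ea):

    import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;

    public class RsLimitSketch {
      public static void main(String[] args) {
        // Fine: 6 + 3 = 9 total units, well below the GF(2^8) limit.
        RSRawEncoder ok = new RSRawEncoder(6, 3);
        System.out.println("built " + ok.getNumDataUnits() + "+"
            + ok.getNumParityUnits());

        // Would trip the constructor assert under -ea: 250 + 10 is not < 256.
        // RSRawEncoder tooBig = new RSRawEncoder(250, 10);
      }
    }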
RSRawErasureCoderFactory.java
@@ -23,12 +23,12 @@ package org.apache.hadoop.io.erasurecode.rawcoder;
 public class RSRawErasureCoderFactory implements RawErasureCoderFactory {
 
   @Override
-  public RawErasureEncoder createEncoder() {
-    return new RSRawEncoder();
+  public RawErasureEncoder createEncoder(int numDataUnits, int numParityUnits) {
+    return new RSRawEncoder(numDataUnits, numParityUnits);
   }
 
   @Override
-  public RawErasureDecoder createDecoder() {
-    return new RSRawDecoder();
+  public RawErasureDecoder createDecoder(int numDataUnits, int numParityUnits) {
+    return new RSRawDecoder(numDataUnits, numParityUnits);
   }
 }
RawErasureCoder.java
@@ -35,14 +35,6 @@ import org.apache.hadoop.conf.Configurable;
  */
 public interface RawErasureCoder extends Configurable {
 
-  /**
-   * Initialize with the important parameters for the code.
-   * @param numDataUnits how many data inputs for the coding
-   * @param numParityUnits how many parity outputs the coding generates
-   * @param chunkSize the size of the input/output buffer
-   */
-  public void initialize(int numDataUnits, int numParityUnits, int chunkSize);
-
   /**
    * The number of data input units for the coding. A unit can be a byte,
    * chunk or buffer or even a block.
@@ -57,12 +49,6 @@ public interface RawErasureCoder extends Configurable {
    */
   public int getNumParityUnits();
 
-  /**
-   * Chunk buffer size for the input/output
-   * @return chunk buffer size
-   */
-  public int getChunkSize();
-
   /**
    * Tell if direct buffer is preferred or not. It's for callers to
    * decide how to allocate coding chunk buffers, using DirectByteBuffer or
RawErasureCoderFactory.java
@@ -26,13 +26,17 @@ public interface RawErasureCoderFactory {
 
   /**
    * Create raw erasure encoder.
+   * @param numDataUnits
+   * @param numParityUnits
    * @return raw erasure encoder
    */
-  public RawErasureEncoder createEncoder();
+  public RawErasureEncoder createEncoder(int numDataUnits, int numParityUnits);
 
   /**
    * Create raw erasure decoder.
+   * @param numDataUnits
+   * @param numParityUnits
    * @return raw erasure decoder
    */
-  public RawErasureDecoder createDecoder();
+  public RawErasureDecoder createDecoder(int numDataUnits, int numParityUnits);
 }
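Third-party raw coder implementations plug in through this interface, so they must now accept the unit counts in the factory methods themselves. A minimal sketch of a conforming factory (the class name is hypothetical; it simply reuses the built-in XOR coders):

    import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoderFactory;
    import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
    import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
    import org.apache.hadoop.io.erasurecode.rawcoder.XORRawDecoder;
    import org.apache.hadoop.io.erasurecode.rawcoder.XORRawEncoder;

    // Hypothetical plug-in factory: returns fully parameterized coders,
    // matching the revised two-argument factory methods.
    public class MyXorRawErasureCoderFactory implements RawErasureCoderFactory {

      @Override
      public RawErasureEncoder createEncoder(int numDataUnits, int numParityUnits) {
        return new XORRawEncoder(numDataUnits, numParityUnits);
      }

      @Override
      public RawErasureDecoder createDecoder(int numDataUnits, int numParityUnits) {
        return new XORRawDecoder(numDataUnits, numParityUnits);
      }
    }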
XORRawDecoder.java
@@ -28,6 +28,10 @@ import java.nio.ByteBuffer;
  */
 public class XORRawDecoder extends AbstractRawErasureDecoder {
 
+  public XORRawDecoder(int numDataUnits, int numParityUnits) {
+    super(numDataUnits, numParityUnits);
+  }
+
   @Override
   protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes,
                           ByteBuffer[] outputs) {
XORRawEncoder.java
@@ -28,6 +28,10 @@ import java.nio.ByteBuffer;
  */
 public class XORRawEncoder extends AbstractRawErasureEncoder {
 
+  public XORRawEncoder(int numDataUnits, int numParityUnits) {
+    super(numDataUnits, numParityUnits);
+  }
+
   protected void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
     ByteBuffer output = outputs[0];
     resetOutputBuffer(output);
XORRawErasureCoderFactory.java
@@ -23,12 +23,12 @@ package org.apache.hadoop.io.erasurecode.rawcoder;
 public class XORRawErasureCoderFactory implements RawErasureCoderFactory {
 
   @Override
-  public RawErasureEncoder createEncoder() {
-    return new XORRawEncoder();
+  public RawErasureEncoder createEncoder(int numDataUnits, int numParityUnits) {
+    return new XORRawEncoder(numDataUnits, numParityUnits);
   }
 
   @Override
-  public RawErasureDecoder createDecoder() {
-    return new XORRawDecoder();
+  public RawErasureDecoder createDecoder(int numDataUnits, int numParityUnits) {
+    return new XORRawDecoder(numDataUnits, numParityUnits);
   }
 }
TestErasureCoderBase.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.io.erasurecode.ECBlockGroup;
 import org.apache.hadoop.io.erasurecode.ECChunk;
 import org.apache.hadoop.io.erasurecode.TestCoderBase;
 
+import java.lang.reflect.Constructor;
+
 /**
  * Erasure coder test base with utilities.
  */
@@ -139,23 +141,6 @@ public abstract class TestErasureCoderBase extends TestCoderBase {
     }
   }
 
-  /**
-   * Create erasure encoder for test.
-   * @return
-   */
-  private ErasureCoder createEncoder() {
-    ErasureCoder encoder;
-    try {
-      encoder = encoderClass.newInstance();
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to create encoder", e);
-    }
-
-    encoder.initialize(numDataUnits, numParityUnits, getChunkSize());
-    encoder.setConf(getConf());
-    return encoder;
-  }
-
   private void prepareCoders() {
     if (encoder == null) {
       encoder = createEncoder();
@@ -167,18 +152,39 @@ public abstract class TestErasureCoderBase extends TestCoderBase {
   }
 
   /**
-   * Create the erasure decoder for the test.
+   * Create the raw erasure encoder to test
+   * @return
+   */
+  protected ErasureCoder createEncoder() {
+    ErasureCoder encoder;
+    try {
+      Constructor<? extends ErasureCoder> constructor =
+          (Constructor<? extends ErasureCoder>)
+              encoderClass.getConstructor(int.class, int.class);
+      encoder = constructor.newInstance(numDataUnits, numParityUnits);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create encoder", e);
+    }
+
+    encoder.setConf(getConf());
+    return encoder;
+  }
+
+  /**
+   * create the raw erasure decoder to test
    * @return
    */
-  private ErasureCoder createDecoder() {
+  protected ErasureCoder createDecoder() {
     ErasureCoder decoder;
     try {
-      decoder = decoderClass.newInstance();
+      Constructor<? extends ErasureCoder> constructor =
+          (Constructor<? extends ErasureCoder>)
+              decoderClass.getConstructor(int.class, int.class);
+      decoder = constructor.newInstance(numDataUnits, numParityUnits);
     } catch (Exception e) {
       throw new RuntimeException("Failed to create decoder", e);
     }
 
-    decoder.initialize(numDataUnits, numParityUnits, getChunkSize());
     decoder.setConf(getConf());
     return decoder;
   }
TestRawCoderBase.java
@@ -21,6 +21,8 @@ import org.apache.hadoop.io.erasurecode.ECChunk;
 import org.apache.hadoop.io.erasurecode.TestCoderBase;
 import org.junit.Assert;
 
+import java.lang.reflect.Constructor;
+
 /**
  * Raw coder test base with utilities.
  */
@@ -136,12 +138,14 @@ public abstract class TestRawCoderBase extends TestCoderBase {
   protected RawErasureEncoder createEncoder() {
     RawErasureEncoder encoder;
     try {
-      encoder = encoderClass.newInstance();
+      Constructor<? extends RawErasureEncoder> constructor =
+          (Constructor<? extends RawErasureEncoder>)
+              encoderClass.getConstructor(int.class, int.class);
+      encoder = constructor.newInstance(numDataUnits, numParityUnits);
     } catch (Exception e) {
       throw new RuntimeException("Failed to create encoder", e);
     }
 
-    encoder.initialize(numDataUnits, numParityUnits, getChunkSize());
     encoder.setConf(getConf());
     return encoder;
   }
@@ -153,14 +157,15 @@ public abstract class TestRawCoderBase extends TestCoderBase {
   protected RawErasureDecoder createDecoder() {
     RawErasureDecoder decoder;
     try {
-      decoder = decoderClass.newInstance();
+      Constructor<? extends RawErasureDecoder> constructor =
+          (Constructor<? extends RawErasureDecoder>)
+              decoderClass.getConstructor(int.class, int.class);
+      decoder = constructor.newInstance(numDataUnits, numParityUnits);
     } catch (Exception e) {
       throw new RuntimeException("Failed to create decoder", e);
     }
 
-    decoder.initialize(numDataUnits, numParityUnits, getChunkSize());
     decoder.setConf(getConf());
     return decoder;
   }
 
 }
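With the no-arg constructors gone, Class.newInstance() can no longer build a coder, which is why both test bases switch to looking up the (int, int) constructor reflectively. The same pattern in isolation (a sketch; RSRawEncoder stands in for any coder class under test):

    import java.lang.reflect.Constructor;
    import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
    import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;

    public class ReflectiveCoderSketch {
      public static void main(String[] args) throws Exception {
        Class<? extends RawErasureEncoder> coderClass = RSRawEncoder.class;

        // Look up the (numDataUnits, numParityUnits) constructor and invoke
        // it, mirroring what TestRawCoderBase#createEncoder now does.
        Constructor<? extends RawErasureEncoder> constructor =
            coderClass.getConstructor(int.class, int.class);
        RawErasureEncoder encoder = constructor.newInstance(6, 3);
        System.out.println("created " + encoder.getClass().getSimpleName());
      }
    }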
CHANGES-HDFS-EC-7285.txt
@@ -255,3 +255,5 @@
     (waltersu4549)
 
     HDFS-7768. Change fsck to support EC files. (Takanobu Asanuma via szetszwo)
+
+    HDFS-8382. Remove chunkSize and initialize from erasure coder. (Kai Zheng)
DFSStripedOutputStream.java
@@ -247,8 +247,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     numDataBlocks = schema.getNumDataUnits();
     numAllBlocks = numDataBlocks + numParityBlocks;
 
-    encoder = new RSRawEncoder();
-    encoder.initialize(numDataBlocks, numParityBlocks, cellSize);
+    encoder = new RSRawEncoder(numDataBlocks, numParityBlocks);
 
     coordinator = new Coordinator(dfsClient.getConf(), numDataBlocks, numAllBlocks);
     try {
ErasureCodingWorker.java
@@ -111,12 +111,12 @@ public final class ErasureCodingWorker {
         DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT);
   }
 
-  private RawErasureEncoder newEncoder() {
-    return new RSRawEncoder();
+  private RawErasureEncoder newEncoder(int numDataUnits, int numParityUnits) {
+    return new RSRawEncoder(numDataUnits, numParityUnits);
   }
 
-  private RawErasureDecoder newDecoder() {
-    return new RSRawDecoder();
+  private RawErasureDecoder newDecoder(int numDataUnits, int numParityUnits) {
+    return new RSRawDecoder(numDataUnits, numParityUnits);
   }
 
   private void initializeStripedReadThreadPool(int num) {
@@ -517,16 +517,14 @@ public final class ErasureCodingWorker {
   // Initialize encoder
   private void initEncoderIfNecessary() {
     if (encoder == null) {
-      encoder = newEncoder();
-      encoder.initialize(dataBlkNum, parityBlkNum, bufferSize);
+      encoder = newEncoder(dataBlkNum, parityBlkNum);
     }
   }
 
   // Initialize decoder
   private void initDecoderIfNecessary() {
     if (decoder == null) {
-      decoder = newDecoder();
-      decoder.initialize(dataBlkNum, parityBlkNum, bufferSize);
+      decoder = newDecoder(dataBlkNum, parityBlkNum);
     }
   }
 
StripedBlockUtil.java
@@ -287,8 +287,7 @@ public class StripedBlockUtil {
     }
 
     byte[][] outputs = new byte[parityBlkNum][(int) alignedStripe.getSpanInBlock()];
-    RSRawDecoder rsRawDecoder = new RSRawDecoder();
-    rsRawDecoder.initialize(dataBlkNum, parityBlkNum, (int) alignedStripe.getSpanInBlock());
+    RSRawDecoder rsRawDecoder = new RSRawDecoder(dataBlkNum, parityBlkNum);
     rsRawDecoder.decode(decodeInputs, decodeIndices, outputs);
 
     for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
TestDFSStripedOutputStream.java
@@ -274,8 +274,8 @@ public class TestDFSStripedOutputStream {
         System.arraycopy(tmp, 0, dataBytes[i], 0, tmp.length);
       }
     }
-    final RawErasureEncoder encoder = new RSRawEncoder();
-    encoder.initialize(dataBytes.length, parityBytes.length, cellSize);
+    final RawErasureEncoder encoder =
+        new RSRawEncoder(dataBytes.length, parityBytes.length);
     encoder.encode(dataBytes, expectedParityBytes);
     for (int i = 0; i < parityBytes.length; i++) {
       if (i != killedDnIndex) {
TestWriteReadStripedFile.java
@@ -382,8 +382,7 @@ public class TestWriteReadStripedFile {
     Assert.assertEquals("The length of file should be the same to write size",
         length - startOffsetInFile, readLen);
 
-    RSRawDecoder rsRawDecoder = new RSRawDecoder();
-    rsRawDecoder.initialize(dataBlocks, parityBlocks, 1);
+    RSRawDecoder rsRawDecoder = new RSRawDecoder(dataBlocks, parityBlocks);
     byte[] expected = new byte[readLen];
     for (int i = startOffsetInFile; i < length; i++) {
       //TODO: workaround (filling fixed bytes), to remove after HADOOP-11938