HADOOP-11647. Reed-Solomon ErasureCoder. Contributed by Kai Zheng

This commit is contained in:
Kai Zheng 2015-03-20 19:15:52 +08:00 committed by Zhe Zhang
parent 90d332d6be
commit df297245a7
10 changed files with 315 additions and 6 deletions

View File

@ -26,3 +26,6 @@
HADOOP-11707. Add factory to create raw erasure coder. Contributed by Kai Zheng HADOOP-11707. Add factory to create raw erasure coder. Contributed by Kai Zheng
( Kai Zheng ) ( Kai Zheng )
HADOOP-11647. Reed-Solomon ErasureCoder. Contributed by Kai Zheng
( Kai Zheng )

View File

@ -136,6 +136,21 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
public static final boolean IO_COMPRESSION_CODEC_LZ4_USELZ4HC_DEFAULT = public static final boolean IO_COMPRESSION_CODEC_LZ4_USELZ4HC_DEFAULT =
false; false;
/**
* Erasure Coding configuration family
*/
/** Supported erasure codec classes */
public static final String IO_ERASURECODE_CODECS_KEY = "io.erasurecode.codecs";
/** Use XOR raw coder when possible for the RS codec */
public static final String IO_ERASURECODE_CODEC_RS_USEXOR_KEY =
"io.erasurecode.codec.rs.usexor";
/** Raw coder factory for the RS codec */
public static final String IO_ERASURECODE_CODEC_RS_RAWCODER_KEY =
"io.erasurecode.codec.rs.rawcoder";
/** /**
* Service Authorization * Service Authorization
*/ */

View File

@ -17,7 +17,12 @@
*/ */
package org.apache.hadoop.io.erasurecode.coder; package org.apache.hadoop.io.erasurecode.coder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoderFactory;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
/** /**
* A common class of basic facilities to be shared by encoder and decoder * A common class of basic facilities to be shared by encoder and decoder
@ -31,6 +36,66 @@ public abstract class AbstractErasureCoder
private int numParityUnits; private int numParityUnits;
private int chunkSize; private int chunkSize;
/**
* Create raw decoder using the factory specified by rawCoderFactoryKey
* @param rawCoderFactoryKey
* @return raw decoder
*/
protected RawErasureDecoder createRawDecoder(String rawCoderFactoryKey) {
RawErasureCoder rawCoder = createRawCoder(getConf(),
rawCoderFactoryKey, false);
return (RawErasureDecoder) rawCoder;
}
/**
* Create raw encoder using the factory specified by rawCoderFactoryKey
* @param rawCoderFactoryKey
* @return raw encoder
*/
protected RawErasureEncoder createRawEncoder(String rawCoderFactoryKey) {
RawErasureCoder rawCoder = createRawCoder(getConf(),
rawCoderFactoryKey, true);
return (RawErasureEncoder) rawCoder;
}
/**
* Create raw coder using specified conf and raw coder factory key.
* @param conf
* @param rawCoderFactoryKey
* @param isEncoder
* @return raw coder
*/
protected static RawErasureCoder createRawCoder(Configuration conf,
String rawCoderFactoryKey, boolean isEncoder) {
if (conf == null) {
return null;
}
Class<? extends RawErasureCoderFactory> factClass = null;
factClass = conf.getClass(rawCoderFactoryKey,
factClass, RawErasureCoderFactory.class);
if (factClass == null) {
return null;
}
RawErasureCoderFactory fact;
try {
fact = factClass.newInstance();
} catch (InstantiationException e) {
throw new RuntimeException("Failed to create raw coder", e);
} catch (IllegalAccessException e) {
throw new RuntimeException("Failed to create raw coder", e);
}
if (fact != null) {
return isEncoder ? fact.createEncoder() : fact.createDecoder();
}
return null;
}
@Override @Override
public void initialize(int numDataUnits, int numParityUnits, public void initialize(int numDataUnits, int numParityUnits,
int chunkSize) { int chunkSize) {

View File

@ -30,7 +30,8 @@ public abstract class AbstractErasureDecoder extends AbstractErasureCoder
@Override @Override
public ErasureCodingStep decode(ECBlockGroup blockGroup) { public ErasureCodingStep decode(ECBlockGroup blockGroup) {
return performDecoding(blockGroup); // We may have more than this when considering complicate cases. HADOOP-11550
return prepareDecodingStep(blockGroup);
} }
/** /**
@ -38,7 +39,8 @@ public abstract class AbstractErasureDecoder extends AbstractErasureCoder
* @param blockGroup * @param blockGroup
* @return decoding step for caller to do the real work * @return decoding step for caller to do the real work
*/ */
protected abstract ErasureCodingStep performDecoding(ECBlockGroup blockGroup); protected abstract ErasureCodingStep prepareDecodingStep(
ECBlockGroup blockGroup);
/** /**
* We have all the data blocks and parity blocks as input blocks for * We have all the data blocks and parity blocks as input blocks for

View File

@ -30,7 +30,8 @@ public abstract class AbstractErasureEncoder extends AbstractErasureCoder
@Override @Override
public ErasureCodingStep encode(ECBlockGroup blockGroup) { public ErasureCodingStep encode(ECBlockGroup blockGroup) {
return performEncoding(blockGroup); // We may have more than this when considering complicate cases. HADOOP-11550
return prepareEncodingStep(blockGroup);
} }
/** /**
@ -38,7 +39,8 @@ public abstract class AbstractErasureEncoder extends AbstractErasureCoder
* @param blockGroup * @param blockGroup
* @return encoding step for caller to do the real work * @return encoding step for caller to do the real work
*/ */
protected abstract ErasureCodingStep performEncoding(ECBlockGroup blockGroup); protected abstract ErasureCodingStep prepareEncodingStep(
ECBlockGroup blockGroup);
protected ECBlock[] getInputBlocks(ECBlockGroup blockGroup) { protected ECBlock[] getInputBlocks(ECBlockGroup blockGroup) {
return blockGroup.getDataBlocks(); return blockGroup.getDataBlocks();

View File

@ -0,0 +1,83 @@
package org.apache.hadoop.io.erasurecode.coder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.erasurecode.ECBlock;
import org.apache.hadoop.io.erasurecode.ECBlockGroup;
import org.apache.hadoop.io.erasurecode.rawcoder.JRSRawDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.XorRawDecoder;
/**
* Reed-Solomon erasure decoder that decodes a block group.
*
* It implements {@link ErasureDecoder}.
*/
public class RSErasureDecoder extends AbstractErasureDecoder {
private RawErasureDecoder rsRawDecoder;
private RawErasureDecoder xorRawDecoder;
private boolean useXorWhenPossible = true;
@Override
public void setConf(Configuration conf) {
super.setConf(conf);
if (conf != null) {
this.useXorWhenPossible = conf.getBoolean(
CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_USEXOR_KEY, true);
}
}
@Override
protected ErasureCodingStep prepareDecodingStep(final ECBlockGroup blockGroup) {
RawErasureDecoder rawDecoder;
ECBlock[] inputBlocks = getInputBlocks(blockGroup);
ECBlock[] outputBlocks = getOutputBlocks(blockGroup);
/**
* Optimization: according to some benchmark, when only one block is erased
* and to be recovering, the most simple XOR scheme can be much efficient.
* We will have benchmark tests to verify this opt is effect or not.
*/
if (outputBlocks.length == 1 && useXorWhenPossible) {
rawDecoder = checkCreateXorRawDecoder();
} else {
rawDecoder = checkCreateRSRawDecoder();
}
return new ErasureDecodingStep(inputBlocks,
getErasedIndexes(inputBlocks), outputBlocks, rawDecoder);
}
private RawErasureDecoder checkCreateRSRawDecoder() {
if (rsRawDecoder == null) {
rsRawDecoder = createRawDecoder(
CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY);
if (rsRawDecoder == null) {
rsRawDecoder = new JRSRawDecoder();
}
rsRawDecoder.initialize(getNumDataUnits(),
getNumParityUnits(), getChunkSize());
}
return rsRawDecoder;
}
private RawErasureDecoder checkCreateXorRawDecoder() {
if (xorRawDecoder == null) {
xorRawDecoder = new XorRawDecoder();
xorRawDecoder.initialize(getNumDataUnits(), 1, getChunkSize());
}
return xorRawDecoder;
}
@Override
public void release() {
if (xorRawDecoder != null) {
xorRawDecoder.release();
} else if (rsRawDecoder != null) {
rsRawDecoder.release();
}
}
}

View File

@ -0,0 +1,47 @@
package org.apache.hadoop.io.erasurecode.coder;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.erasurecode.ECBlock;
import org.apache.hadoop.io.erasurecode.ECBlockGroup;
import org.apache.hadoop.io.erasurecode.rawcoder.JRSRawEncoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
/**
* Reed-Solomon erasure encoder that encodes a block group.
*
* It implements {@link ErasureEncoder}.
*/
public class RSErasureEncoder extends AbstractErasureEncoder {
private RawErasureEncoder rawEncoder;
@Override
protected ErasureCodingStep prepareEncodingStep(final ECBlockGroup blockGroup) {
RawErasureEncoder rawEncoder = checkCreateRSRawEncoder();
ECBlock[] inputBlocks = getInputBlocks(blockGroup);
return new ErasureEncodingStep(inputBlocks,
getOutputBlocks(blockGroup), rawEncoder);
}
private RawErasureEncoder checkCreateRSRawEncoder() {
if (rawEncoder == null) {
rawEncoder = createRawEncoder(
CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY);
if (rawEncoder == null) {
rawEncoder = new JRSRawEncoder();
}
rawEncoder.initialize(getNumDataUnits(),
getNumParityUnits(), getChunkSize());
}
return rawEncoder;
}
@Override
public void release() {
if (rawEncoder != null) {
rawEncoder.release();
}
}
}

View File

@ -30,7 +30,7 @@ import org.apache.hadoop.io.erasurecode.rawcoder.XorRawDecoder;
public class XorErasureDecoder extends AbstractErasureDecoder { public class XorErasureDecoder extends AbstractErasureDecoder {
@Override @Override
protected ErasureCodingStep performDecoding(final ECBlockGroup blockGroup) { protected ErasureCodingStep prepareDecodingStep(final ECBlockGroup blockGroup) {
// May be configured // May be configured
RawErasureDecoder rawDecoder = new XorRawDecoder(); RawErasureDecoder rawDecoder = new XorRawDecoder();
rawDecoder.initialize(getNumDataUnits(), rawDecoder.initialize(getNumDataUnits(),

View File

@ -30,7 +30,7 @@ import org.apache.hadoop.io.erasurecode.rawcoder.XorRawEncoder;
public class XorErasureEncoder extends AbstractErasureEncoder { public class XorErasureEncoder extends AbstractErasureEncoder {
@Override @Override
protected ErasureCodingStep performEncoding(final ECBlockGroup blockGroup) { protected ErasureCodingStep prepareEncodingStep(final ECBlockGroup blockGroup) {
// May be configured // May be configured
RawErasureEncoder rawEncoder = new XorRawEncoder(); RawErasureEncoder rawEncoder = new XorRawEncoder();
rawEncoder.initialize(getNumDataUnits(), rawEncoder.initialize(getNumDataUnits(),

View File

@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode.coder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.erasurecode.rawcoder.JRSRawErasureCoderFactory;
import org.junit.Before;
import org.junit.Test;
/**
* Test Reed-Solomon encoding and decoding.
*/
public class TestRSErasureCoder extends TestErasureCoderBase {
@Before
public void setup() {
this.encoderClass = RSErasureEncoder.class;
this.decoderClass = RSErasureDecoder.class;
this.numDataUnits = 10;
this.numParityUnits = 1;
this.numChunksInBlock = 10;
}
@Test
public void testCodingNoDirectBuffer_10x4() {
prepare(null, 10, 4, null);
testCoding(false);
}
@Test
public void testCodingDirectBuffer_10x4() {
prepare(null, 10, 4, null);
testCoding(true);
}
@Test
public void testCodingDirectBufferWithConf_10x4() {
/**
* This tests if the two configuration items work or not.
*/
Configuration conf = new Configuration();
conf.set(CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY,
JRSRawErasureCoderFactory.class.getCanonicalName());
conf.setBoolean(
CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_USEXOR_KEY, false);
prepare(conf, 10, 4, null);
testCoding(true);
}
@Test
public void testCodingDirectBuffer_10x4_erasure_of_2_4() {
prepare(null, 10, 4, new int[] {2, 4});
testCoding(true);
}
@Test
public void testCodingDirectBuffer_10x4_erasing_all() {
prepare(null, 10, 4, new int[] {0, 1, 2, 3});
testCoding(true);
}
@Test
public void testCodingNoDirectBuffer_3x3() {
prepare(null, 3, 3, null);
testCoding(false);
}
@Test
public void testCodingDirectBuffer_3x3() {
prepare(null, 3, 3, null);
testCoding(true);
}
}