HADOOP-13200. Implement customizable and configurable erasure coders. Contributed by Tim Yao.

This commit is contained in:
Wei-Chiu Chuang 2017-04-27 12:39:47 -07:00
parent ddaeb3e497
commit bbf8cac14d
22 changed files with 502 additions and 76 deletions

View File

@ -0,0 +1,176 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
import org.apache.hadoop.io.erasurecode.rawcoder.NativeXORRawErasureCoderFactory;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.stream.Collectors;
/**
* This class registers all coder implementations.
*
* {@link CodecRegistry} maps codec names to coder factories. All coder
* factories are dynamically identified and loaded using ServiceLoader.
*/
@InterfaceAudience.Private
public final class CodecRegistry {
private static final Logger LOG =
LoggerFactory.getLogger(CodecRegistry.class);
private static CodecRegistry instance = new CodecRegistry();
public static CodecRegistry getInstance() {
return instance;
}
private Map<String, List<RawErasureCoderFactory>> coderMap;
private Map<String, String[]> coderNameMap;
private CodecRegistry() {
coderMap = new HashMap<>();
coderNameMap = new HashMap<>();
final ServiceLoader<RawErasureCoderFactory> coderFactories =
ServiceLoader.load(RawErasureCoderFactory.class);
updateCoders(coderFactories);
}
/**
* Update coderMap and coderNameMap with iterable type of coder factories.
* @param coderFactories
*/
@VisibleForTesting
void updateCoders(Iterable<RawErasureCoderFactory> coderFactories) {
for (RawErasureCoderFactory coderFactory : coderFactories) {
String codecName = coderFactory.getCodecName();
List<RawErasureCoderFactory> coders = coderMap.get(codecName);
if (coders == null) {
coders = new ArrayList<>();
coders.add(coderFactory);
coderMap.put(codecName, coders);
LOG.debug("Codec registered: codec = {}, coder = {}",
coderFactory.getCodecName(), coderFactory.getCoderName());
} else {
Boolean hasConflit = false;
for (RawErasureCoderFactory coder : coders) {
if (coder.getCoderName().equals(coderFactory.getCoderName())) {
hasConflit = true;
LOG.error("Coder {} cannot be registered because its coder name " +
"{} has conflict with {}", coderFactory.getClass().getName(),
coderFactory.getCoderName(), coder.getClass().getName());
break;
}
}
if (!hasConflit) {
// set native coders as default if user does not
// specify a fallback order
if (coderFactory instanceof NativeRSRawErasureCoderFactory ||
coderFactory instanceof NativeXORRawErasureCoderFactory) {
coders.add(0, coderFactory);
} else {
coders.add(coderFactory);
}
LOG.debug("Codec registered: codec = {}, coder = {}",
coderFactory.getCodecName(), coderFactory.getCoderName());
}
}
}
// update coderNameMap accordingly
coderNameMap.clear();
for (Map.Entry<String, List<RawErasureCoderFactory>> entry :
coderMap.entrySet()) {
String codecName = entry.getKey();
List<RawErasureCoderFactory> coders = entry.getValue();
coderNameMap.put(codecName, coders.stream().
map(RawErasureCoderFactory::getCoderName).
collect(Collectors.toList()).toArray(new String[0]));
}
}
/**
* Get all coder names of the given codec.
* @param codecName the name of codec
* @return an array of all coder names
*/
public String[] getCoderNames(String codecName) {
String[] coderNames = coderNameMap.get(codecName);
if (coderNames == null) {
throw new IllegalArgumentException("No available raw coder factory for "
+ codecName);
}
return coderNames;
}
/**
* Get all coder factories of the given codec.
* @param codecName the name of codec
* @return a list of all coder factories
*/
public List<RawErasureCoderFactory> getCoders(String codecName) {
List<RawErasureCoderFactory> coders = coderMap.get(codecName);
if (coders == null) {
throw new IllegalArgumentException("No available raw coder factory for "
+ codecName);
}
return coders;
}
/**
* Get all codec names.
* @return a set of all codec names
*/
public Set<String> getCodecNames() {
return coderMap.keySet();
}
/**
* Get a specific coder factory defined by codec name and coder name.
* @param codecName name of the codec
* @param coderName name of the coder
* @return the specific coder
*/
public RawErasureCoderFactory getCoderByName(
String codecName, String coderName) {
List<RawErasureCoderFactory> coders = getCoders(codecName);
// find the RawErasureCoderFactory with the name of coderName
for (RawErasureCoderFactory coder : coders) {
if (coder.getCoderName().equals(coderName)) {
return coder;
}
}
// if not found, throw exception
throw new IllegalArgumentException("No implementation for coder "
+ coderName + " of codec " + codecName);
}
}

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.io.erasurecode; package org.apache.hadoop.io.erasurecode;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -30,18 +28,12 @@ import org.apache.hadoop.io.erasurecode.codec.RSErasureCodec;
import org.apache.hadoop.io.erasurecode.codec.XORErasureCodec; import org.apache.hadoop.io.erasurecode.codec.XORErasureCodec;
import org.apache.hadoop.io.erasurecode.coder.ErasureDecoder; import org.apache.hadoop.io.erasurecode.coder.ErasureDecoder;
import org.apache.hadoop.io.erasurecode.coder.ErasureEncoder; import org.apache.hadoop.io.erasurecode.coder.ErasureEncoder;
import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
import org.apache.hadoop.io.erasurecode.rawcoder.NativeXORRawErasureCoderFactory;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawErasureCoderFactory;
import org.apache.hadoop.io.erasurecode.rawcoder.RSLegacyRawErasureCoderFactory;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoderFactory; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoderFactory;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.hadoop.io.erasurecode.rawcoder.XORRawErasureCoderFactory;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.util.Map;
/** /**
* A codec & coder utility to help create coders conveniently. * A codec & coder utility to help create coders conveniently.
@ -79,27 +71,12 @@ public final class CodecUtil {
/** Comma separated raw codec name. The first coder is prior to the latter. */ /** Comma separated raw codec name. The first coder is prior to the latter. */
public static final String IO_ERASURECODE_CODEC_RS_LEGACY_RAWCODERS_KEY = public static final String IO_ERASURECODE_CODEC_RS_LEGACY_RAWCODERS_KEY =
IO_ERASURECODE_CODEC + "rs-legacy.rawcoders"; IO_ERASURECODE_CODEC + "rs-legacy.rawcoders";
public static final String IO_ERASURECODE_CODEC_RS_LEGACY_RAWCODERS_DEFAULT =
RSLegacyRawErasureCoderFactory.class.getCanonicalName();
public static final String IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY = public static final String IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY =
IO_ERASURECODE_CODEC + "rs.rawcoders"; IO_ERASURECODE_CODEC + "rs.rawcoders";
public static final String IO_ERASURECODE_CODEC_RS_RAWCODERS_DEFAULT =
NativeRSRawErasureCoderFactory.class.getCanonicalName() +
"," + RSRawErasureCoderFactory.class.getCanonicalName();
/** Raw coder factory for the XOR codec. */ /** Raw coder factory for the XOR codec. */
public static final String IO_ERASURECODE_CODEC_XOR_RAWCODERS_KEY = public static final String IO_ERASURECODE_CODEC_XOR_RAWCODERS_KEY =
IO_ERASURECODE_CODEC + "xor.rawcoders"; IO_ERASURECODE_CODEC + "xor.rawcoders";
public static final String IO_ERASURECODE_CODEC_XOR_RAWCODERS_DEFAULT =
NativeXORRawErasureCoderFactory.class.getCanonicalName() +
"," + XORRawErasureCoderFactory.class.getCanonicalName();
// Default coders for each codec names.
public static final Map<String, String> DEFAULT_CODERS_MAP = ImmutableMap.of(
"rs", IO_ERASURECODE_CODEC_RS_RAWCODERS_DEFAULT,
"rs-legacy", IO_ERASURECODE_CODEC_RS_LEGACY_RAWCODERS_DEFAULT,
"xor", IO_ERASURECODE_CODEC_XOR_RAWCODERS_DEFAULT
);
private CodecUtil() { } private CodecUtil() { }
@ -168,70 +145,61 @@ public final class CodecUtil {
} }
private static RawErasureCoderFactory createRawCoderFactory( private static RawErasureCoderFactory createRawCoderFactory(
Configuration conf, String rawCoderFactoryKey) { String coderName, String codecName) {
RawErasureCoderFactory fact; RawErasureCoderFactory fact;
try { fact = CodecRegistry.getInstance().
Class<? extends RawErasureCoderFactory> factClass = conf.getClassByName( getCoderByName(codecName, coderName);
rawCoderFactoryKey).asSubclass(RawErasureCoderFactory.class);
fact = factClass.newInstance();
} catch (ClassNotFoundException | InstantiationException |
IllegalAccessException e) {
throw new RuntimeException("Failed to create raw coder factory", e);
}
if (fact == null) {
throw new RuntimeException("Failed to create raw coder factory");
}
return fact; return fact;
} }
// Return comma separated coder names // Return a list of coder names
private static String getRawCoders(Configuration conf, String codec) { private static String[] getRawCoderNames(
return conf.get( Configuration conf, String codecName) {
IO_ERASURECODE_CODEC + codec + ".rawcoders", return conf.getStrings(
DEFAULT_CODERS_MAP.getOrDefault(codec, codec) IO_ERASURECODE_CODEC + codecName + ".rawcoders",
CodecRegistry.getInstance().getCoderNames(codecName)
); );
} }
private static RawErasureEncoder createRawEncoderWithFallback( private static RawErasureEncoder createRawEncoderWithFallback(
Configuration conf, String codec, ErasureCoderOptions coderOptions) { Configuration conf, String codecName, ErasureCoderOptions coderOptions) {
String coders = getRawCoders(conf, codec); String[] rawCoderNames = getRawCoderNames(conf, codecName);
for (String factName : Splitter.on(",").split(coders)) { for (String rawCoderName : rawCoderNames) {
try { try {
if (factName != null) { if (rawCoderName != null) {
RawErasureCoderFactory fact = createRawCoderFactory(conf, RawErasureCoderFactory fact = createRawCoderFactory(
factName); rawCoderName, codecName);
return fact.createEncoder(coderOptions); return fact.createEncoder(coderOptions);
} }
} catch (LinkageError | Exception e) { } catch (LinkageError | Exception e) {
// Fallback to next coder if possible // Fallback to next coder if possible
LOG.warn("Failed to create raw erasure encoder " + factName + LOG.warn("Failed to create raw erasure encoder " + rawCoderName +
", fallback to next codec if possible", e); ", fallback to next codec if possible", e);
} }
} }
throw new IllegalArgumentException("Fail to create raw erasure " + throw new IllegalArgumentException("Fail to create raw erasure " +
"encoder with given codec: " + codec); "encoder with given codec: " + codecName);
} }
private static RawErasureDecoder createRawDecoderWithFallback( private static RawErasureDecoder createRawDecoderWithFallback(
Configuration conf, String codec, ErasureCoderOptions coderOptions) { Configuration conf, String codecName, ErasureCoderOptions coderOptions) {
String coders = getRawCoders(conf, codec); String[] coders = getRawCoderNames(conf, codecName);
for (String factName : Splitter.on(",").split(coders)) { for (String rawCoderName : coders) {
try { try {
if (factName != null) { if (rawCoderName != null) {
RawErasureCoderFactory fact = createRawCoderFactory(conf, RawErasureCoderFactory fact = createRawCoderFactory(
factName); rawCoderName, codecName);
return fact.createDecoder(coderOptions); return fact.createDecoder(coderOptions);
} }
} catch (LinkageError | Exception e) { } catch (LinkageError | Exception e) {
// Fallback to next coder if possible // Fallback to next coder if possible
LOG.warn("Failed to create raw erasure decoder " + factName + LOG.warn("Failed to create raw erasure decoder " + rawCoderName +
", fallback to next codec if possible", e); ", fallback to next codec if possible", e);
} }
} }
throw new IllegalArgumentException("Fail to create raw erasure " + throw new IllegalArgumentException("Fail to create raw erasure " +
"encoder with given codec: " + codec); "encoder with given codec: " + codecName);
} }
private static ErasureCodec createCodec(Configuration conf, private static ErasureCodec createCodec(Configuration conf,

View File

@ -25,6 +25,7 @@ public final class ErasureCodeConstants {
private ErasureCodeConstants() { private ErasureCodeConstants() {
} }
public static final String DUMMY_CODEC_NAME = "dummy";
public static final String RS_CODEC_NAME = "rs"; public static final String RS_CODEC_NAME = "rs";
public static final String RS_LEGACY_CODEC_NAME = "rs-legacy"; public static final String RS_LEGACY_CODEC_NAME = "rs-legacy";
public static final String XOR_CODEC_NAME = "xor"; public static final String XOR_CODEC_NAME = "xor";

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.io.erasurecode.rawcoder; package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
/** /**
@ -25,6 +26,7 @@ import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class DummyRawErasureCoderFactory implements RawErasureCoderFactory { public class DummyRawErasureCoderFactory implements RawErasureCoderFactory {
public static final String CODER_NAME = "dummy_dummy";
@Override @Override
public RawErasureEncoder createEncoder(ErasureCoderOptions coderOptions) { public RawErasureEncoder createEncoder(ErasureCoderOptions coderOptions) {
@ -35,4 +37,14 @@ public class DummyRawErasureCoderFactory implements RawErasureCoderFactory {
public RawErasureDecoder createDecoder(ErasureCoderOptions coderOptions) { public RawErasureDecoder createDecoder(ErasureCoderOptions coderOptions) {
return new DummyRawDecoder(coderOptions); return new DummyRawDecoder(coderOptions);
} }
@Override
public String getCoderName() {
return CODER_NAME;
}
@Override
public String getCodecName() {
return ErasureCodeConstants.DUMMY_CODEC_NAME;
}
} }

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.io.erasurecode.rawcoder; package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
/** /**
@ -27,6 +28,8 @@ import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
@InterfaceAudience.Private @InterfaceAudience.Private
public class NativeRSRawErasureCoderFactory implements RawErasureCoderFactory { public class NativeRSRawErasureCoderFactory implements RawErasureCoderFactory {
public static final String CODER_NAME = "rs_native";
@Override @Override
public RawErasureEncoder createEncoder(ErasureCoderOptions coderOptions) { public RawErasureEncoder createEncoder(ErasureCoderOptions coderOptions) {
return new NativeRSRawEncoder(coderOptions); return new NativeRSRawEncoder(coderOptions);
@ -36,4 +39,14 @@ public class NativeRSRawErasureCoderFactory implements RawErasureCoderFactory {
public RawErasureDecoder createDecoder(ErasureCoderOptions coderOptions) { public RawErasureDecoder createDecoder(ErasureCoderOptions coderOptions) {
return new NativeRSRawDecoder(coderOptions); return new NativeRSRawDecoder(coderOptions);
} }
@Override
public String getCoderName() {
return CODER_NAME;
}
@Override
public String getCodecName() {
return ErasureCodeConstants.RS_CODEC_NAME;
}
} }

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.io.erasurecode.rawcoder; package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
/** /**
@ -27,6 +28,8 @@ import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
@InterfaceAudience.Private @InterfaceAudience.Private
public class NativeXORRawErasureCoderFactory implements RawErasureCoderFactory { public class NativeXORRawErasureCoderFactory implements RawErasureCoderFactory {
public static final String CODER_NAME = "xor_native";
@Override @Override
public RawErasureEncoder createEncoder(ErasureCoderOptions coderOptions) { public RawErasureEncoder createEncoder(ErasureCoderOptions coderOptions) {
return new NativeXORRawEncoder(coderOptions); return new NativeXORRawEncoder(coderOptions);
@ -36,4 +39,14 @@ public class NativeXORRawErasureCoderFactory implements RawErasureCoderFactory {
public RawErasureDecoder createDecoder(ErasureCoderOptions coderOptions) { public RawErasureDecoder createDecoder(ErasureCoderOptions coderOptions) {
return new NativeXORRawDecoder(coderOptions); return new NativeXORRawDecoder(coderOptions);
} }
@Override
public String getCoderName() {
return CODER_NAME;
}
@Override
public String getCodecName() {
return ErasureCodeConstants.XOR_CODEC_NAME;
}
} }

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.io.erasurecode.rawcoder; package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
/** /**
@ -26,6 +27,8 @@ import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
@InterfaceAudience.Private @InterfaceAudience.Private
public class RSLegacyRawErasureCoderFactory implements RawErasureCoderFactory { public class RSLegacyRawErasureCoderFactory implements RawErasureCoderFactory {
public static final String CODER_NAME = "rs-legacy_java";
@Override @Override
public RawErasureEncoder createEncoder(ErasureCoderOptions coderOptions) { public RawErasureEncoder createEncoder(ErasureCoderOptions coderOptions) {
return new RSLegacyRawEncoder(coderOptions); return new RSLegacyRawEncoder(coderOptions);
@ -35,4 +38,14 @@ public class RSLegacyRawErasureCoderFactory implements RawErasureCoderFactory {
public RawErasureDecoder createDecoder(ErasureCoderOptions coderOptions) { public RawErasureDecoder createDecoder(ErasureCoderOptions coderOptions) {
return new RSLegacyRawDecoder(coderOptions); return new RSLegacyRawDecoder(coderOptions);
} }
@Override
public String getCoderName() {
return CODER_NAME;
}
@Override
public String getCodecName() {
return ErasureCodeConstants.RS_LEGACY_CODEC_NAME;
}
} }

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.io.erasurecode.rawcoder; package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
/** /**
@ -26,6 +27,8 @@ import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
@InterfaceAudience.Private @InterfaceAudience.Private
public class RSRawErasureCoderFactory implements RawErasureCoderFactory { public class RSRawErasureCoderFactory implements RawErasureCoderFactory {
public static final String CODER_NAME = "rs_java";
@Override @Override
public RawErasureEncoder createEncoder(ErasureCoderOptions coderOptions) { public RawErasureEncoder createEncoder(ErasureCoderOptions coderOptions) {
return new RSRawEncoder(coderOptions); return new RSRawEncoder(coderOptions);
@ -35,4 +38,14 @@ public class RSRawErasureCoderFactory implements RawErasureCoderFactory {
public RawErasureDecoder createDecoder(ErasureCoderOptions coderOptions) { public RawErasureDecoder createDecoder(ErasureCoderOptions coderOptions) {
return new RSRawDecoder(coderOptions); return new RSRawDecoder(coderOptions);
} }
@Override
public String getCoderName() {
return CODER_NAME;
}
@Override
public String getCodecName() {
return ErasureCodeConstants.RS_CODEC_NAME;
}
} }

View File

@ -41,4 +41,16 @@ public interface RawErasureCoderFactory {
* @return raw erasure decoder * @return raw erasure decoder
*/ */
RawErasureDecoder createDecoder(ErasureCoderOptions coderOptions); RawErasureDecoder createDecoder(ErasureCoderOptions coderOptions);
/**
* Get the name of the coder.
* @return coder name
*/
String getCoderName();
/**
* Get the name of its codec.
* @return codec name
*/
String getCodecName();
} }

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.io.erasurecode.rawcoder; package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
/** /**
@ -26,6 +27,8 @@ import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
@InterfaceAudience.Private @InterfaceAudience.Private
public class XORRawErasureCoderFactory implements RawErasureCoderFactory { public class XORRawErasureCoderFactory implements RawErasureCoderFactory {
public static final String CODER_NAME = "xor_java";
@Override @Override
public RawErasureEncoder createEncoder(ErasureCoderOptions coderOptions) { public RawErasureEncoder createEncoder(ErasureCoderOptions coderOptions) {
return new XORRawEncoder(coderOptions); return new XORRawEncoder(coderOptions);
@ -35,4 +38,14 @@ public class XORRawErasureCoderFactory implements RawErasureCoderFactory {
public RawErasureDecoder createDecoder(ErasureCoderOptions coderOptions) { public RawErasureDecoder createDecoder(ErasureCoderOptions coderOptions) {
return new XORRawDecoder(coderOptions); return new XORRawDecoder(coderOptions);
} }
@Override
public String getCoderName() {
return CODER_NAME;
}
@Override
public String getCodecName() {
return ErasureCodeConstants.XOR_CODEC_NAME;
}
} }

View File

@ -0,0 +1,18 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory
org.apache.hadoop.io.erasurecode.rawcoder.NativeXORRawErasureCoderFactory
org.apache.hadoop.io.erasurecode.rawcoder.RSRawErasureCoderFactory
org.apache.hadoop.io.erasurecode.rawcoder.RSLegacyRawErasureCoderFactory
org.apache.hadoop.io.erasurecode.rawcoder.XORRawErasureCoderFactory

View File

@ -668,7 +668,7 @@
<property> <property>
<name>io.erasurecode.codec.rs.rawcoders</name> <name>io.erasurecode.codec.rs.rawcoders</name>
<value>org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory,org.apache.hadoop.io.erasurecode.rawcoder.RSRawErasureCoderFactory</value> <value>rs_native,rs_java</value>
<description> <description>
Comma separated raw coder implementations for the rs codec. The earlier Comma separated raw coder implementations for the rs codec. The earlier
factory is prior to followings in case of failure of creating raw coders. factory is prior to followings in case of failure of creating raw coders.
@ -677,7 +677,7 @@
<property> <property>
<name>io.erasurecode.codec.rs-legacy.rawcoders</name> <name>io.erasurecode.codec.rs-legacy.rawcoders</name>
<value>org.apache.hadoop.io.erasurecode.rawcoder.RSLegacyRawErasureCoderFactory</value> <value>rs-legacy_java</value>
<description> <description>
Comma separated raw coder implementations for the rs-legacy codec. The earlier Comma separated raw coder implementations for the rs-legacy codec. The earlier
factory is prior to followings in case of failure of creating raw coders. factory is prior to followings in case of failure of creating raw coders.
@ -686,7 +686,7 @@
<property> <property>
<name>io.erasurecode.codec.xor.rawcoders</name> <name>io.erasurecode.codec.xor.rawcoders</name>
<value>org.apache.hadoop.io.erasurecode.rawcoder.NativeXORRawErasureCoderFactory,org.apache.hadoop.io.erasurecode.rawcoder.XORRawErasureCoderFactory</value> <value>xor_native,xor_java</value>
<description> <description>
Comma separated raw coder implementations for the xor codec. The earlier Comma separated raw coder implementations for the xor codec. The earlier
factory is prior to followings in case of failure of creating raw coders. factory is prior to followings in case of failure of creating raw coders.

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.hadoop.io.erasurecode.rawcoder.XORRawDecoder; import org.apache.hadoop.io.erasurecode.rawcoder.XORRawDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.XORRawEncoder; import org.apache.hadoop.io.erasurecode.rawcoder.XORRawEncoder;
import org.apache.hadoop.io.erasurecode.rawcoder.XORRawErasureCoderFactory;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -104,8 +105,8 @@ public class TestCodecRawCoderMapping {
ErasureCoderOptions coderOptions = new ErasureCoderOptions( ErasureCoderOptions coderOptions = new ErasureCoderOptions(
numDataUnit, numParityUnit); numDataUnit, numParityUnit);
conf.set(CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY, conf.set(CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY,
RSRawErasureCoderFactory.class.getCanonicalName() + RSRawErasureCoderFactory.CODER_NAME +
"," + NativeRSRawErasureCoderFactory.class.getCanonicalName()); "," + NativeRSRawErasureCoderFactory.CODER_NAME);
// should return default raw coder of rs codec // should return default raw coder of rs codec
RawErasureEncoder encoder = CodecUtil.createRawEncoder( RawErasureEncoder encoder = CodecUtil.createRawEncoder(
conf, ErasureCodeConstants.RS_CODEC_NAME, coderOptions); conf, ErasureCodeConstants.RS_CODEC_NAME, coderOptions);
@ -133,8 +134,7 @@ public class TestCodecRawCoderMapping {
ErasureCoderOptions coderOptions = new ErasureCoderOptions( ErasureCoderOptions coderOptions = new ErasureCoderOptions(
numDataUnit, numParityUnit); numDataUnit, numParityUnit);
conf.set(CodecUtil.IO_ERASURECODE_CODEC_XOR_RAWCODERS_KEY, conf.set(CodecUtil.IO_ERASURECODE_CODEC_XOR_RAWCODERS_KEY,
"invalid-codec," + "invalid-codec," + XORRawErasureCoderFactory.CODER_NAME);
"org.apache.hadoop.io.erasurecode.rawcoder.XORRawErasureCoderFactory");
// should return second coder specified by IO_ERASURECODE_CODEC_CODERS // should return second coder specified by IO_ERASURECODE_CODEC_CODERS
RawErasureEncoder encoder = CodecUtil.createRawEncoder( RawErasureEncoder encoder = CodecUtil.createRawEncoder(
conf, ErasureCodeConstants.XOR_CODEC_NAME, coderOptions); conf, ErasureCodeConstants.XOR_CODEC_NAME, coderOptions);

View File

@ -0,0 +1,170 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode;
import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
import org.apache.hadoop.io.erasurecode.rawcoder.NativeXORRawErasureCoderFactory;
import org.apache.hadoop.io.erasurecode.rawcoder.RSLegacyRawErasureCoderFactory;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawErasureCoderFactory;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoderFactory;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.hadoop.io.erasurecode.rawcoder.XORRawErasureCoderFactory;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Test CodecRegistry.
*/
public class TestCodecRegistry {
@Test
public void testGetCodecs() {
Set<String> codecs = CodecRegistry.getInstance().getCodecNames();
assertEquals(3, codecs.size());
assertTrue(codecs.contains(ErasureCodeConstants.RS_CODEC_NAME));
assertTrue(codecs.contains(ErasureCodeConstants.RS_LEGACY_CODEC_NAME));
assertTrue(codecs.contains(ErasureCodeConstants.XOR_CODEC_NAME));
}
@Test
public void testGetCoders() {
List<RawErasureCoderFactory> coders = CodecRegistry.getInstance().
getCoders(ErasureCodeConstants.RS_CODEC_NAME);
assertEquals(2, coders.size());
assertTrue(coders.get(0) instanceof NativeRSRawErasureCoderFactory);
assertTrue(coders.get(1) instanceof RSRawErasureCoderFactory);
coders = CodecRegistry.getInstance().
getCoders(ErasureCodeConstants.RS_LEGACY_CODEC_NAME);
assertEquals(1, coders.size());
assertTrue(coders.get(0) instanceof RSLegacyRawErasureCoderFactory);
coders = CodecRegistry.getInstance().
getCoders(ErasureCodeConstants.XOR_CODEC_NAME);
assertEquals(2, coders.size());
assertTrue(coders.get(0) instanceof NativeXORRawErasureCoderFactory);
assertTrue(coders.get(1) instanceof XORRawErasureCoderFactory);
}
@Test(expected = IllegalArgumentException.class)
public void testGetCodersWrong() {
List<RawErasureCoderFactory> coders =
CodecRegistry.getInstance().getCoders("WRONG_CODEC");
}
@Test
public void testGetCoderNames() {
String[] coderNames = CodecRegistry.getInstance().
getCoderNames(ErasureCodeConstants.RS_CODEC_NAME);
assertEquals(2, coderNames.length);
assertEquals(NativeRSRawErasureCoderFactory.CODER_NAME, coderNames[0]);
assertEquals(RSRawErasureCoderFactory.CODER_NAME, coderNames[1]);
coderNames = CodecRegistry.getInstance().
getCoderNames(ErasureCodeConstants.RS_LEGACY_CODEC_NAME);
assertEquals(1, coderNames.length);
assertEquals(RSLegacyRawErasureCoderFactory.CODER_NAME,
coderNames[0]);
coderNames = CodecRegistry.getInstance().
getCoderNames(ErasureCodeConstants.XOR_CODEC_NAME);
assertEquals(2, coderNames.length);
assertEquals(NativeXORRawErasureCoderFactory.CODER_NAME,
coderNames[0]);
assertEquals(XORRawErasureCoderFactory.CODER_NAME, coderNames[1]);
}
@Test
public void testGetCoderByName() {
RawErasureCoderFactory coder = CodecRegistry.getInstance().
getCoderByName(ErasureCodeConstants.RS_CODEC_NAME,
RSRawErasureCoderFactory.CODER_NAME);
assertTrue(coder instanceof RSRawErasureCoderFactory);
coder = CodecRegistry.getInstance().getCoderByName(
ErasureCodeConstants.RS_CODEC_NAME,
NativeRSRawErasureCoderFactory.CODER_NAME);
assertTrue(coder instanceof NativeRSRawErasureCoderFactory);
coder = CodecRegistry.getInstance().getCoderByName(
ErasureCodeConstants.RS_LEGACY_CODEC_NAME,
RSLegacyRawErasureCoderFactory.CODER_NAME);
assertTrue(coder instanceof RSLegacyRawErasureCoderFactory);
coder = CodecRegistry.getInstance().getCoderByName(
ErasureCodeConstants.XOR_CODEC_NAME,
XORRawErasureCoderFactory.CODER_NAME);
assertTrue(coder instanceof XORRawErasureCoderFactory);
coder = CodecRegistry.getInstance().getCoderByName(
ErasureCodeConstants.XOR_CODEC_NAME,
NativeXORRawErasureCoderFactory.CODER_NAME);
assertTrue(coder instanceof NativeXORRawErasureCoderFactory);
}
@Test(expected = IllegalArgumentException.class)
public void testGetCoderByNameWrong() {
RawErasureCoderFactory coder = CodecRegistry.getInstance().
getCoderByName(ErasureCodeConstants.RS_CODEC_NAME, "WRONG_RS");
}
@Test
public void testUpdateCoders() {
class RSUserDefinedIncorrectFactory implements RawErasureCoderFactory {
public RawErasureEncoder createEncoder(ErasureCoderOptions coderOptions) {
return null;
}
public RawErasureDecoder createDecoder(ErasureCoderOptions coderOptions) {
return null;
}
public String getCoderName() {
return "rs_java";
}
public String getCodecName() {
return ErasureCodeConstants.RS_CODEC_NAME;
}
}
List<RawErasureCoderFactory> userDefinedFactories = new ArrayList<>();
userDefinedFactories.add(new RSUserDefinedIncorrectFactory());
CodecRegistry.getInstance().updateCoders(userDefinedFactories);
// check RS coders
List<RawErasureCoderFactory> rsCoders = CodecRegistry.getInstance().
getCoders(ErasureCodeConstants.RS_CODEC_NAME);
assertEquals(2, rsCoders.size());
assertTrue(rsCoders.get(0) instanceof NativeRSRawErasureCoderFactory);
assertTrue(rsCoders.get(1) instanceof RSRawErasureCoderFactory);
// check RS coder names
String[] rsCoderNames = CodecRegistry.getInstance().
getCoderNames(ErasureCodeConstants.RS_CODEC_NAME);
assertEquals(2, rsCoderNames.length);
assertEquals(NativeRSRawErasureCoderFactory.CODER_NAME, rsCoderNames[0]);
assertEquals(RSRawErasureCoderFactory.CODER_NAME, rsCoderNames[1]);
}
}

View File

@ -51,7 +51,7 @@ public class TestHHXORErasureCoder extends TestHHErasureCoderBase {
*/ */
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.set(CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY, conf.set(CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY,
RSRawErasureCoderFactory.class.getCanonicalName()); RSRawErasureCoderFactory.CODER_NAME);
prepare(conf, 10, 4, new int[]{0}, new int[0]); prepare(conf, 10, 4, new int[]{0}, new int[0]);
testCoding(true); testCoding(true);

View File

@ -58,7 +58,7 @@ public class TestRSErasureCoder extends TestErasureCoderBase {
*/ */
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.set(CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY, conf.set(CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY,
RSRawErasureCoderFactory.class.getCanonicalName()); RSRawErasureCoderFactory.CODER_NAME);
prepare(conf, 10, 4, new int[]{0}, new int[0]); prepare(conf, 10, 4, new int[]{0}, new int[0]);
testCoding(true); testCoding(true);

View File

@ -117,11 +117,15 @@ Deployment
be more appropriate. If the administrator only cares about node-level fault-tolerance, `RS-10-4-64k` would still be appropriate as long as be more appropriate. If the administrator only cares about node-level fault-tolerance, `RS-10-4-64k` would still be appropriate as long as
there are at least 14 DataNodes in the cluster. there are at least 14 DataNodes in the cluster.
The codec implementation for Reed-Solomon and XOR can be configured with the following client and DataNode configuration keys: The codec implementations for Reed-Solomon and XOR can be configured with the following client and DataNode configuration keys:
`io.erasurecode.codec.rs.rawcoder` for the default RS codec, `io.erasurecode.codec.rs.rawcoders` for the default RS codec,
`io.erasurecode.codec.rs-legacy.rawcoder` for the legacy RS codec, `io.erasurecode.codec.rs-legacy.rawcoders` for the legacy RS codec,
`io.erasurecode.codec.xor.rawcoder` for the XOR codec. `io.erasurecode.codec.xor.rawcoders` for the XOR codec.
The default implementations for all of these codecs are pure Java. For default RS codec, there is also a native implementation which leverages Intel ISA-L library to improve the performance of codec. For XOR codec, a native implementation which leverages Intel ISA-L library to improve the performance of codec is also supported. Please refer to section "Enable Intel ISA-L" for more detail information. User can also configure self-defined codec with configuration key like:
`io.erasurecode.codec.self-defined-codec.rawcoders`.
The values for these key are lists of coder names with a fall-back mechanism.
All these codecs have implementations in pure Java. For default RS codec, there is also a native implementation which leverages Intel ISA-L library to improve the performance of codec. For XOR codec, a native implementation which leverages Intel ISA-L library to improve the performance of codec is also supported. Please refer to section "Enable Intel ISA-L" for more detail information.
The default implementation for RS Legacy is pure Java, and the default implementations for default RS and XOR are native implementations using Intel ISA-L library.
Erasure coding background recovery work on the DataNodes can also be tuned via the following configuration parameters: Erasure coding background recovery work on the DataNodes can also be tuned via the following configuration parameters:

View File

@ -98,7 +98,7 @@ public class TestDFSStripedInputStream {
if (ErasureCodeNative.isNativeCodeLoaded()) { if (ErasureCodeNative.isNativeCodeLoaded()) {
conf.set( conf.set(
CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY, CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY,
NativeRSRawErasureCoderFactory.class.getCanonicalName()); NativeRSRawErasureCoderFactory.CODER_NAME);
} }
SimulatedFSDataset.setFactory(conf); SimulatedFSDataset.setFactory(conf);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes( cluster = new MiniDFSCluster.Builder(conf).numDataNodes(

View File

@ -85,7 +85,7 @@ public class TestDFSStripedOutputStream {
if (ErasureCodeNative.isNativeCodeLoaded()) { if (ErasureCodeNative.isNativeCodeLoaded()) {
conf.set( conf.set(
CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY, CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY,
NativeRSRawErasureCoderFactory.class.getCanonicalName()); NativeRSRawErasureCoderFactory.CODER_NAME);
} }
DFSTestUtil.enableAllECPolicies(conf); DFSTestUtil.enableAllECPolicies(conf);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();

View File

@ -214,7 +214,7 @@ public class TestDFSStripedOutputStreamWithFailure {
if (ErasureCodeNative.isNativeCodeLoaded()) { if (ErasureCodeNative.isNativeCodeLoaded()) {
conf.set( conf.set(
CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY, CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY,
NativeRSRawErasureCoderFactory.class.getCanonicalName()); NativeRSRawErasureCoderFactory.CODER_NAME);
} }
DFSTestUtil.enableAllECPolicies(conf); DFSTestUtil.enableAllECPolicies(conf);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();

View File

@ -100,7 +100,7 @@ public class TestReconstructStripedFile {
if (ErasureCodeNative.isNativeCodeLoaded()) { if (ErasureCodeNative.isNativeCodeLoaded()) {
conf.set( conf.set(
CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY, CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY,
NativeRSRawErasureCoderFactory.class.getCanonicalName()); NativeRSRawErasureCoderFactory.CODER_NAME);
} }
conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY, conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
StripedFileTestUtil.getDefaultECPolicy().getName()); StripedFileTestUtil.getDefaultECPolicy().getName());

View File

@ -68,7 +68,7 @@ public class TestUnsetAndChangeDirectoryEcPolicy {
if (ErasureCodeNative.isNativeCodeLoaded()) { if (ErasureCodeNative.isNativeCodeLoaded()) {
conf.set( conf.set(
CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY, CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY,
NativeRSRawErasureCoderFactory.class.getCanonicalName()); NativeRSRawErasureCoderFactory.CODER_NAME);
} }
DFSTestUtil.enableAllECPolicies(conf); DFSTestUtil.enableAllECPolicies(conf);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes( cluster = new MiniDFSCluster.Builder(conf).numDataNodes(