HDFS-8557 Allow to configure RS and XOR raw coders. Contributed by Kai Zheng

Kai Zheng 2015-06-10 15:35:26 +08:00
parent c41b02cc00
commit e299fe86b8
14 changed files with 216 additions and 118 deletions


@@ -66,4 +66,6 @@
     HADOOP-12011. Allow to dump verbose information to ease debugging in raw erasure coders
     (Kai Zheng)
 
     HADOOP-12065. Using more meaningful keys in EC schema. (Kai Zheng)
+
+    HDFS-8557. Allow to configure RS and XOR raw coders (Kai Zheng)

CommonConfigurationKeys.java

@@ -143,10 +143,14 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
   /** Supported erasure codec classes */
   public static final String IO_ERASURECODE_CODECS_KEY = "io.erasurecode.codecs";
 
-  /** Raw coder factory for the RS codec */
+  /** Raw coder factory for the RS codec. */
   public static final String IO_ERASURECODE_CODEC_RS_RAWCODER_KEY =
       "io.erasurecode.codec.rs.rawcoder";
 
+  /** Raw coder factory for the XOR codec. */
+  public static final String IO_ERASURECODE_CODEC_XOR_RAWCODER_KEY =
+      "io.erasurecode.codec.xor.rawcoder";
+
   /**
    * Service Authorization
    */
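
Both keys name an implementation of RawErasureCoderFactory to use in place of the built-in coders. A rough sketch of selecting custom factories programmatically, assuming hypothetical factory classes (com.example.MyRSRawErasureCoderFactory and com.example.MyXORRawErasureCoderFactory are placeholders, not classes added by this change; the same values can also be set in core-site.xml):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;

public class ConfigureRawCoders {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Each value is the fully qualified name of a RawErasureCoderFactory
    // implementation; the class names below are made-up placeholders.
    conf.set(CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY,
        "com.example.MyRSRawErasureCoderFactory");
    conf.set(CommonConfigurationKeys.IO_ERASURECODE_CODEC_XOR_RAWCODER_KEY,
        "com.example.MyXORRawErasureCoderFactory");
    // Leaving either key unset keeps the default behaviour: the new
    // CodecUtil class below falls back to the built-in RS and XOR coders.
  }
}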

CodecUtil.java (new file)

@@ -0,0 +1,144 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.io.erasurecode;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.erasurecode.rawcoder.*;

/**
 * A codec utility.
 */
public final class CodecUtil {

  private CodecUtil() {}

  /**
   * Create RS raw encoder according to configuration.
   * @param conf
   * @param numDataUnits
   * @param numParityUnits
   * @return raw encoder
   */
  public static RawErasureEncoder createRSRawEncoder(
      Configuration conf, int numDataUnits, int numParityUnits) {
    RawErasureCoder rawCoder = createRawCoder(conf,
        CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY,
        true, numDataUnits, numParityUnits);
    if (rawCoder == null) {
      rawCoder = new RSRawEncoder(numDataUnits, numParityUnits);
    }
    return (RawErasureEncoder) rawCoder;
  }

  /**
   * Create RS raw decoder according to configuration.
   * @param conf
   * @param numDataUnits
   * @param numParityUnits
   * @return raw decoder
   */
  public static RawErasureDecoder createRSRawDecoder(
      Configuration conf, int numDataUnits, int numParityUnits) {
    RawErasureCoder rawCoder = createRawCoder(conf,
        CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY,
        false, numDataUnits, numParityUnits);
    if (rawCoder == null) {
      rawCoder = new RSRawDecoder(numDataUnits, numParityUnits);
    }
    return (RawErasureDecoder) rawCoder;
  }

  /**
   * Create XOR raw encoder according to configuration.
   * @param conf
   * @param numDataUnits
   * @param numParityUnits
   * @return raw encoder
   */
  public static RawErasureEncoder createXORRawEncoder(
      Configuration conf, int numDataUnits, int numParityUnits) {
    RawErasureCoder rawCoder = createRawCoder(conf,
        CommonConfigurationKeys.IO_ERASURECODE_CODEC_XOR_RAWCODER_KEY,
        true, numDataUnits, numParityUnits);
    if (rawCoder == null) {
      rawCoder = new XORRawEncoder(numDataUnits, numParityUnits);
    }
    return (RawErasureEncoder) rawCoder;
  }

  /**
   * Create XOR raw decoder according to configuration.
   * @param conf
   * @param numDataUnits
   * @param numParityUnits
   * @return raw decoder
   */
  public static RawErasureDecoder createXORRawDecoder(
      Configuration conf, int numDataUnits, int numParityUnits) {
    RawErasureCoder rawCoder = createRawCoder(conf,
        CommonConfigurationKeys.IO_ERASURECODE_CODEC_XOR_RAWCODER_KEY,
        false, numDataUnits, numParityUnits);
    if (rawCoder == null) {
      rawCoder = new XORRawDecoder(numDataUnits, numParityUnits);
    }
    return (RawErasureDecoder) rawCoder;
  }

  /**
   * Create raw coder using specified conf and raw coder factory key.
   * @param conf
   * @param rawCoderFactoryKey
   * @param isEncoder
   * @param numDataUnits
   * @param numParityUnits
   * @return raw coder
   */
  public static RawErasureCoder createRawCoder(Configuration conf,
      String rawCoderFactoryKey, boolean isEncoder, int numDataUnits,
      int numParityUnits) {
    if (conf == null) {
      return null;
    }

    Class<? extends RawErasureCoderFactory> factClass = null;
    factClass = conf.getClass(rawCoderFactoryKey,
        factClass, RawErasureCoderFactory.class);

    if (factClass == null) {
      return null;
    }

    RawErasureCoderFactory fact;
    try {
      fact = factClass.newInstance();
    } catch (InstantiationException e) {
      throw new RuntimeException("Failed to create raw coder", e);
    } catch (IllegalAccessException e) {
      throw new RuntimeException("Failed to create raw coder", e);
    }

    return isEncoder ? fact.createEncoder(numDataUnits, numParityUnits) :
        fact.createDecoder(numDataUnits, numParityUnits);
  }
}
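
createRawCoder instantiates whatever RawErasureCoderFactory class the configured key names, so a replacement coder only needs a factory exposing the two methods used above. A minimal, hypothetical sketch (the class name and package are illustrative, and it simply delegates to the stock RS coders; a real plugin would return its own encoder and decoder):

package com.example;

import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoderFactory;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;

// Illustrative factory only: it hands back the built-in RS coders, but a
// real plugin could construct, for example, a native implementation.
public class MyRSRawErasureCoderFactory implements RawErasureCoderFactory {

  @Override
  public RawErasureEncoder createEncoder(int numDataUnits, int numParityUnits) {
    return new RSRawEncoder(numDataUnits, numParityUnits);
  }

  @Override
  public RawErasureDecoder createDecoder(int numDataUnits, int numParityUnits) {
    return new RSRawDecoder(numDataUnits, numParityUnits);
  }
}

With io.erasurecode.codec.rs.rawcoder set to com.example.MyRSRawErasureCoderFactory, CodecUtil.createRSRawEncoder(conf, numDataUnits, numParityUnits) instantiates the factory reflectively via newInstance(); with the key unset, or a null conf, the methods above fall back to the default coders, which preserves the previous behaviour for unconfigured deployments.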

AbstractErasureCoder.java

@@ -17,13 +17,8 @@
  */
 package org.apache.hadoop.io.erasurecode.coder;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.erasurecode.ECSchema;
-import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoder;
-import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureCoderFactory;
-import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
-import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
 
 /**
  * A common class of basic facilities to be shared by encoder and decoder
@@ -36,73 +31,13 @@ public abstract class AbstractErasureCoder
   private final int numDataUnits;
   private final int numParityUnits;
 
-  /**
-   * Create raw decoder using the factory specified by rawCoderFactoryKey
-   * @param rawCoderFactoryKey
-   * @return raw decoder
-   */
-  protected RawErasureDecoder createRawDecoder(
-      String rawCoderFactoryKey, int dataUnitsCount, int parityUnitsCount) {
-    RawErasureCoder rawCoder = createRawCoder(getConf(),
-        rawCoderFactoryKey, false, dataUnitsCount, parityUnitsCount);
-    return (RawErasureDecoder) rawCoder;
-  }
-
-  /**
-   * Create raw encoder using the factory specified by rawCoderFactoryKey
-   * @param rawCoderFactoryKey
-   * @return raw encoder
-   */
-  protected RawErasureEncoder createRawEncoder(
-      String rawCoderFactoryKey, int dataUnitsCount, int parityUnitsCount) {
-    RawErasureCoder rawCoder = createRawCoder(getConf(),
-        rawCoderFactoryKey, true, dataUnitsCount, parityUnitsCount);
-    return (RawErasureEncoder) rawCoder;
-  }
-
-  /**
-   * Create raw coder using specified conf and raw coder factory key.
-   * @param conf
-   * @param rawCoderFactoryKey
-   * @param isEncoder
-   * @return raw coder
-   */
-  public static RawErasureCoder createRawCoder(Configuration conf,
-      String rawCoderFactoryKey, boolean isEncoder, int numDataUnits,
-      int numParityUnits) {
-    if (conf == null) {
-      return null;
-    }
-
-    Class<? extends RawErasureCoderFactory> factClass = null;
-    factClass = conf.getClass(rawCoderFactoryKey,
-        factClass, RawErasureCoderFactory.class);
-
-    if (factClass == null) {
-      return null;
-    }
-
-    RawErasureCoderFactory fact;
-    try {
-      fact = factClass.newInstance();
-    } catch (InstantiationException e) {
-      throw new RuntimeException("Failed to create raw coder", e);
-    } catch (IllegalAccessException e) {
-      throw new RuntimeException("Failed to create raw coder", e);
-    }
-
-    return isEncoder ? fact.createEncoder(numDataUnits, numParityUnits) :
-        fact.createDecoder(numDataUnits, numParityUnits);
-  }
-
   public AbstractErasureCoder(int numDataUnits, int numParityUnits) {
     this.numDataUnits = numDataUnits;
     this.numParityUnits = numParityUnits;
   }
 
   public AbstractErasureCoder(ECSchema schema) {
     this(schema.getNumDataUnits(), schema.getNumParityUnits());
   }
 
   @Override

RSErasureDecoder.java

@@ -17,11 +17,10 @@
  */
 package org.apache.hadoop.io.erasurecode.coder;
 
-import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.io.erasurecode.ECBlock;
 import org.apache.hadoop.io.erasurecode.ECBlockGroup;
 import org.apache.hadoop.io.erasurecode.ECSchema;
-import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
 
 /**
@@ -53,12 +52,8 @@ public class RSErasureDecoder extends AbstractErasureDecoder {
 
   private RawErasureDecoder checkCreateRSRawDecoder() {
     if (rsRawDecoder == null) {
-      rsRawDecoder = createRawDecoder(
-          CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY,
-          getNumDataUnits(), getNumParityUnits());
-      if (rsRawDecoder == null) {
-        rsRawDecoder = new RSRawDecoder(getNumDataUnits(), getNumParityUnits());
-      }
+      rsRawDecoder = CodecUtil.createRSRawDecoder(getConf(),
+          getNumDataUnits(), getNumParityUnits());
     }
     return rsRawDecoder;
   }

RSErasureEncoder.java

@@ -17,11 +17,10 @@
  */
 package org.apache.hadoop.io.erasurecode.coder;
 
-import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.io.erasurecode.ECBlock;
 import org.apache.hadoop.io.erasurecode.ECBlockGroup;
 import org.apache.hadoop.io.erasurecode.ECSchema;
-import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
 
 /**
@@ -53,12 +52,8 @@ public class RSErasureEncoder extends AbstractErasureEncoder {
 
   private RawErasureEncoder checkCreateRSRawEncoder() {
     if (rawEncoder == null) {
-      rawEncoder = createRawEncoder(
-          CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY,
-          getNumDataUnits(), getNumParityUnits());
-      if (rawEncoder == null) {
-        rawEncoder = new RSRawEncoder(getNumDataUnits(), getNumParityUnits());
-      }
+      rawEncoder = CodecUtil.createRSRawEncoder(getConf(),
+          getNumDataUnits(), getNumParityUnits());
     }
     return rawEncoder;
   }

XORErasureDecoder.java

@@ -17,11 +17,11 @@
  */
 package org.apache.hadoop.io.erasurecode.coder;
 
+import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.io.erasurecode.ECBlock;
 import org.apache.hadoop.io.erasurecode.ECBlockGroup;
 import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
-import org.apache.hadoop.io.erasurecode.rawcoder.XORRawDecoder;
 
 /**
  * Xor erasure decoder that decodes a block group.
@@ -39,10 +39,10 @@ public class XORErasureDecoder extends AbstractErasureDecoder {
   }
 
   @Override
-  protected ErasureCodingStep prepareDecodingStep(final ECBlockGroup blockGroup) {
-    // May be configured
-    RawErasureDecoder rawDecoder = new XORRawDecoder(
+  protected ErasureCodingStep prepareDecodingStep(
+      final ECBlockGroup blockGroup) {
+    RawErasureDecoder rawDecoder = CodecUtil.createXORRawDecoder(getConf(),
         getNumDataUnits(), getNumParityUnits());
 
     ECBlock[] inputBlocks = getInputBlocks(blockGroup);

XORErasureEncoder.java

@@ -17,11 +17,11 @@
  */
 package org.apache.hadoop.io.erasurecode.coder;
 
+import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.io.erasurecode.ECBlock;
 import org.apache.hadoop.io.erasurecode.ECBlockGroup;
 import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
-import org.apache.hadoop.io.erasurecode.rawcoder.XORRawEncoder;
 
 /**
  * Xor erasure encoder that encodes a block group.
@@ -39,10 +39,10 @@ public class XORErasureEncoder extends AbstractErasureEncoder {
   }
 
   @Override
-  protected ErasureCodingStep prepareEncodingStep(final ECBlockGroup blockGroup) {
-    // May be configured
-    RawErasureEncoder rawEncoder = new XORRawEncoder(
+  protected ErasureCodingStep prepareEncodingStep(
+      final ECBlockGroup blockGroup) {
+    RawErasureEncoder rawEncoder = CodecUtil.createXORRawEncoder(getConf(),
         getNumDataUnits(), getNumParityUnits());
 
     ECBlock[] inputBlocks = getInputBlocks(blockGroup);

DFSStripedInputStream.java

@@ -42,6 +42,7 @@ import static org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
 
+import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
@@ -155,7 +156,8 @@ public class DFSStripedInputStream extends DFSInputStream {
     curStripeRange = new StripeRange(0, 0);
     readingService =
         new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
-    decoder = new RSRawDecoder(dataBlkNum, parityBlkNum);
+    decoder = CodecUtil.createRSRawDecoder(dfsClient.getConfiguration(),
+        dataBlkNum, parityBlkNum);
     if (DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug("Creating an striped input stream for file " + src);
     }
@@ -207,8 +209,8 @@ public class DFSStripedInputStream extends DFSInputStream {
     // The purpose is to get start offset into each block.
     long[] offsetsForInternalBlocks = getStartOffsetsForInternalBlocks(schema,
         cellSize, targetBlockGroup, offsetIntoBlockGroup);
-    Preconditions.checkState(
-        offsetsForInternalBlocks.length == dataBlkNum + parityBlkNum);
+    Preconditions.checkState(offsetsForInternalBlocks.length ==
+        dataBlkNum + parityBlkNum);
     long minOffset = offsetsForInternalBlocks[dataBlkNum];
 
     retry = new ReaderRetryPolicy();
@@ -726,8 +728,10 @@ public class DFSStripedInputStream extends DFSInputStream {
     void prepareParityChunk() {
       for (int i = dataBlkNum; i < dataBlkNum + parityBlkNum; i++) {
         if (alignedStripe.chunks[i] == null) {
-          final int decodeIndex = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
-          alignedStripe.chunks[i] = new StripingChunk(decodeInputs[decodeIndex]);
+          final int decodeIndex = convertIndex4Decode(i,
+              dataBlkNum, parityBlkNum);
+          alignedStripe.chunks[i] =
+              new StripingChunk(decodeInputs[decodeIndex]);
           alignedStripe.chunks[i].addByteArraySlice(0,
               (int) alignedStripe.getSpanInBlock());
           break;
@@ -807,7 +811,8 @@ public class DFSStripedInputStream extends DFSInputStream {
               parityBlkNum);
           decodeInputs[decodeIndex] = ByteBuffer.allocateDirect(
               (int) alignedStripe.range.spanInBlock);
-          alignedStripe.chunks[i] = new StripingChunk(decodeInputs[decodeIndex]);
+          alignedStripe.chunks[i] =
+              new StripingChunk(decodeInputs[decodeIndex]);
           if (blockReaders[i] == null) {
             prepareParityBlockReader(i);
           }
@@ -839,7 +844,8 @@ public class DFSStripedInputStream extends DFSInputStream {
       // decoders to work
       final int span = (int) alignedStripe.getSpanInBlock();
       for (int i = 0; i < alignedStripe.chunks.length; i++) {
-        final int decodeIndex = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
+        final int decodeIndex = convertIndex4Decode(i,
+            dataBlkNum, parityBlkNum);
         if (alignedStripe.chunks[i] != null &&
             alignedStripe.chunks[i].state == StripingChunk.ALLZERO) {
           for (int j = 0; j < span; j++) {
@@ -857,7 +863,8 @@ public class DFSStripedInputStream extends DFSInputStream {
       for (int i = 0; i < alignedStripe.chunks.length; i++) {
         if (alignedStripe.chunks[i] != null &&
             alignedStripe.chunks[i].state == StripingChunk.MISSING) {
-          decodeIndices[pos++] = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
+          decodeIndices[pos++] = convertIndex4Decode(i,
+              dataBlkNum, parityBlkNum);
         }
       }
       decodeIndices = Arrays.copyOf(decodeIndices, pos);

DFSStripedOutputStream.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
@@ -247,13 +248,16 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     numDataBlocks = schema.getNumDataUnits();
     numAllBlocks = numDataBlocks + numParityBlocks;
 
-    encoder = new RSRawEncoder(numDataBlocks, numParityBlocks);
+    encoder = CodecUtil.createRSRawEncoder(dfsClient.getConfiguration(),
+        numDataBlocks, numParityBlocks);
 
-    coordinator = new Coordinator(dfsClient.getConf(), numDataBlocks, numAllBlocks);
+    coordinator = new Coordinator(dfsClient.getConf(),
+        numDataBlocks, numAllBlocks);
     try {
       cellBuffers = new CellBuffers(numParityBlocks);
     } catch (InterruptedException ie) {
-      throw DFSUtil.toInterruptedIOException("Failed to create cell buffers", ie);
+      throw DFSUtil.toInterruptedIOException(
+          "Failed to create cell buffers", ie);
     }
 
     List<StripedDataStreamer> s = new ArrayList<>(numAllBlocks);
@@ -318,7 +322,8 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     }
   }
 
-  private void handleStreamerFailure(String err, Exception e) throws IOException {
+  private void handleStreamerFailure(String err,
+      Exception e) throws IOException {
     LOG.warn("Failed: " + err + ", " + this, e);
     getCurrentStreamer().setIsFailed(true);
     checkStreamers();
@@ -487,7 +492,8 @@ public class DFSStripedOutputStream extends DFSOutputStream {
       return;
     }
 
-    final int firstCellSize = (int)(getStripedDataStreamer(0).getBytesCurBlock() % cellSize);
+    final int firstCellSize =
+        (int)(getStripedDataStreamer(0).getBytesCurBlock() % cellSize);
     final int parityCellSize = firstCellSize > 0 && firstCellSize < cellSize?
         firstCellSize : cellSize;
     final ByteBuffer[] buffers = cellBuffers.getBuffers();

ErasureCodingWorker.java

@@ -68,6 +68,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECReco
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
@@ -112,7 +113,7 @@ public final class ErasureCodingWorker {
   }
 
   private RawErasureDecoder newDecoder(int numDataUnits, int numParityUnits) {
-    return new RSRawDecoder(numDataUnits, numParityUnits);
+    return CodecUtil.createRSRawDecoder(conf, numDataUnits, numParityUnits);
   }
 
   private void initializeStripedReadThreadPool(int num) {

TestDFSStripedInputStream.java

@@ -37,8 +37,10 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -230,8 +232,9 @@ public class TestDFSStripedInputStream {
         for (int m : missingBlkIdx) {
           decodeInputs[m] = null;
         }
-        RSRawDecoder rsRawDecoder = new RSRawDecoder(DATA_BLK_NUM, PARITY_BLK_NUM);
-        rsRawDecoder.decode(decodeInputs, missingBlkIdx, decodeOutputs);
+        RawErasureDecoder rawDecoder = CodecUtil.createRSRawDecoder(conf,
+            DATA_BLK_NUM, PARITY_BLK_NUM);
+        rawDecoder.decode(decodeInputs, missingBlkIdx, decodeOutputs);
         int posInBuf = i * CELLSIZE * DATA_BLK_NUM + failedDNIdx * CELLSIZE;
         System.arraycopy(decodeOutputs[0], 0, expected, posInBuf, CELLSIZE);
       }

TestDFSStripedOutputStream.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -43,7 +44,8 @@ import org.junit.Before;
 import org.junit.Test;
 
 public class TestDFSStripedOutputStream {
-  public static final Log LOG = LogFactory.getLog(TestDFSStripedOutputStream.class);
+  public static final Log LOG = LogFactory.getLog(
+      TestDFSStripedOutputStream.class);
 
   static {
     GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.ALL);
@@ -55,6 +57,7 @@ public class TestDFSStripedOutputStream {
 
   private MiniDFSCluster cluster;
   private DistributedFileSystem fs;
+  private Configuration conf;
   private final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
   private final int stripesPerBlock = 4;
   private final int blockSize = cellSize * stripesPerBlock;
@@ -62,7 +65,7 @@ public class TestDFSStripedOutputStream {
   @Before
   public void setup() throws IOException {
     int numDNs = dataBlocks + parityBlocks + 2;
-    Configuration conf = new Configuration();
+    conf = new Configuration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     cluster.getFileSystem().getClient().createErasureCodingZone("/", null, 0);
@@ -140,7 +143,8 @@ public class TestDFSStripedOutputStream {
 
   @Test
   public void testFileMoreThanABlockGroup2() throws IOException {
-    testOneFile("/MoreThanABlockGroup2", blockSize * dataBlocks + cellSize+ 123);
+    testOneFile("/MoreThanABlockGroup2",
+        blockSize * dataBlocks + cellSize+ 123);
   }
@@ -251,13 +255,14 @@ public class TestDFSStripedOutputStream {
     }
   }
 
-  static void verifyParity(final long size, final int cellSize,
+  void verifyParity(final long size, final int cellSize,
       byte[][] dataBytes, byte[][] parityBytes) {
-    verifyParity(size, cellSize, dataBytes, parityBytes, -1);
+    verifyParity(conf, size, cellSize, dataBytes, parityBytes, -1);
   }
 
-  static void verifyParity(final long size, final int cellSize,
-      byte[][] dataBytes, byte[][] parityBytes, int killedDnIndex) {
+  static void verifyParity(Configuration conf, final long size,
+      final int cellSize, byte[][] dataBytes,
+      byte[][] parityBytes, int killedDnIndex) {
     // verify the parity blocks
     int parityBlkSize = (int) StripedBlockUtil.getInternalBlockLength(
         size, cellSize, dataBytes.length, dataBytes.length);
@@ -275,7 +280,8 @@ public class TestDFSStripedOutputStream {
       }
     }
     final RawErasureEncoder encoder =
-        new RSRawEncoder(dataBytes.length, parityBytes.length);
+        CodecUtil.createRSRawEncoder(conf,
+            dataBytes.length, parityBytes.length);
     encoder.encode(dataBytes, expectedParityBytes);
     for (int i = 0; i < parityBytes.length; i++) {
       if (i != killedDnIndex) {

TestDFSStripedOutputStreamWithFailure.java

@@ -335,7 +335,7 @@ public class TestDFSStripedOutputStreamWithFailure {
       }
 
       // check parity
-      TestDFSStripedOutputStream.verifyParity(
+      TestDFSStripedOutputStream.verifyParity(dfs.getConf(),
          lbs.getLocatedBlocks().get(group).getBlockSize(),
          CELL_SIZE, dataBlockBytes, parityBlockBytes,
          killedDnIndex - dataBlockBytes.length);