HADOOP-11828. Implement the Hitchhiker erasure coding algorithm. Contributed by Jack Liu Quan.
Change-Id: If43475ccc2574df60949c947af562722db076251
parent 2ac39ca762  commit 1bb31fb22e
@@ -262,6 +262,9 @@ Trunk (Unreleased)
    HADOOP-11887. Introduce Intel ISA-L erasure coding library for native
    erasure encoding support (Kai Zheng via Colin P. McCabe)

+   HADOOP-11828. Implement the Hitchhiker erasure coding algorithm.
+   (Jack Liuquan via zhz)
+
  BUG FIXES

    HADOOP-12617. SPNEGO authentication request to non-default realm gets
@@ -56,7 +56,7 @@ public abstract class AbstractErasureDecoder extends AbstractErasureCoder {
   * We have all the data blocks and parity blocks as input blocks for
   * recovering by default. It's codec specific
   * @param blockGroup
-  * @return
+  * @return input blocks
   */
  protected ECBlock[] getInputBlocks(ECBlockGroup blockGroup) {
    ECBlock[] inputBlocks = new ECBlock[getNumDataUnits() +
@@ -0,0 +1,49 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.io.erasurecode.coder;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.ECBlock;

/**
 * Abstract class for Hitchhiker common facilities shared by
 * {@link HHXORErasureEncodingStep} and {@link HHXORErasureDecodingStep}.
 *
 * It extends {@link AbstractErasureCodingStep}.
 */
@InterfaceAudience.Private
public abstract class AbstractHHErasureCodingStep
    extends AbstractErasureCodingStep {

  private static final int SUB_PACKET_SIZE = 2;

  /**
   * Constructor given input blocks and output blocks.
   *
   * @param inputBlocks input blocks of the coding step
   * @param outputBlocks output blocks of the coding step
   */
  public AbstractHHErasureCodingStep(ECBlock[] inputBlocks,
                                     ECBlock[] outputBlocks) {
    super(inputBlocks, outputBlocks);
  }

  protected int getSubPacketSize() {
    return SUB_PACKET_SIZE;
  }
}
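The constant SUB_PACKET_SIZE = 2 is the heart of the Hitchhiker layout: every coding step works on two sub-stripes of a block group at once, so callers hand it 2 * numUnits chunks per step. As a minimal, hypothetical sketch (the helper class and method names below are not part of the patch), this is how such a flat buffer array maps onto the two sub-stripes that HHXORErasureEncodingStep and HHXORErasureDecodingStep operate on internally:

import java.nio.ByteBuffer;

// Hypothetical illustration only: mirrors the layout loops used by the
// Hitchhiker coding steps, where row i holds sub-stripe i.
final class SubStripeLayout {
  static ByteBuffer[][] toSubStripes(ByteBuffer[] flat, int numUnits) {
    final int subPacketSize = 2;             // same value as SUB_PACKET_SIZE
    ByteBuffer[][] sub = new ByteBuffer[subPacketSize][numUnits];
    for (int i = 0; i < subPacketSize; i++) {
      for (int j = 0; j < numUnits; j++) {
        sub[i][j] = flat[i * numUnits + j];  // entry j of sub-stripe i
      }
    }
    return sub;
  }
}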
@@ -0,0 +1,95 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.io.erasurecode.coder;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ECBlock;
import org.apache.hadoop.io.erasurecode.ECBlockGroup;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.rawcoder.*;

/**
 * Hitchhiker is a new erasure coding algorithm developed as a research project
 * at UC Berkeley by Rashmi Vinayak.
 * It has been shown to reduce network traffic and disk I/O by 25%-45% during
 * data reconstruction while retaining the same storage capacity and failure
 * tolerance capability as RS codes.
 * The Hitchhiker algorithm is described in K. V. Rashmi, et al.,
 * "A "Hitchhiker's" Guide to Fast and Efficient Data Reconstruction in
 * Erasure-coded Data Centers", in ACM SIGCOMM 2014.
 * This is the Hitchhiker-XOR erasure decoder that decodes a block group.
 */
@InterfaceAudience.Private
public class HHXORErasureDecoder extends AbstractErasureDecoder {
  private RawErasureDecoder rsRawDecoder;
  private RawErasureEncoder xorRawEncoder;

  public HHXORErasureDecoder(int numDataUnits, int numParityUnits) {
    super(numDataUnits, numParityUnits);
  }

  public HHXORErasureDecoder(ECSchema schema) {
    super(schema);
  }

  @Override
  protected ErasureCodingStep prepareDecodingStep(
      final ECBlockGroup blockGroup) {

    RawErasureDecoder rawDecoder;
    RawErasureEncoder rawEncoder;

    ECBlock[] inputBlocks = getInputBlocks(blockGroup);
    ECBlock[] outputBlocks = getOutputBlocks(blockGroup);

    rawDecoder = checkCreateRSRawDecoder();
    rawEncoder = checkCreateXorRawEncoder();

    return new HHXORErasureDecodingStep(inputBlocks,
        getErasedIndexes(inputBlocks), outputBlocks, rawDecoder,
        rawEncoder);
  }

  private RawErasureDecoder checkCreateRSRawDecoder() {
    if (rsRawDecoder == null) {
      rsRawDecoder = CodecUtil.createRSRawDecoder(getConf(),
          getNumDataUnits(), getNumParityUnits());
    }
    return rsRawDecoder;
  }

  private RawErasureEncoder checkCreateXorRawEncoder() {
    if (xorRawEncoder == null) {
      xorRawEncoder = CodecUtil.createXORRawEncoder(getConf(),
          getNumDataUnits(), getNumParityUnits());
      // The XOR encoder's inputs are reused afterwards, so the raw coder
      // must not be allowed to change them.
      xorRawEncoder.setCoderOption(CoderOption.ALLOW_CHANGE_INPUTS, false);
    }
    return xorRawEncoder;
  }

  @Override
  public void release() {
    if (rsRawDecoder != null) {
      rsRawDecoder.release();
    }
    if (xorRawEncoder != null) {
      xorRawEncoder.release();
    }
  }
}
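The decoder pairs a conventional RS raw decoder with a plain XOR raw encoder: the RS decoder rebuilds the second sub-stripe, while the XOR encoder recomputes piggybacks. Which RS raw coder implementation is used underneath comes from the configuration, exactly as the new unit test exercises it. A hedged wiring sketch follows; the setConf call is an assumption inferred from the coder reading getConf() above, and the helper class is hypothetical, not part of the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawErasureCoderFactory;

// Hypothetical wiring, not part of the patch.
final class HHXORDecoderWiring {
  static HHXORErasureDecoder newDecoder() {
    // Select the pure-Java RS raw coder factory for the underlying RS decode,
    // using the same configuration key as TestHHXORErasureCoder below.
    Configuration conf = new Configuration();
    conf.set(CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY,
        RSRawErasureCoderFactory.class.getCanonicalName());
    HHXORErasureDecoder decoder = new HHXORErasureDecoder(10, 4);
    decoder.setConf(conf);  // assumed: coders are Configurable, as getConf() implies
    return decoder;
  }
}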
@@ -0,0 +1,349 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.io.erasurecode.coder;

import java.nio.ByteBuffer;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.ECBlock;
import org.apache.hadoop.io.erasurecode.ECChunk;
import org.apache.hadoop.io.erasurecode.coder.util.HHUtil;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;

/**
 * Hitchhiker-XOR erasure decoding step, a wrapper of all the necessary
 * information to perform a decoding step involved in the whole process of
 * decoding a block group.
 */
@InterfaceAudience.Private
public class HHXORErasureDecodingStep extends AbstractHHErasureCodingStep {
  private int pbIndex;
  private int[] piggyBackIndex;
  private int[] piggyBackFullIndex;
  private int[] erasedIndexes;
  private RawErasureDecoder rsRawDecoder;
  private RawErasureEncoder xorRawEncoder;

  /**
   * The constructor with all the necessary info.
   * @param inputBlocks input blocks of the block group
   * @param erasedIndexes the indexes of erased blocks in inputBlocks array
   * @param outputBlocks output blocks to recover into
   * @param rawDecoder underlying RS decoder for hitchhiker decoding
   * @param rawEncoder underlying XOR encoder for hitchhiker decoding
   */
  public HHXORErasureDecodingStep(ECBlock[] inputBlocks, int[] erasedIndexes,
                                  ECBlock[] outputBlocks,
                                  RawErasureDecoder rawDecoder,
                                  RawErasureEncoder rawEncoder) {
    super(inputBlocks, outputBlocks);
    this.pbIndex = rawDecoder.getNumParityUnits() - 1;
    this.erasedIndexes = erasedIndexes;
    this.rsRawDecoder = rawDecoder;
    this.xorRawEncoder = rawEncoder;

    this.piggyBackIndex = HHUtil.initPiggyBackIndexWithoutPBVec(
        rawDecoder.getNumDataUnits(), rawDecoder.getNumParityUnits());
    this.piggyBackFullIndex = HHUtil.initPiggyBackFullIndexVec(
        rawDecoder.getNumDataUnits(), piggyBackIndex);
  }

  @Override
  public void performCoding(ECChunk[] inputChunks, ECChunk[] outputChunks) {
    if (erasedIndexes.length == 0) {
      return;
    }

    ByteBuffer[] inputBuffers = ECChunk.toBuffers(inputChunks);
    ByteBuffer[] outputBuffers = ECChunk.toBuffers(outputChunks);
    performCoding(inputBuffers, outputBuffers);
  }

  private void performCoding(ByteBuffer[] inputs, ByteBuffer[] outputs) {
    final int numDataUnits = rsRawDecoder.getNumDataUnits();
    final int numParityUnits = rsRawDecoder.getNumParityUnits();
    final int numTotalUnits = numDataUnits + numParityUnits;
    final int subPacketSize = getSubPacketSize();

    ByteBuffer firstValidInput = HHUtil.findFirstValidInput(inputs);
    final int bufSize = firstValidInput.remaining();

    if (inputs.length != numTotalUnits * getSubPacketSize()) {
      throw new IllegalArgumentException("Invalid inputs length");
    }

    if (outputs.length != erasedIndexes.length * getSubPacketSize()) {
      throw new IllegalArgumentException("Invalid outputs length");
    }

    // Note: inputs length = numTotalUnits * subPacketSize;
    // the first numTotalUnits entries form the first sub-stripe,
    // the second numTotalUnits entries form the second sub-stripe.
    ByteBuffer[][] newIn = new ByteBuffer[subPacketSize][numTotalUnits];
    for (int i = 0; i < subPacketSize; ++i) {
      for (int j = 0; j < numTotalUnits; ++j) {
        newIn[i][j] = inputs[i * numTotalUnits + j];
      }
    }

    ByteBuffer[][] newOut = new ByteBuffer[subPacketSize][erasedIndexes.length];
    for (int i = 0; i < subPacketSize; ++i) {
      for (int j = 0; j < erasedIndexes.length; ++j) {
        newOut[i][j] = outputs[i * erasedIndexes.length + j];
      }
    }

    if (erasedIndexes.length == 1 && erasedIndexes[0] < numDataUnits) {
      // Only a single data unit is missing; reconstruct it with the piggyback.
      doDecodeSingle(newIn, newOut, erasedIndexes[0], bufSize,
          firstValidInput.isDirect());
    } else {
      doDecodeMultiAndParity(newIn, newOut, erasedIndexes, bufSize);
    }
  }

  private void doDecodeSingle(ByteBuffer[][] inputs, ByteBuffer[][] outputs,
                              int erasedLocationToFix, int bufSize,
                              boolean isDirect) {
    final int numDataUnits = rsRawDecoder.getNumDataUnits();
    final int numParityUnits = rsRawDecoder.getNumParityUnits();
    final int subPacketSize = getSubPacketSize();

    int[][] inputPositions = new int[subPacketSize][inputs[0].length];
    for (int i = 0; i < subPacketSize; ++i) {
      for (int j = 0; j < inputs[i].length; ++j) {
        if (inputs[i][j] != null) {
          inputPositions[i][j] = inputs[i][j].position();
        }
      }
    }

    ByteBuffer[] tempInputs = new ByteBuffer[numDataUnits + numParityUnits];
    for (int i = 0; i < tempInputs.length; ++i) {
      tempInputs[i] = inputs[1][i];
    }

    ByteBuffer[][] tmpOutputs = new ByteBuffer[subPacketSize][numParityUnits];
    for (int i = 0; i < getSubPacketSize(); ++i) {
      for (int j = 0; j < erasedIndexes.length; ++j) {
        tmpOutputs[i][j] = outputs[i][j];
      }

      for (int m = erasedIndexes.length; m < numParityUnits; ++m) {
        tmpOutputs[i][m] = HHUtil.allocateByteBuffer(isDirect, bufSize);
      }
    }

    // First consider the second sub-packet.
    int[] erasedLocation = new int[numParityUnits];
    erasedLocation[0] = erasedLocationToFix;

    // Mark the parity locations that are not read for the second sub-packet;
    // they will be produced by the RS decode instead.
    for (int i = 1; i < numParityUnits; i++) {
      erasedLocation[i] = numDataUnits + i;
      tempInputs[numDataUnits + i] = null;
    }

    rsRawDecoder.decode(tempInputs, erasedLocation, tmpOutputs[1]);

    int piggyBackParityIndex = piggyBackFullIndex[erasedLocationToFix];
    ByteBuffer piggyBack = HHUtil.getPiggyBackForDecode(inputs, tmpOutputs,
        piggyBackParityIndex, numDataUnits, numParityUnits, pbIndex);

    // Second consider the first sub-packet:
    // get the value of the piggyback associated with the erased location.
    if (isDirect) {
      // decode the erased value in the first sub-packet by using the piggyback
      int idxToWrite = 0;
      doDecodeByPiggyBack(inputs[0], tmpOutputs[0][idxToWrite], piggyBack,
          erasedLocationToFix);
    } else {
      ByteBuffer buffer;
      byte[][][] newInputs = new byte[getSubPacketSize()][inputs[0].length][];
      int[][] inputOffsets = new int[getSubPacketSize()][inputs[0].length];
      byte[][][] newOutputs = new byte[getSubPacketSize()][numParityUnits][];
      int[][] outOffsets = new int[getSubPacketSize()][numParityUnits];

      for (int i = 0; i < getSubPacketSize(); ++i) {
        for (int j = 0; j < inputs[0].length; ++j) {
          buffer = inputs[i][j];
          if (buffer != null) {
            inputOffsets[i][j] = buffer.arrayOffset() + buffer.position();
            newInputs[i][j] = buffer.array();
          }
        }
      }

      for (int i = 0; i < getSubPacketSize(); ++i) {
        for (int j = 0; j < numParityUnits; ++j) {
          buffer = tmpOutputs[i][j];
          if (buffer != null) {
            outOffsets[i][j] = buffer.arrayOffset() + buffer.position();
            newOutputs[i][j] = buffer.array();
          }
        }
      }

      byte[] newPiggyBack = piggyBack.array();

      // decode the erased value in the first sub-packet by using the piggyback
      int idxToWrite = 0;
      doDecodeByPiggyBack(newInputs[0], inputOffsets[0],
          newOutputs[0][idxToWrite], outOffsets[0][idxToWrite],
          newPiggyBack, erasedLocationToFix, bufSize);
    }

    for (int i = 0; i < subPacketSize; ++i) {
      for (int j = 0; j < inputs[i].length; ++j) {
        if (inputs[i][j] != null) {
          inputs[i][j].position(inputPositions[i][j] + bufSize);
        }
      }
    }
  }

  private void doDecodeByPiggyBack(ByteBuffer[] inputs,
                                   ByteBuffer outputs,
                                   ByteBuffer piggyBack,
                                   int erasedLocationToFix) {
    final int thisPiggyBackSetIdx = piggyBackFullIndex[erasedLocationToFix];
    final int startIndex = piggyBackIndex[thisPiggyBackSetIdx - 1];
    final int endIndex = piggyBackIndex[thisPiggyBackSetIdx];

    // recover first sub-stripe data by XOR-ing the piggyback
    int bufSize = piggyBack.remaining();
    for (int i = piggyBack.position();
         i < piggyBack.position() + bufSize; i++) {
      for (int j = startIndex; j < endIndex; j++) {
        if (inputs[j] != null) {
          piggyBack.put(i, (byte)
              (piggyBack.get(i) ^ inputs[j].get(inputs[j].position() + i)));
        }
      }
      outputs.put(outputs.position() + i, piggyBack.get(i));
    }
  }

  private void doDecodeByPiggyBack(byte[][] inputs, int[] inputOffsets,
                                   byte[] outputs, int outOffset,
                                   byte[] piggyBack, int erasedLocationToFix,
                                   int bufSize) {
    final int thisPiggyBackSetIdx = piggyBackFullIndex[erasedLocationToFix];
    final int startIndex = piggyBackIndex[thisPiggyBackSetIdx - 1];
    final int endIndex = piggyBackIndex[thisPiggyBackSetIdx];

    // recover first sub-stripe data by XOR-ing the piggyback
    for (int i = 0; i < bufSize; i++) {
      for (int j = startIndex; j < endIndex; j++) {
        if (inputs[j] != null) {
          piggyBack[i] = (byte) (piggyBack[i] ^ inputs[j][i + inputOffsets[j]]);
        }
      }
      outputs[i + outOffset] = piggyBack[i];
    }
  }

  private void doDecodeMultiAndParity(ByteBuffer[][] inputs,
                                      ByteBuffer[][] outputs,
                                      int[] erasedLocationToFix, int bufSize) {
    final int numDataUnits = rsRawDecoder.getNumDataUnits();
    final int numParityUnits = rsRawDecoder.getNumParityUnits();
    final int numTotalUnits = numDataUnits + numParityUnits;
    int[] parityToFixFlag = new int[numTotalUnits];

    for (int i = 0; i < erasedLocationToFix.length; ++i) {
      if (erasedLocationToFix[i] >= numDataUnits) {
        parityToFixFlag[erasedLocationToFix[i]] = 1;
      }
    }

    int[] inputPositions = new int[inputs[0].length];
    for (int i = 0; i < inputPositions.length; i++) {
      if (inputs[0][i] != null) {
        inputPositions[i] = inputs[0][i].position();
      }
    }

    // decode the first sub-stripe
    rsRawDecoder.decode(inputs[0], erasedLocationToFix, outputs[0]);

    for (int i = 0; i < inputs[0].length; i++) {
      if (inputs[0][i] != null) {
        // restore the original positions after the first decode
        inputs[0][i].position(inputPositions[i]);
      }
    }

    ByteBuffer[] tempInput = new ByteBuffer[numDataUnits];
    for (int i = 0; i < numDataUnits; ++i) {
      tempInput[i] = inputs[0][i];
      //
      // if (!isDirect && tempInput[i] != null) {
      //   tempInput[i].position(tempInput[i].position() - bufSize);
      // }
    }

    for (int i = 0; i < erasedLocationToFix.length; ++i) {
      if (erasedLocationToFix[i] < numDataUnits) {
        tempInput[erasedLocationToFix[i]] = outputs[0][i];
      }
    }

    ByteBuffer[] piggyBack = HHUtil.getPiggyBacksFromInput(tempInput,
        piggyBackIndex, numParityUnits, 0, xorRawEncoder);

    for (int j = numDataUnits + 1; j < numTotalUnits; ++j) {
      if (parityToFixFlag[j] == 0 && inputs[1][j] != null) {
        // f(b) + f(a1,a2,a3....)
        for (int k = inputs[1][j].position(),
             m = piggyBack[j - numDataUnits - 1].position();
             k < inputs[1][j].limit(); ++k, ++m) {
          inputs[1][j].put(k, (byte)
              (inputs[1][j].get(k) ^
                  piggyBack[j - numDataUnits - 1].get(m)));
        }
      }
    }

    // decode the second sub-stripe
    rsRawDecoder.decode(inputs[1], erasedLocationToFix, outputs[1]);

    // parity index 0 carries no piggyback
    for (int j = 0; j < erasedLocationToFix.length; ++j) {
      if (erasedLocationToFix[j] < numTotalUnits
          && erasedLocationToFix[j] > numDataUnits) {
        int parityIndex = erasedLocationToFix[j] - numDataUnits - 1;
        for (int k = outputs[1][j].position(),
             m = piggyBack[parityIndex].position();
             k < outputs[1][j].limit(); ++k, ++m) {
          outputs[1][j].put(k, (byte)
              (outputs[1][j].get(k) ^ piggyBack[parityIndex].get(m)));
        }
      }
    }

    for (int i = 0; i < inputs[0].length; i++) {
      if (inputs[0][i] != null) {
        // bufSize bytes consumed; advance the positions
        inputs[0][i].position(inputPositions[i] + bufSize);
      }
    }
  }

}
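For the common single-data-failure case the step above avoids a second full RS decode: it RS-decodes only the second sub-stripe, extracts the piggyback carried by the corresponding parity, and XORs that piggyback with the surviving first-sub-stripe data of the same group. Since the field subtraction in getPiggyBackForDecode is an XOR in GF(2^8), the recovery reduces to the following relations, restated here only as a reading aid for the code above (g = piggyBackFullIndex[erasedIndex]):

    piggyback  = parity_g(sub-stripe 2, as read)  XOR  parity_g(sub-stripe 2, recomputed without piggyback)
    erased(1)  = piggyback  XOR  ( XOR of the surviving data of group g in sub-stripe 1 )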
@@ -0,0 +1,92 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.io.erasurecode.coder;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ECBlock;
import org.apache.hadoop.io.erasurecode.ECBlockGroup;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.rawcoder.CoderOption;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;

/**
 * Hitchhiker is a new erasure coding algorithm developed as a research project
 * at UC Berkeley by Rashmi Vinayak.
 * It has been shown to reduce network traffic and disk I/O by 25%-45% during
 * data reconstruction while retaining the same storage capacity and failure
 * tolerance capability as RS codes.
 * The Hitchhiker algorithm is described in K. V. Rashmi, et al.,
 * "A "Hitchhiker's" Guide to Fast and Efficient Data Reconstruction in
 * Erasure-coded Data Centers", in ACM SIGCOMM 2014.
 * This is the Hitchhiker-XOR erasure encoder that encodes a block group.
 */
@InterfaceAudience.Private
public class HHXORErasureEncoder extends AbstractErasureEncoder {
  private RawErasureEncoder rsRawEncoder;
  private RawErasureEncoder xorRawEncoder;

  public HHXORErasureEncoder(int numDataUnits, int numParityUnits) {
    super(numDataUnits, numParityUnits);
  }

  public HHXORErasureEncoder(ECSchema schema) {
    super(schema);
  }

  @Override
  protected ErasureCodingStep prepareEncodingStep(
      final ECBlockGroup blockGroup) {

    RawErasureEncoder rsRawEncoderTmp = checkCreateRSRawEncoder();
    RawErasureEncoder xorRawEncoderTmp = checkCreateXorRawEncoder();

    ECBlock[] inputBlocks = getInputBlocks(blockGroup);

    return new HHXORErasureEncodingStep(inputBlocks,
        getOutputBlocks(blockGroup), rsRawEncoderTmp, xorRawEncoderTmp);
  }

  private RawErasureEncoder checkCreateRSRawEncoder() {
    if (rsRawEncoder == null) {
      rsRawEncoder = CodecUtil.createRSRawEncoder(getConf(),
          getNumDataUnits(), getNumParityUnits());
    }
    return rsRawEncoder;
  }

  private RawErasureEncoder checkCreateXorRawEncoder() {
    if (xorRawEncoder == null) {
      xorRawEncoder = CodecUtil.createXORRawEncoder(getConf(),
          getNumDataUnits(), getNumParityUnits());
      // The XOR encoder's inputs are reused afterwards, so the raw coder
      // must not be allowed to change them.
      xorRawEncoder.setCoderOption(CoderOption.ALLOW_CHANGE_INPUTS, false);
    }
    return xorRawEncoder;
  }

  @Override
  public void release() {
    if (rsRawEncoder != null) {
      rsRawEncoder.release();
    }
    if (xorRawEncoder != null) {
      xorRawEncoder.release();
    }
  }

}
@@ -0,0 +1,146 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.io.erasurecode.coder;

import java.nio.ByteBuffer;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.ECBlock;
import org.apache.hadoop.io.erasurecode.ECChunk;
import org.apache.hadoop.io.erasurecode.coder.util.HHUtil;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;

/**
 * Hitchhiker-XOR erasure encoding step, a wrapper of all the necessary
 * information to perform an encoding step involved in the whole process of
 * encoding a block group.
 */
@InterfaceAudience.Private
public class HHXORErasureEncodingStep extends AbstractHHErasureCodingStep {
  private int[] piggyBackIndex;
  private RawErasureEncoder rsRawEncoder;
  private RawErasureEncoder xorRawEncoder;

  /**
   * The constructor with all the necessary info.
   *
   * @param inputBlocks input blocks of the block group
   * @param outputBlocks output blocks to write the parities into
   * @param rsRawEncoder underlying RS encoder for hitchhiker encoding
   * @param xorRawEncoder underlying XOR encoder for hitchhiker encoding
   */
  public HHXORErasureEncodingStep(ECBlock[] inputBlocks, ECBlock[] outputBlocks,
                                  RawErasureEncoder rsRawEncoder,
                                  RawErasureEncoder xorRawEncoder) {
    super(inputBlocks, outputBlocks);

    this.rsRawEncoder = rsRawEncoder;
    this.xorRawEncoder = xorRawEncoder;
    piggyBackIndex = HHUtil.initPiggyBackIndexWithoutPBVec(
        rsRawEncoder.getNumDataUnits(), rsRawEncoder.getNumParityUnits());
  }

  @Override
  public void performCoding(ECChunk[] inputChunks, ECChunk[] outputChunks) {
    ByteBuffer[] inputBuffers = ECChunk.toBuffers(inputChunks);
    ByteBuffer[] outputBuffers = ECChunk.toBuffers(outputChunks);
    performCoding(inputBuffers, outputBuffers);
  }

  private void performCoding(ByteBuffer[] inputs, ByteBuffer[] outputs) {
    final int numDataUnits = this.rsRawEncoder.getNumDataUnits();
    final int numParityUnits = this.rsRawEncoder.getNumParityUnits();
    final int subPacketSize = getSubPacketSize();

    // inputs length = numDataUnits * subPacketSize
    if (inputs.length != numDataUnits * subPacketSize) {
      throw new IllegalArgumentException("Invalid inputs length");
    }

    if (outputs.length != numParityUnits * subPacketSize) {
      throw new IllegalArgumentException("Invalid outputs length");
    }

    // first numDataUnits entries form the first sub-stripe,
    // second numDataUnits entries form the second sub-stripe
    ByteBuffer[][] hhInputs = new ByteBuffer[subPacketSize][numDataUnits];
    for (int i = 0; i < subPacketSize; ++i) {
      for (int j = 0; j < numDataUnits; ++j) {
        hhInputs[i][j] = inputs[i * numDataUnits + j];
      }
    }

    ByteBuffer[][] hhOutputs = new ByteBuffer[subPacketSize][numParityUnits];
    for (int i = 0; i < subPacketSize; ++i) {
      for (int j = 0; j < numParityUnits; ++j) {
        hhOutputs[i][j] = outputs[i * numParityUnits + j];
      }
    }

    doEncode(hhInputs, hhOutputs);
  }

  private void doEncode(ByteBuffer[][] inputs, ByteBuffer[][] outputs) {
    final int numParityUnits = this.rsRawEncoder.getNumParityUnits();

    // calculate the piggybacks using the first sub-packet
    ByteBuffer[] piggyBacks = HHUtil.getPiggyBacksFromInput(inputs[0],
        piggyBackIndex, numParityUnits, 0, xorRawEncoder);

    // Step 1: RS-encode each byte-stripe of the sub-packets
    for (int i = 0; i < getSubPacketSize(); ++i) {
      rsRawEncoder.encode(inputs[i], outputs[i]);
    }

    // Step 2: add the piggybacks to the parities.
    // Only the second sub-packet gets piggybacks added.
    encodeWithPiggyBacks(piggyBacks, outputs, numParityUnits,
        inputs[0][0].isDirect());
  }

  private void encodeWithPiggyBacks(ByteBuffer[] piggyBacks,
                                    ByteBuffer[][] outputs,
                                    int numParityUnits,
                                    boolean bIsDirect) {
    if (!bIsDirect) {
      for (int i = 0; i < numParityUnits - 1; i++) {
        int parityIndex = i + 1;
        int bufSize = piggyBacks[i].remaining();
        byte[] newOut = outputs[1][parityIndex].array();
        int offset = outputs[1][parityIndex].arrayOffset()
            + outputs[1][parityIndex].position();

        for (int k = offset, j = 0; j < bufSize; k++, j++) {
          newOut[k] = (byte) (newOut[k] ^ piggyBacks[i].get(j));
        }
      }
      return;
    }

    for (int i = 0; i < numParityUnits - 1; i++) {
      int parityIndex = i + 1;
      for (int k = piggyBacks[i].position(),
           m = outputs[1][parityIndex].position();
           k < piggyBacks[i].limit(); k++, m++) {
        outputs[1][parityIndex].put(m,
            (byte) (outputs[1][parityIndex].get(m) ^ piggyBacks[i].get(k)));
      }
    }
  }

}
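Concretely, with the (10, 4) schema used by the tests, the encode path leaves all four parities of the first sub-stripe and parity 0 of the second sub-stripe as plain RS parities; parities 1..3 of the second sub-stripe additionally absorb, via XOR, the piggyback of one data group from the first sub-stripe. The relation applied by encodeWithPiggyBacks can be written as follows (illustrative restatement only; pb_i stands for piggyBacks[i-1], the XOR of data group i-1 of sub-stripe 1):

    parity_i(sub-stripe 2) := parity_i(sub-stripe 2) XOR pb_i,   for i = 1 .. numParityUnits - 1
    parity_0(sub-stripe 2) and all parities of sub-stripe 1 are left unchanged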
@@ -0,0 +1,216 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.io.erasurecode.coder.util;

import java.nio.ByteBuffer;

import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.hadoop.io.erasurecode.rawcoder.util.RSUtil;

/**
 * Some utilities for Hitchhiker coding.
 */
@InterfaceAudience.Private
public final class HHUtil {
  private HHUtil() {
    // No instantiation
  }

  public static int[] initPiggyBackIndexWithoutPBVec(int numDataUnits,
                                                     int numParityUnits) {
    final int piggyBackSize = numDataUnits / (numParityUnits - 1);
    int[] piggyBackIndex = new int[numParityUnits];

    for (int i = 0; i < numDataUnits; ++i) {
      if ((i % piggyBackSize) == 0) {
        piggyBackIndex[i / piggyBackSize] = i;
      }
    }

    piggyBackIndex[numParityUnits - 1] = numDataUnits;
    return piggyBackIndex;
  }

  public static int[] initPiggyBackFullIndexVec(int numDataUnits,
                                                int[] piggyBackIndex) {
    int[] piggyBackFullIndex = new int[numDataUnits];

    for (int i = 1; i < piggyBackIndex.length; ++i) {
      for (int j = piggyBackIndex[i - 1]; j < piggyBackIndex[i]; ++j) {
        piggyBackFullIndex[j] = i;
      }
    }

    return piggyBackFullIndex;
  }

  public static ByteBuffer[] getPiggyBacksFromInput(ByteBuffer[] inputs,
                                                    int[] piggyBackIndex,
                                                    int numParityUnits,
                                                    int pgIndex,
                                                    RawErasureEncoder encoder) {
    ByteBuffer[] emptyInput = new ByteBuffer[inputs.length];
    ByteBuffer[] tempInput = new ByteBuffer[inputs.length];
    int[] inputPositions = new int[inputs.length];

    for (int m = 0; m < inputs.length; ++m) {
      if (inputs[m] != null) {
        emptyInput[m] = allocateByteBuffer(inputs[m].isDirect(),
            inputs[m].remaining());
      }
    }

    ByteBuffer[] tempOutput = new ByteBuffer[numParityUnits];
    for (int m = 0; m < numParityUnits; ++m) {
      tempOutput[m] = allocateByteBuffer(inputs[m].isDirect(),
          inputs[0].remaining());
    }

    ByteBuffer[] piggyBacks = new ByteBuffer[numParityUnits - 1];
    assert (piggyBackIndex.length >= numParityUnits);

    // use the underlying raw encoder to create the piggybacks
    for (int i = 0; i < numParityUnits - 1; ++i) {
      for (int k = piggyBackIndex[i]; k < piggyBackIndex[i + 1]; ++k) {
        tempInput[k] = inputs[k];
        inputPositions[k] = inputs[k].position();
      }
      for (int n = 0; n < emptyInput.length; ++n) {
        if (tempInput[n] == null) {
          tempInput[n] = emptyInput[n];
          inputPositions[n] = emptyInput[n].position();
        }
      }

      encoder.encode(tempInput, tempOutput);

      piggyBacks[i] = cloneBufferData(tempOutput[pgIndex]);

      for (int j = 0; j < tempInput.length; j++) {
        if (tempInput[j] != null) {
          tempInput[j].position(inputPositions[j]);
          tempInput[j] = null;
        }
      }

      for (int j = 0; j < tempOutput.length; j++) {
        tempOutput[j].clear();
      }
    }

    return piggyBacks;
  }

  private static ByteBuffer cloneBufferData(ByteBuffer srcBuffer) {
    ByteBuffer destBuffer;
    byte[] bytesArr = new byte[srcBuffer.remaining()];

    srcBuffer.mark();
    srcBuffer.get(bytesArr);
    srcBuffer.reset();

    if (!srcBuffer.isDirect()) {
      destBuffer = ByteBuffer.wrap(bytesArr);
    } else {
      destBuffer = ByteBuffer.allocateDirect(srcBuffer.remaining());
      destBuffer.put(bytesArr);
      destBuffer.flip();
    }

    return destBuffer;
  }

  public static ByteBuffer allocateByteBuffer(boolean useDirectBuffer,
                                              int bufSize) {
    if (useDirectBuffer) {
      return ByteBuffer.allocateDirect(bufSize);
    } else {
      return ByteBuffer.allocate(bufSize);
    }
  }

  public static ByteBuffer getPiggyBackForDecode(ByteBuffer[][] inputs,
                                                 ByteBuffer[][] outputs,
                                                 int pbParityIndex,
                                                 int numDataUnits,
                                                 int numParityUnits,
                                                 int pbIndex) {
    ByteBuffer firstValidInput = HHUtil.findFirstValidInput(inputs[0]);
    int bufSize = firstValidInput.remaining();

    ByteBuffer piggybacks = allocateByteBuffer(firstValidInput.isDirect(),
        bufSize);

    // Use pbParityIndex to figure out which parity location has the
    // associated piggyback.
    // Obtain the piggyback by subtracting the decoded (second sub-packet
    // only) parity value from the actually read parity value.
    if (pbParityIndex < numParityUnits) {
      // not the last piggyback set
      int inputIdx = numDataUnits + pbParityIndex;
      int inputPos = inputs[1][inputIdx].position();
      int outputPos = outputs[1][pbParityIndex].position();

      for (int m = 0, k = inputPos, n = outputPos; m < bufSize; k++, m++, n++) {
        int valueWithPb = 0xFF & inputs[1][inputIdx].get(k);
        int valueWithoutPb = 0xFF & outputs[1][pbParityIndex].get(n);
        piggybacks.put(m, (byte) RSUtil.GF.add(valueWithPb, valueWithoutPb));
      }
    } else {
      // last piggyback set
      int sum = 0;
      for (int k = 0; k < bufSize; k++) {
        sum = 0;
        for (int i = 1; i < numParityUnits; i++) {
          int inIdx = numDataUnits + i;
          int inPos = inputs[1][numDataUnits + i].position();
          int outPos = outputs[1][i].position();

          sum = RSUtil.GF.add(sum, (0xFF & inputs[1][inIdx].get(inPos + k)));
          sum = RSUtil.GF.add(sum, (0xFF & outputs[1][i].get(outPos + k)));
        }

        sum = RSUtil.GF.add(sum,
            (0xFF & inputs[0][numDataUnits + pbIndex].get(
                inputs[0][numDataUnits + pbIndex].position() + k)));

        piggybacks.put(k, (byte) sum);
      }

    }

    return piggybacks;
  }

  /**
   * Find the first valid input from all the inputs.
   * @param inputs input buffers to look for a valid input
   * @return the first valid input
   */
  public static <T> T findFirstValidInput(T[] inputs) {
    for (T input : inputs) {
      if (input != null) {
        return input;
      }
    }

    throw new HadoopIllegalArgumentException(
        "Invalid inputs are found, all being null");
  }
}
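To make the index helpers concrete: for the (10, 4) schema used by the tests, piggyBackSize = 10 / (4 - 1) = 3, so initPiggyBackIndexWithoutPBVec marks the group boundaries [0, 3, 6, 10] (data groups [0..2], [3..5], [6..9]) and initPiggyBackFullIndexVec records, for every data unit, which piggyback set (and hence which second-sub-stripe parity) carries its piggyback. A small hypothetical sanity check, with expected values worked out by hand from the code above (not part of the patch):

import java.util.Arrays;

import org.apache.hadoop.io.erasurecode.coder.util.HHUtil;

// Hypothetical example for the (10, 4) case.
final class HHIndexExample {
  public static void main(String[] args) {
    int[] idx = HHUtil.initPiggyBackIndexWithoutPBVec(10, 4);
    int[] full = HHUtil.initPiggyBackFullIndexVec(10, idx);
    // Expected: idx  = [0, 3, 6, 10]   (group boundaries; last entry = numDataUnits)
    // Expected: full = [1, 1, 1, 2, 2, 2, 3, 3, 3, 3]
    System.out.println(Arrays.toString(idx));
    System.out.println(Arrays.toString(full));
  }
}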
@@ -41,7 +41,7 @@ public abstract class TestErasureCoderBase extends TestCoderBase {
   * at all for simple.
   */
  protected static class TestBlock extends ECBlock {
-    private ECChunk[] chunks;
+    protected ECChunk[] chunks;

    // For simple, just assume the block have the chunks already ready.
    // In practice we need to read/write chunks from/to the block via file IO.

@@ -101,7 +101,7 @@ public abstract class TestErasureCoderBase extends TestCoderBase {
   * This is typically how a coding step should be performed.
   * @param codingStep
   */
-  private void performCodingStep(ErasureCodingStep codingStep) {
+  protected void performCodingStep(ErasureCodingStep codingStep) {
    // Pretend that we're opening these input blocks and output blocks.
    ECBlock[] inputBlocks = codingStep.getInputBlocks();
    ECBlock[] outputBlocks = codingStep.getOutputBlocks();
@@ -0,0 +1,61 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.io.erasurecode.coder;

import org.apache.hadoop.io.erasurecode.ECBlock;
import org.apache.hadoop.io.erasurecode.ECChunk;

/**
 * Erasure coder test base with utilities for Hitchhiker.
 */
public abstract class TestHHErasureCoderBase extends TestErasureCoderBase {
  protected int subPacketSize = 2;

  @Override
  protected void performCodingStep(ErasureCodingStep codingStep) {
    // Pretend that we're opening these input blocks and output blocks.
    ECBlock[] inputBlocks = codingStep.getInputBlocks();
    ECBlock[] outputBlocks = codingStep.getOutputBlocks();
    // We allocate input and output chunks accordingly.
    ECChunk[] inputChunks = new ECChunk[inputBlocks.length * subPacketSize];
    ECChunk[] outputChunks = new ECChunk[outputBlocks.length * subPacketSize];

    for (int i = 0; i < numChunksInBlock; i += subPacketSize) {
      // Pretend that we're reading input chunks from input blocks.
      for (int k = 0; k < subPacketSize; ++k) {
        for (int j = 0; j < inputBlocks.length; ++j) {
          inputChunks[k * inputBlocks.length + j] = ((TestBlock)
              inputBlocks[j]).chunks[i + k];
        }

        // Pretend that we allocate and will write output results to the blocks.
        for (int j = 0; j < outputBlocks.length; ++j) {
          outputChunks[k * outputBlocks.length + j] = allocateOutputChunk();
          ((TestBlock) outputBlocks[j]).chunks[i + k] =
              outputChunks[k * outputBlocks.length + j];
        }
      }

      // Given the input chunks and output chunk buffers, just call it!
      codingStep.performCoding(inputChunks, outputChunks);
    }

    codingStep.finish();
  }
}
@@ -0,0 +1,120 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.io.erasurecode.coder;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawErasureCoderFactory;
import org.junit.Before;
import org.junit.Test;

public class TestHHXORErasureCoder extends TestHHErasureCoderBase {

  @Before
  public void setup() {
    this.encoderClass = HHXORErasureEncoder.class;
    this.decoderClass = HHXORErasureDecoder.class;
    this.numChunksInBlock = 10;
    this.subPacketSize = 2;
  }

  @Test
  public void testCodingNoDirectBuffer_10x4_erasing_d0() {
    prepare(null, 10, 4, new int[]{0}, new int[0]);
    /**
     * Doing twice to test if the coders can be repeatedly reused. This matters
     * as the underlying coding buffers are shared, which may have bugs.
     */
    testCoding(false);
    testCoding(false);
  }

  @Test
  public void testCodingDirectBufferWithConf_10x4_erasing_d0() {
    /**
     * This tests if the configuration items work or not.
     */
    Configuration conf = new Configuration();
    conf.set(CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY,
        RSRawErasureCoderFactory.class.getCanonicalName());
    prepare(conf, 10, 4, new int[]{0}, new int[0]);

    testCoding(true);
    testCoding(true);
  }

  @Test
  public void testCodingDirectBuffer_10x4_erasing_p1() {
    prepare(null, 10, 4, new int[]{}, new int[]{1});
    testCoding(true);
    testCoding(true);
  }

  @Test
  public void testCodingDirectBuffer_10x4_erasing_d4() {
    prepare(null, 10, 4, new int[] {4}, new int[] {});
    testCoding(true);
    testCoding(true);
  }

  @Test
  public void testCodingDirectBuffer_10x4_erasing_d0_p0() {
    prepare(null, 10, 4, new int[] {0}, new int[] {0});
    testCoding(true);
    testCoding(true);
  }

  @Test
  public void testCodingBothBuffers_10x4_erasing_d0_p0() {
    prepare(null, 10, 4, new int[] {0}, new int[] {0});

    /**
     * Doing in mixed buffer usage model to test if the coders can be repeatedly
     * reused with different buffer usage model. This matters as the underlying
     * coding buffers are shared, which may have bugs.
     */
    testCoding(true);
    testCoding(false);
    testCoding(true);
    testCoding(false);
  }

  @Test
  public void testCodingDirectBuffer_10x4_erasure_of_d2_d4_p0() {
    prepare(null, 10, 4, new int[] {2, 4}, new int[] {0});
    testCoding(true);
  }

  @Test
  public void testCodingDirectBuffer_10x4_erasing_d0_d1_p0_p1() {
    prepare(null, 10, 4, new int[] {0, 1}, new int[] {0, 1});
    testCoding(true);
  }

//  @Test
//  public void testCodingNoDirectBuffer_3x3_erasing_d0_p0() {
//    prepare(null, 3, 3, new int[] {0}, new int[] {0});
//    testCoding(false);
//  }

  @Test
  public void testCodingDirectBuffer_6x3_erasing_d0_p0() {
    prepare(null, 6, 3, new int[] {0}, new int[] {0});
    testCoding(true);
  }
}
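Additional cases follow the same prepare/testCoding pattern; for instance, a hypothetical extra test (not part of the patch) that erases two data units and no parity, added inside TestHHXORErasureCoder, would look like:

  @Test
  public void testCodingDirectBuffer_10x4_erasing_d1_d3() {
    // Assumed case for illustration: erase data units 1 and 3, keep all parities.
    prepare(null, 10, 4, new int[] {1, 3}, new int[] {});
    testCoding(true);
  }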