HDFS-15759. EC: Verify EC reconstruction correctness on DataNode (#2585)
commit 95e6892675
parent b4d97a8dc7
DecodingValidator.java (new file)
@@ -0,0 +1,187 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.io.erasurecode.rawcoder;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.io.erasurecode.ECChunk;

import java.io.IOException;
import java.nio.ByteBuffer;

/**
 * A utility class to validate decoding.
 */
@InterfaceAudience.Private
public class DecodingValidator {

  private final RawErasureDecoder decoder;
  private ByteBuffer buffer;
  private int[] newValidIndexes;
  private int newErasedIndex;

  public DecodingValidator(RawErasureDecoder decoder) {
    this.decoder = decoder;
  }

  /**
   * Validate outputs decoded from inputs, by decoding an input back from
   * the outputs and comparing it with the original one.
   *
   * For instance, in RS (6, 3), let (d0, d1, d2, d3, d4, d5) be sources
   * and (p0, p1, p2) be parities, and assume
   *  inputs = [d0, null (d1), d2, d3, d4, d5, null (p0), p1, null (p2)];
   *  erasedIndexes = [1, 6];
   *  outputs = [d1, p0].
   * Then
   *  1. Create new inputs, erasedIndexes and outputs for validation so that
   *     the inputs could contain the decoded outputs, and decode them:
   *      newInputs = [d1, d2, d3, d4, d5, p0]
   *      newErasedIndexes = [0]
   *      newOutputs = [d0']
   *  2. Compare d0 and d0'. The comparison will fail with high probability
   *     when the initial outputs are wrong.
   *
   * Note that the input buffers' positions must be the ones where data are
   * read: If the input buffers have been processed by a decoder, the buffers'
   * positions must be reset before being passed into this method.
   *
   * This method does not change outputs and erasedIndexes.
   *
   * @param inputs input buffers used for decoding. The buffers' position
   *               are moved to the end after this method.
   * @param erasedIndexes indexes of erased units used for decoding
   * @param outputs decoded output buffers, which are ready to be read after
   *                the call
   * @throws IOException
   */
  public void validate(ByteBuffer[] inputs, int[] erasedIndexes,
      ByteBuffer[] outputs) throws IOException {
    markBuffers(outputs);

    try {
      ByteBuffer validInput = CoderUtil.findFirstValidInput(inputs);
      boolean isDirect = validInput.isDirect();
      int capacity = validInput.capacity();
      int remaining = validInput.remaining();

      // Init buffer
      if (buffer == null || buffer.isDirect() != isDirect
          || buffer.capacity() < remaining) {
        buffer = allocateBuffer(isDirect, capacity);
      }
      buffer.clear().limit(remaining);

      // Create newInputs and newErasedIndex for validation
      ByteBuffer[] newInputs = new ByteBuffer[inputs.length];
      int count = 0;
      for (int i = 0; i < erasedIndexes.length; i++) {
        newInputs[erasedIndexes[i]] = outputs[i];
        count++;
      }
      newErasedIndex = -1;
      boolean selected = false;
      int numValidIndexes = CoderUtil.getValidIndexes(inputs).length;
      for (int i = 0; i < newInputs.length; i++) {
        if (count == numValidIndexes) {
          break;
        } else if (!selected && inputs[i] != null) {
          newErasedIndex = i;
          newInputs[i] = null;
          selected = true;
        } else if (newInputs[i] == null) {
          newInputs[i] = inputs[i];
          if (inputs[i] != null) {
            count++;
          }
        }
      }

      // Keep it for testing
      newValidIndexes = CoderUtil.getValidIndexes(newInputs);

      decoder.decode(newInputs, new int[]{newErasedIndex},
          new ByteBuffer[]{buffer});

      if (!buffer.equals(inputs[newErasedIndex])) {
        throw new InvalidDecodingException("Failed to validate decoding");
      }
    } finally {
      toLimits(inputs);
      resetBuffers(outputs);
    }
  }

  /**
   * Validate outputs decoded from inputs, by decoding an input back from
   * those outputs and comparing it with the original one.
   * @param inputs input buffers used for decoding
   * @param erasedIndexes indexes of erased units used for decoding
   * @param outputs decoded output buffers
   * @throws IOException
   */
  public void validate(ECChunk[] inputs, int[] erasedIndexes, ECChunk[] outputs)
      throws IOException {
    ByteBuffer[] newInputs = CoderUtil.toBuffers(inputs);
    ByteBuffer[] newOutputs = CoderUtil.toBuffers(outputs);
    validate(newInputs, erasedIndexes, newOutputs);
  }

  private ByteBuffer allocateBuffer(boolean direct, int capacity) {
    if (direct) {
      buffer = ByteBuffer.allocateDirect(capacity);
    } else {
      buffer = ByteBuffer.allocate(capacity);
    }
    return buffer;
  }

  private static void markBuffers(ByteBuffer[] buffers) {
    for (ByteBuffer buffer: buffers) {
      if (buffer != null) {
        buffer.mark();
      }
    }
  }

  private static void resetBuffers(ByteBuffer[] buffers) {
    for (ByteBuffer buffer: buffers) {
      if (buffer != null) {
        buffer.reset();
      }
    }
  }

  private static void toLimits(ByteBuffer[] buffers) {
    for (ByteBuffer buffer: buffers) {
      if (buffer != null) {
        buffer.position(buffer.limit());
      }
    }
  }

  @VisibleForTesting
  protected int[] getNewValidIndexes() {
    return newValidIndexes;
  }

  @VisibleForTesting
  protected int getNewErasedIndex() {
    return newErasedIndex;
  }
}
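The Javadoc above walks through the validation scheme for RS (6, 3). As a rough, self-contained sketch of how DecodingValidator is driven (not part of this change), the following encodes a stripe with the pure-Java RS coder, decodes an erased data unit, and then validates that decode. The class name, RS (6, 3) layout and 1 KiB cell size are arbitrary choices for the example.

import java.nio.ByteBuffer;
import java.util.Random;

import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.DecodingValidator;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawErasureCoderFactory;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;

public class DecodingValidatorSketch {
  public static void main(String[] args) throws Exception {
    // RS(6, 3) with a 1 KiB cell: arbitrary sizes for this sketch.
    ErasureCoderOptions options = new ErasureCoderOptions(6, 3);
    RawErasureEncoder encoder = new RSRawErasureCoderFactory().createEncoder(options);
    RawErasureDecoder decoder = new RSRawErasureCoderFactory().createDecoder(options);
    DecodingValidator validator = new DecodingValidator(decoder);

    int cellSize = 1024;
    Random random = new Random();
    byte[][] data = new byte[6][cellSize];
    byte[][] parity = new byte[3][cellSize];
    for (byte[] cell : data) {
      random.nextBytes(cell);
    }
    // Compute the three parity units from the six data units.
    encoder.encode(data, parity);

    // Erase data unit d1: inputs are [d0, null, d2..d5, p0, p1, p2].
    ByteBuffer[] inputs = new ByteBuffer[9];
    for (int i = 0; i < 6; i++) {
      inputs[i] = (i == 1) ? null : ByteBuffer.wrap(data[i]);
    }
    for (int i = 0; i < 3; i++) {
      inputs[6 + i] = ByteBuffer.wrap(parity[i]);
    }
    int[] erasedIndexes = {1};
    ByteBuffer[] outputs = {ByteBuffer.allocate(cellSize)};
    decoder.decode(inputs, erasedIndexes, outputs);

    // decode() moves the input positions to their limits, so rewind them
    // before validation, as the validate() Javadoc requires.
    for (ByteBuffer input : inputs) {
      if (input != null) {
        input.rewind();
      }
    }
    // Throws InvalidDecodingException if the decoded d1 does not check out.
    validator.validate(inputs, erasedIndexes, outputs);
  }
}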
InvalidDecodingException.java (new file)
@@ -0,0 +1,35 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.io.erasurecode.rawcoder;

import org.apache.hadoop.classification.InterfaceAudience;

import java.io.IOException;

/**
 * Thrown for invalid decoding.
 */
@InterfaceAudience.Private
public class InvalidDecodingException
    extends IOException {
  private static final long serialVersionUID = 0L;

  public InvalidDecodingException(String description) {
    super(description);
  }
}
Metrics.md
@@ -449,6 +449,7 @@ Each metrics record contains tags such as SessionId and Hostname as additional i
 | `BlocksDeletedInPendingIBR` | Number of blocks at deleted status in pending incremental block report (IBR) |
 | `EcReconstructionTasks` | Total number of erasure coding reconstruction tasks |
 | `EcFailedReconstructionTasks` | Total number of erasure coding failed reconstruction tasks |
+| `EcInvalidReconstructionTasks` | Total number of erasure coding invalidated reconstruction tasks |
 | `EcDecodingTimeNanos` | Total number of nanoseconds spent by decoding tasks |
 | `EcReconstructionBytesRead` | Total number of bytes read by erasure coding worker |
 | `EcReconstructionBytesWritten` | Total number of bytes written by erasure coding worker |
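The EcInvalidReconstructionTasks row documents the DataNodeMetrics counter added further down in this commit, which tests can also read through getECInvalidReconstructionTasks(). A small sketch (not part of this change; the helper class name is invented) of totalling it across the DataNodes of a MiniDFSCluster, mirroring the new test at the end of this commit:

import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;

final class EcValidationMetricsCheck {
  private EcValidationMetricsCheck() {
  }

  /** Sum EcInvalidReconstructionTasks over every DataNode in the cluster. */
  static long totalInvalidReconstructionTasks(MiniDFSCluster cluster) {
    long sum = 0;
    for (DataNode dn : cluster.getDataNodes()) {
      DataNodeMetrics metrics = dn.getMetrics();
      sum += metrics.getECInvalidReconstructionTasks();
    }
    return sum;
  }
}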
TestCoderBase.java
@@ -519,4 +519,16 @@ public abstract class TestCoderBase {
      buffer.position(buffer.position() + 1);
    }
  }
+
+  /**
+   * Pollute some chunk.
+   * @param chunks
+   */
+  protected void polluteSomeChunk(ECChunk[] chunks) {
+    int idx = new Random().nextInt(chunks.length);
+    ByteBuffer buffer = chunks[idx].getBuffer();
+    buffer.mark();
+    buffer.put((byte) ((buffer.get(buffer.position()) + 1)));
+    buffer.reset();
+  }
}
TestDecodingValidator.java (new file)
@@ -0,0 +1,237 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.io.erasurecode.rawcoder;

import org.apache.hadoop.io.erasurecode.ECChunk;
import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.junit.Assert.assertTrue;

/**
 * Test {@link DecodingValidator} under various decoders.
 */
@RunWith(Parameterized.class)
public class TestDecodingValidator extends TestRawCoderBase {

  private DecodingValidator validator;

  @Parameterized.Parameters
  public static Collection<Object[]> data() {
    return Arrays.asList(new Object[][] {
        {RSRawErasureCoderFactory.class, 6, 3, new int[]{1}, new int[]{}},
        {RSRawErasureCoderFactory.class, 6, 3, new int[]{3}, new int[]{0}},
        {RSRawErasureCoderFactory.class, 6, 3, new int[]{2, 4}, new int[]{1}},
        {NativeRSRawErasureCoderFactory.class, 6, 3, new int[]{0}, new int[]{}},
        {XORRawErasureCoderFactory.class, 10, 1, new int[]{0}, new int[]{}},
        {NativeXORRawErasureCoderFactory.class, 10, 1, new int[]{0},
            new int[]{}}
    });
  }

  public TestDecodingValidator(
      Class<? extends RawErasureCoderFactory> factoryClass, int numDataUnits,
      int numParityUnits, int[] erasedDataIndexes, int[] erasedParityIndexes) {
    this.encoderFactoryClass = factoryClass;
    this.decoderFactoryClass = factoryClass;
    this.numDataUnits = numDataUnits;
    this.numParityUnits = numParityUnits;
    this.erasedDataIndexes = erasedDataIndexes;
    this.erasedParityIndexes = erasedParityIndexes;
  }

  @Before
  public void setup() {
    if (encoderFactoryClass == NativeRSRawErasureCoderFactory.class
        || encoderFactoryClass == NativeXORRawErasureCoderFactory.class) {
      Assume.assumeTrue(ErasureCodeNative.isNativeCodeLoaded());
    }
    setAllowDump(false);
  }

  /**
   * Test if the same validator can process direct and non-direct buffers.
   */
  @Test
  public void testValidate() {
    prepare(null, numDataUnits, numParityUnits, erasedDataIndexes,
        erasedParityIndexes);
    testValidate(true);
    testValidate(false);
  }

  /**
   * Test if the same validator can process variable width of data for
   * inputs and outputs.
   */
  protected void testValidate(boolean usingDirectBuffer) {
    this.usingDirectBuffer = usingDirectBuffer;
    prepareCoders(false);
    prepareValidator(false);

    performTestValidate(baseChunkSize);
    performTestValidate(baseChunkSize - 17);
    performTestValidate(baseChunkSize + 18);
  }

  protected void prepareValidator(boolean recreate) {
    if (validator == null || recreate) {
      validator = new DecodingValidator(decoder);
    }
  }

  protected void performTestValidate(int chunkSize) {
    setChunkSize(chunkSize);
    prepareBufferAllocator(false);

    // encode
    ECChunk[] dataChunks = prepareDataChunksForEncoding();
    ECChunk[] parityChunks = prepareParityChunksForEncoding();
    ECChunk[] clonedDataChunks = cloneChunksWithData(dataChunks);
    try {
      encoder.encode(dataChunks, parityChunks);
    } catch (Exception e) {
      Assert.fail("Should not get Exception: " + e.getMessage());
    }

    // decode
    backupAndEraseChunks(clonedDataChunks, parityChunks);
    ECChunk[] inputChunks =
        prepareInputChunksForDecoding(clonedDataChunks, parityChunks);
    markChunks(inputChunks);
    ensureOnlyLeastRequiredChunks(inputChunks);
    ECChunk[] recoveredChunks = prepareOutputChunksForDecoding();
    int[] erasedIndexes = getErasedIndexesForDecoding();
    try {
      decoder.decode(inputChunks, erasedIndexes, recoveredChunks);
    } catch (Exception e) {
      Assert.fail("Should not get Exception: " + e.getMessage());
    }

    // validate
    restoreChunksFromMark(inputChunks);
    ECChunk[] clonedInputChunks = cloneChunksWithData(inputChunks);
    ECChunk[] clonedRecoveredChunks = cloneChunksWithData(recoveredChunks);
    int[] clonedErasedIndexes = erasedIndexes.clone();

    try {
      validator.validate(clonedInputChunks, clonedErasedIndexes,
          clonedRecoveredChunks);
    } catch (Exception e) {
      Assert.fail("Should not get Exception: " + e.getMessage());
    }

    // Check if input buffers' positions are moved to the end
    verifyBufferPositionAtEnd(clonedInputChunks);

    // Check if validator does not change recovered chunks and erased indexes
    verifyChunksEqual(recoveredChunks, clonedRecoveredChunks);
    Assert.assertArrayEquals("Erased indexes should not be changed",
        erasedIndexes, clonedErasedIndexes);

    // Check if validator uses correct indexes for validation
    List<Integer> validIndexesList =
        IntStream.of(CoderUtil.getValidIndexes(inputChunks)).boxed()
            .collect(Collectors.toList());
    List<Integer> newValidIndexesList =
        IntStream.of(validator.getNewValidIndexes()).boxed()
            .collect(Collectors.toList());
    List<Integer> erasedIndexesList =
        IntStream.of(erasedIndexes).boxed().collect(Collectors.toList());
    int newErasedIndex = validator.getNewErasedIndex();
    Assert.assertTrue(
        "Valid indexes for validation should contain"
            + " erased indexes for decoding",
        newValidIndexesList.containsAll(erasedIndexesList));
    Assert.assertTrue(
        "An erased index for validation should be contained"
            + " in valid indexes for decoding",
        validIndexesList.contains(newErasedIndex));
    Assert.assertFalse(
        "An erased index for validation should not be contained"
            + " in valid indexes for validation",
        newValidIndexesList.contains(newErasedIndex));
  }

  private void verifyChunksEqual(ECChunk[] chunks1, ECChunk[] chunks2) {
    boolean result = Arrays.deepEquals(toArrays(chunks1), toArrays(chunks2));
    assertTrue("Recovered chunks should not be changed", result);
  }

  /**
   * Test if validator throws {@link InvalidDecodingException} when
   * a decoded output buffer is polluted.
   */
  @Test
  public void testValidateWithBadDecoding() throws IOException {
    prepare(null, numDataUnits, numParityUnits, erasedDataIndexes,
        erasedParityIndexes);
    this.usingDirectBuffer = true;
    prepareCoders(true);
    prepareValidator(true);
    prepareBufferAllocator(false);

    // encode
    ECChunk[] dataChunks = prepareDataChunksForEncoding();
    ECChunk[] parityChunks = prepareParityChunksForEncoding();
    ECChunk[] clonedDataChunks = cloneChunksWithData(dataChunks);
    try {
      encoder.encode(dataChunks, parityChunks);
    } catch (Exception e) {
      Assert.fail("Should not get Exception: " + e.getMessage());
    }

    // decode
    backupAndEraseChunks(clonedDataChunks, parityChunks);
    ECChunk[] inputChunks =
        prepareInputChunksForDecoding(clonedDataChunks, parityChunks);
    markChunks(inputChunks);
    ensureOnlyLeastRequiredChunks(inputChunks);
    ECChunk[] recoveredChunks = prepareOutputChunksForDecoding();
    int[] erasedIndexes = getErasedIndexesForDecoding();
    try {
      decoder.decode(inputChunks, erasedIndexes, recoveredChunks);
    } catch (Exception e) {
      Assert.fail("Should not get Exception: " + e.getMessage());
    }

    // validate
    restoreChunksFromMark(inputChunks);
    polluteSomeChunk(recoveredChunks);
    try {
      validator.validate(inputChunks, erasedIndexes, recoveredChunks);
      Assert.fail("Validation should fail due to bad decoding");
    } catch (InvalidDecodingException e) {
      String expected = "Failed to validate decoding";
      GenericTestUtils.assertExceptionContains(expected, e);
    }
  }
}
TestRawCoderBase.java
@@ -334,7 +334,7 @@ public abstract class TestRawCoderBase extends TestCoderBase {
    verifyBufferPositionAtEnd(inputChunks);
  }

-  private void verifyBufferPositionAtEnd(ECChunk[] inputChunks) {
+  void verifyBufferPositionAtEnd(ECChunk[] inputChunks) {
    for (ECChunk chunk : inputChunks) {
      if (chunk != null) {
        Assert.assertEquals(0, chunk.getBuffer().remaining());
DFSConfigKeys.java
@@ -867,6 +867,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
      "dfs.datanode.ec.reconstruction.xmits.weight";
  public static final float DFS_DN_EC_RECONSTRUCTION_XMITS_WEIGHT_DEFAULT =
      0.5f;
+  public static final String DFS_DN_EC_RECONSTRUCTION_VALIDATION_KEY =
+      "dfs.datanode.ec.reconstruction.validation";
+  public static final boolean DFS_DN_EC_RECONSTRUCTION_VALIDATION_VALUE = false;

  public static final String
      DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY =
DataNodeFaultInjector.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience;

import java.io.FileNotFoundException;
import java.io.IOException;
+import java.nio.ByteBuffer;

/**
 * Used for injecting faults in DFSClient and DFSOutputStream tests.
@@ -142,4 +143,10 @@ public class DataNodeFaultInjector {
   * Just delay a while.
   */
  public void delay() {}
+
+  /**
+   * Used as a hook to inject data pollution
+   * into an erasure coding reconstruction.
+   */
+  public void badDecoding(ByteBuffer[] outputs) {}
}
StripedBlockChecksumReconstructor.java
@@ -57,6 +57,7 @@ public abstract class StripedBlockChecksumReconstructor

  private void init() throws IOException {
    initDecoderIfNecessary();
+    initDecodingValidatorIfNecessary();
    getStripedReader().init();
    // allocate buffer to keep the reconstructed block data
    targetBuffer = allocateBuffer(getBufferSize());
@@ -192,7 +193,16 @@ public abstract class StripedBlockChecksumReconstructor
    for (int i = 0; i < targetIndices.length; i++) {
      tarIndices[i] = targetIndices[i];
    }
-    getDecoder().decode(inputs, tarIndices, outputs);
+
+    if (isValidationEnabled()) {
+      markBuffers(inputs);
+      getDecoder().decode(inputs, tarIndices, outputs);
+      resetBuffers(inputs);
+
+      getValidator().validate(inputs, tarIndices, outputs);
+    } else {
+      getDecoder().decode(inputs, tarIndices, outputs);
+    }
  }

  /**
StripedBlockReconstructor.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
+import org.apache.hadoop.io.erasurecode.rawcoder.InvalidDecodingException;
import org.apache.hadoop.util.Time;

/**
@@ -53,6 +54,8 @@ class StripedBlockReconstructor extends StripedReconstructor
    try {
      initDecoderIfNecessary();

+      initDecodingValidatorIfNecessary();
+
      getStripedReader().init();

      stripedWriter.init();
@@ -126,12 +129,31 @@ class StripedBlockReconstructor extends StripedReconstructor
    int[] erasedIndices = stripedWriter.getRealTargetIndices();
    ByteBuffer[] outputs = stripedWriter.getRealTargetBuffers(toReconstructLen);

+    if (isValidationEnabled()) {
+      markBuffers(inputs);
+      decode(inputs, erasedIndices, outputs);
+      resetBuffers(inputs);
+
+      DataNodeFaultInjector.get().badDecoding(outputs);
+      try {
+        getValidator().validate(inputs, erasedIndices, outputs);
+      } catch (InvalidDecodingException e) {
+        getDatanode().getMetrics().incrECInvalidReconstructionTasks();
+        throw e;
+      }
+    } else {
+      decode(inputs, erasedIndices, outputs);
+    }
+
+    stripedWriter.updateRealTargetBuffers(toReconstructLen);
+  }
+
+  private void decode(ByteBuffer[] inputs, int[] erasedIndices,
+      ByteBuffer[] outputs) throws IOException {
    long start = System.nanoTime();
    getDecoder().decode(inputs, erasedIndices, outputs);
    long end = System.nanoTime();
    this.getDatanode().getMetrics().incrECDecodingTime(end - start);
-
-    stripedWriter.updateRealTargetBuffers(toReconstructLen);
  }

  /**
StripedReconstructor.java
@@ -17,6 +17,8 @@
 */
package org.apache.hadoop.hdfs.server.datanode.erasurecode;

+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.io.erasurecode.rawcoder.DecodingValidator;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -103,10 +105,14 @@ abstract class StripedReconstructor {
  private final Configuration conf;
  private final DataNode datanode;
  private final ErasureCodingPolicy ecPolicy;
+  private final ErasureCoderOptions coderOptions;
  private RawErasureDecoder decoder;
  private final ExtendedBlock blockGroup;
  private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();

+  private final boolean isValidationEnabled;
+  private DecodingValidator validator;
+
  // position in striped internal block
  private long positionInBlock;
  private StripedReader stripedReader;
@@ -136,6 +142,13 @@ abstract class StripedReconstructor {
    cachingStrategy = CachingStrategy.newDefaultStrategy();

    positionInBlock = 0L;
+
+    coderOptions = new ErasureCoderOptions(
+        ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits());
+    isValidationEnabled = conf.getBoolean(
+        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_VALIDATION_KEY,
+        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_VALIDATION_VALUE)
+        && !coderOptions.allowChangeInputs();
  }

  public void incrBytesRead(boolean local, long delta) {
@@ -196,13 +209,18 @@ abstract class StripedReconstructor {
  // Initialize decoder
  protected void initDecoderIfNecessary() {
    if (decoder == null) {
-      ErasureCoderOptions coderOptions = new ErasureCoderOptions(
-          ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits());
      decoder = CodecUtil.createRawDecoder(conf, ecPolicy.getCodecName(),
          coderOptions);
    }
  }

+  // Initialize decoding validator
+  protected void initDecodingValidatorIfNecessary() {
+    if (isValidationEnabled && validator == null) {
+      validator = new DecodingValidator(decoder);
+    }
+  }
+
  long getPositionInBlock() {
    return positionInBlock;
  }
@@ -285,4 +303,28 @@ abstract class StripedReconstructor {
  static ByteBufferPool getBufferPool() {
    return BUFFER_POOL;
  }
+
+  boolean isValidationEnabled() {
+    return isValidationEnabled;
+  }
+
+  DecodingValidator getValidator() {
+    return validator;
+  }
+
+  protected static void markBuffers(ByteBuffer[] buffers) {
+    for (ByteBuffer buffer: buffers) {
+      if (buffer != null) {
+        buffer.mark();
+      }
+    }
+  }
+
+  protected static void resetBuffers(ByteBuffer[] buffers) {
+    for (ByteBuffer buffer: buffers) {
+      if (buffer != null) {
+        buffer.reset();
+      }
+    }
+  }
}
DataNodeMetrics.java
@@ -152,6 +152,8 @@ public class DataNodeMetrics {
  MutableCounterLong ecReconstructionTasks;
  @Metric("Count of erasure coding failed reconstruction tasks")
  MutableCounterLong ecFailedReconstructionTasks;
+  @Metric("Count of erasure coding invalidated reconstruction tasks")
+  private MutableCounterLong ecInvalidReconstructionTasks;
  @Metric("Nanoseconds spent by decoding tasks")
  MutableCounterLong ecDecodingTimeNanos;
  @Metric("Bytes read by erasure coding worker")
@@ -543,6 +545,14 @@ public class DataNodeMetrics {
    ecFailedReconstructionTasks.incr();
  }

+  public void incrECInvalidReconstructionTasks() {
+    ecInvalidReconstructionTasks.incr();
+  }
+
+  public long getECInvalidReconstructionTasks() {
+    return ecInvalidReconstructionTasks.value();
+  }
+
  public void incrDataNodeActiveXceiversCount() {
    dataNodeActiveXceiversCount.incr();
  }
hdfs-default.xml
@@ -3693,6 +3693,16 @@
  </description>
</property>

+<property>
+  <name>dfs.datanode.ec.reconstruction.validation</name>
+  <value>false</value>
+  <description>
+    Decide if datanode validates that EC reconstruction tasks reconstruct
+    target blocks correctly. When validation fails, reconstruction tasks
+    will fail and be retried by namenode.
+  </description>
+</property>
+
<property>
  <name>dfs.namenode.quota.init-threads</name>
  <value>4</value>
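The property above is the user-facing switch. In code it is read through the new DFS_DN_EC_RECONSTRUCTION_VALIDATION_KEY constant, and, per the StripedReconstructor change earlier in this commit, it only takes effect when the raw coder does not allow changing its inputs. A minimal sketch (not part of this change; the class name and the 10-DataNode sizing are invented for the example) of enabling it programmatically for a MiniDFSCluster test:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;

public class EcValidationEnabledCluster {
  public static void main(String[] args) throws Exception {
    // Same effect as setting dfs.datanode.ec.reconstruction.validation=true
    // in hdfs-site.xml before starting the DataNodes.
    Configuration conf = new HdfsConfiguration();
    conf.setBoolean(DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_VALIDATION_KEY, true);

    // 10 DataNodes: 6 data + 3 parity for RS(6,3) plus one spare so a lost
    // internal block can be reconstructed on another node.
    MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(10).build();
    try {
      cluster.waitActive();
      // ... run EC reconstruction scenarios here ...
    } finally {
      cluster.shutdown();
    }
  }
}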
TestReconstructStripedFile.java
@@ -107,6 +107,23 @@ public class TestReconstructStripedFile {
    return StripedFileTestUtil.getDefaultECPolicy();
  }

+  public boolean isValidationEnabled() {
+    return false;
+  }
+
+  public int getPendingTimeout() {
+    return DFSConfigKeys
+        .DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT;
+  }
+
+  public int getBlockSize() {
+    return blockSize;
+  }
+
+  public MiniDFSCluster getCluster() {
+    return cluster;
+  }
+
  @Before
  public void setup() throws IOException {
    ecPolicy = getEcPolicy();
@@ -128,6 +145,11 @@ public class TestReconstructStripedFile {
          CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY,
          NativeRSRawErasureCoderFactory.CODER_NAME);
    }
+    conf.setInt(
+        DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
+        getPendingTimeout());
+    conf.setBoolean(DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_VALIDATION_KEY,
+        isValidationEnabled());
    File basedir = new File(GenericTestUtils.getRandomizedTempPath());
    cluster = new MiniDFSCluster.Builder(conf, basedir).numDataNodes(dnNum)
        .build();
@@ -303,7 +325,7 @@ public class TestReconstructStripedFile {
   * and verify the block replica length, generationStamp and content.
   * 2. Read the file and verify content.
   */
-  private void assertFileBlocksReconstruction(String fileName, int fileLen,
+  void assertFileBlocksReconstruction(String fileName, int fileLen,
      ReconstructionType type, int toRecoverBlockNum) throws Exception {
    if (toRecoverBlockNum < 1 || toRecoverBlockNum > parityBlkNum) {
      Assert.fail("toRecoverBlockNum should be between 1 ~ " + parityBlkNum);
TestReconstructStripedFileWithValidator.java (new file)
@@ -0,0 +1,115 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * This test extends {@link TestReconstructStripedFile} to test
 * ec reconstruction validation.
 */
public class TestReconstructStripedFileWithValidator
    extends TestReconstructStripedFile {
  private static final Logger LOG =
      LoggerFactory.getLogger(TestReconstructStripedFileWithValidator.class);

  public TestReconstructStripedFileWithValidator() {
    LOG.info("run {} with validator.",
        TestReconstructStripedFileWithValidator.class.getSuperclass()
            .getSimpleName());
  }

  /**
   * This test injects data pollution into decoded outputs once.
   * When validation enabled, the first reconstruction task should fail
   * in the validation, but the data will be recovered correctly
   * by the next task.
   * On the other hand, when validation disabled, the first reconstruction task
   * will succeed and then lead to data corruption.
   */
  @Test(timeout = 120000)
  public void testValidatorWithBadDecoding()
      throws Exception {
    MiniDFSCluster cluster = getCluster();

    cluster.getDataNodes().stream()
        .map(DataNode::getMetrics)
        .map(DataNodeMetrics::getECInvalidReconstructionTasks)
        .forEach(n -> Assert.assertEquals(0, (long) n));

    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
    DataNodeFaultInjector badDecodingInjector = new DataNodeFaultInjector() {
      private final AtomicBoolean flag = new AtomicBoolean(false);

      @Override
      public void badDecoding(ByteBuffer[] outputs) {
        if (!flag.get()) {
          for (ByteBuffer output : outputs) {
            output.mark();
            output.put((byte) (output.get(output.position()) + 1));
            output.reset();
          }
        }
        flag.set(true);
      }
    };
    DataNodeFaultInjector.set(badDecodingInjector);

    int fileLen =
        (getEcPolicy().getNumDataUnits() + getEcPolicy().getNumParityUnits())
            * getBlockSize() + getBlockSize() / 10;
    try {
      assertFileBlocksReconstruction(
          "/testValidatorWithBadDecoding",
          fileLen,
          ReconstructionType.DataOnly,
          getEcPolicy().getNumParityUnits());

      long sum = cluster.getDataNodes().stream()
          .map(DataNode::getMetrics)
          .mapToLong(DataNodeMetrics::getECInvalidReconstructionTasks)
          .sum();
      Assert.assertEquals(1, sum);
    } finally {
      DataNodeFaultInjector.set(oldInjector);
    }
  }

  @Override
  public boolean isValidationEnabled() {
    return true;
  }

  /**
   * Set a small value for the failed reconstruction task to be
   * rescheduled in a short period of time.
   */
  @Override
  public int getPendingTimeout() {
    return 10;
  }
}