HDFS-15759. EC: Verify EC reconstruction correctness on DataNode (#2585) (#2868)

(cherry picked from commit 95e6892675)
(cherry picked from commit 6cfa01c905ca422c3c6b2d1d735dd16ee08b4fb4)

 Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java

Co-authored-by: touchida <56789230+touchida@users.noreply.github.com>
Wei-Chiu Chuang 2021-04-12 19:45:49 -07:00 committed by GitHub
parent 899cef53bd
commit 0faa626bd8
15 changed files with 720 additions and 7 deletions

DecodingValidator.java
@@ -0,0 +1,187 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode.rawcoder;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.erasurecode.ECChunk;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
* A utility class to validate decoding.
*/
@InterfaceAudience.Private
public class DecodingValidator {
private final RawErasureDecoder decoder;
private ByteBuffer buffer;
private int[] newValidIndexes;
private int newErasedIndex;
public DecodingValidator(RawErasureDecoder decoder) {
this.decoder = decoder;
}
/**
* Validate outputs decoded from inputs, by decoding an input back from
* the outputs and comparing it with the original one.
*
* For instance, in RS (6, 3), let (d0, d1, d2, d3, d4, d5) be sources
* and (p0, p1, p2) be parities, and assume
* inputs = [d0, null (d1), d2, d3, d4, d5, null (p0), p1, null (p2)];
* erasedIndexes = [1, 6];
* outputs = [d1, p0].
* Then
* 1. Create new inputs, erasedIndexes and outputs for validation so that
* the inputs could contain the decoded outputs, and decode them:
* newInputs = [d1, d2, d3, d4, d5, p0]
* newErasedIndexes = [0]
* newOutputs = [d0']
* 2. Compare d0 and d0'. The comparison will fail with high probability
* when the initial outputs are wrong.
*
* Note that the input buffers' positions must be the ones where data are
* read: If the input buffers have been processed by a decoder, the buffers'
* positions must be reset before being passed into this method.
*
* This method does not change outputs and erasedIndexes.
*
* @param inputs input buffers used for decoding. The buffers' positions
* are moved to the end after this method returns.
* @param erasedIndexes indexes of erased units used for decoding
* @param outputs decoded output buffers, which are ready to be read after
* the call
* @throws IOException
*/
public void validate(ByteBuffer[] inputs, int[] erasedIndexes,
ByteBuffer[] outputs) throws IOException {
markBuffers(outputs);
try {
ByteBuffer validInput = CoderUtil.findFirstValidInput(inputs);
boolean isDirect = validInput.isDirect();
int capacity = validInput.capacity();
int remaining = validInput.remaining();
// Init buffer
if (buffer == null || buffer.isDirect() != isDirect
|| buffer.capacity() < remaining) {
buffer = allocateBuffer(isDirect, capacity);
}
buffer.clear().limit(remaining);
// Create newInputs and newErasedIndex for validation
ByteBuffer[] newInputs = new ByteBuffer[inputs.length];
int count = 0;
for (int i = 0; i < erasedIndexes.length; i++) {
newInputs[erasedIndexes[i]] = outputs[i];
count++;
}
newErasedIndex = -1;
boolean selected = false;
int numValidIndexes = CoderUtil.getValidIndexes(inputs).length;
for (int i = 0; i < newInputs.length; i++) {
if (count == numValidIndexes) {
break;
} else if (!selected && inputs[i] != null) {
newErasedIndex = i;
newInputs[i] = null;
selected = true;
} else if (newInputs[i] == null) {
newInputs[i] = inputs[i];
if (inputs[i] != null) {
count++;
}
}
}
// Keep it for testing
newValidIndexes = CoderUtil.getValidIndexes(newInputs);
decoder.decode(newInputs, new int[]{newErasedIndex},
new ByteBuffer[]{buffer});
if (!buffer.equals(inputs[newErasedIndex])) {
throw new InvalidDecodingException("Failed to validate decoding");
}
} finally {
toLimits(inputs);
resetBuffers(outputs);
}
}
/**
* Validate outputs decoded from inputs, by decoding an input back from
* those outputs and comparing it with the original one.
* @param inputs input buffers used for decoding
* @param erasedIndexes indexes of erased units used for decoding
* @param outputs decoded output buffers
* @throws IOException
*/
public void validate(ECChunk[] inputs, int[] erasedIndexes, ECChunk[] outputs)
throws IOException {
ByteBuffer[] newInputs = CoderUtil.toBuffers(inputs);
ByteBuffer[] newOutputs = CoderUtil.toBuffers(outputs);
validate(newInputs, erasedIndexes, newOutputs);
}
private ByteBuffer allocateBuffer(boolean direct, int capacity) {
if (direct) {
buffer = ByteBuffer.allocateDirect(capacity);
} else {
buffer = ByteBuffer.allocate(capacity);
}
return buffer;
}
private static void markBuffers(ByteBuffer[] buffers) {
for (ByteBuffer buffer: buffers) {
if (buffer != null) {
buffer.mark();
}
}
}
private static void resetBuffers(ByteBuffer[] buffers) {
for (ByteBuffer buffer: buffers) {
if (buffer != null) {
buffer.reset();
}
}
}
private static void toLimits(ByteBuffer[] buffers) {
for (ByteBuffer buffer: buffers) {
if (buffer != null) {
buffer.position(buffer.limit());
}
}
}
@VisibleForTesting
protected int[] getNewValidIndexes() {
return newValidIndexes;
}
@VisibleForTesting
protected int getNewErasedIndex() {
return newErasedIndex;
}
}
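
For context, the validator above can also be exercised directly against a raw coder, outside the DataNode. Below is a minimal, hedged usage sketch (not part of this commit; the class and method names are illustrative): the caller is assumed to have prepared inputs, erasedIndexes and outputs exactly as for RawErasureDecoder#decode, and the mark()/reset() handling mirrors what the DataNode reconstructor does later in this patch.

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.DecodingValidator;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawErasureCoderFactory;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;

public class ValidatedDecodeSketch {
  // Decode the erased units, then cross-check the result with DecodingValidator.
  static void decodeAndValidate(ByteBuffer[] inputs, int[] erasedIndexes,
      ByteBuffer[] outputs) throws IOException {
    ErasureCoderOptions options = new ErasureCoderOptions(6, 3); // RS(6,3)
    RawErasureDecoder decoder =
        new RSRawErasureCoderFactory().createDecoder(options);
    DecodingValidator validator = new DecodingValidator(decoder);

    // Remember where the inputs are read from; decode() moves their positions
    // to the end.
    for (ByteBuffer b : inputs) {
      if (b != null) {
        b.mark();
      }
    }
    decoder.decode(inputs, erasedIndexes, outputs);
    // Restore the read positions before validating.
    for (ByteBuffer b : inputs) {
      if (b != null) {
        b.reset();
      }
    }
    // Re-decodes one original input from the outputs and compares it with the
    // original; a mismatch surfaces as InvalidDecodingException (an IOException).
    validator.validate(inputs, erasedIndexes, outputs);
  }
}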

InvalidDecodingException.java
@@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.classification.InterfaceAudience;
import java.io.IOException;
/**
* Thrown for invalid decoding.
*/
@InterfaceAudience.Private
public class InvalidDecodingException
extends IOException {
private static final long serialVersionUID = 0L;
public InvalidDecodingException(String description) {
super(description);
}
}

Metrics.md
@@ -428,6 +428,7 @@ Each metrics record contains tags such as SessionId and Hostname as additional i
| `BlocksDeletedInPendingIBR` | Number of blocks at deleted status in pending incremental block report (IBR) |
| `EcReconstructionTasks` | Total number of erasure coding reconstruction tasks |
| `EcFailedReconstructionTasks` | Total number of erasure coding failed reconstruction tasks |
| `EcInvalidReconstructionTasks` | Total number of erasure coding invalidated reconstruction tasks |
| `EcDecodingTimeNanos` | Total number of nanoseconds spent by decoding tasks |
| `EcReconstructionBytesRead` | Total number of bytes read by erasure coding worker |
| `EcReconstructionBytesWritten` | Total number of bytes written by erasure coding worker |
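
The new counter can be read back from a DataNode's metrics object, which is how the test added at the end of this commit asserts on it. A small hedged sketch (the helper class and method names are illustrative):

import org.apache.hadoop.hdfs.server.datanode.DataNode;

public class EcValidationMetricSketch {
  // Number of reconstruction tasks on this DataNode whose decoded output
  // failed validation.
  static long invalidReconstructionTasks(DataNode dn) {
    return dn.getMetrics().getECInvalidReconstructionTasks();
  }
}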

TestCoderBase.java
@@ -527,4 +527,16 @@ public abstract class TestCoderBase {
buffer.position(buffer.position() + 1);
}
}
/**
* Pollute a randomly chosen chunk by altering one byte in place.
* @param chunks chunks to choose the polluted chunk from
*/
protected void polluteSomeChunk(ECChunk[] chunks) {
int idx = new Random().nextInt(chunks.length);
ByteBuffer buffer = chunks[idx].getBuffer();
buffer.mark();
buffer.put((byte) ((buffer.get(buffer.position()) + 1)));
buffer.reset();
}
}

TestDecodingValidator.java
@@ -0,0 +1,237 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.erasurecode.rawcoder;
import org.apache.hadoop.io.erasurecode.ECChunk;
import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.junit.Assert.assertTrue;
/**
* Test {@link DecodingValidator} under various decoders.
*/
@RunWith(Parameterized.class)
public class TestDecodingValidator extends TestRawCoderBase {
private DecodingValidator validator;
@Parameterized.Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{RSRawErasureCoderFactory.class, 6, 3, new int[]{1}, new int[]{}},
{RSRawErasureCoderFactory.class, 6, 3, new int[]{3}, new int[]{0}},
{RSRawErasureCoderFactory.class, 6, 3, new int[]{2, 4}, new int[]{1}},
{NativeRSRawErasureCoderFactory.class, 6, 3, new int[]{0}, new int[]{}},
{XORRawErasureCoderFactory.class, 10, 1, new int[]{0}, new int[]{}},
{NativeXORRawErasureCoderFactory.class, 10, 1, new int[]{0},
new int[]{}}
});
}
public TestDecodingValidator(
Class<? extends RawErasureCoderFactory> factoryClass, int numDataUnits,
int numParityUnits, int[] erasedDataIndexes, int[] erasedParityIndexes) {
this.encoderFactoryClass = factoryClass;
this.decoderFactoryClass = factoryClass;
this.numDataUnits = numDataUnits;
this.numParityUnits = numParityUnits;
this.erasedDataIndexes = erasedDataIndexes;
this.erasedParityIndexes = erasedParityIndexes;
}
@Before
public void setup() {
if (encoderFactoryClass == NativeRSRawErasureCoderFactory.class
|| encoderFactoryClass == NativeXORRawErasureCoderFactory.class) {
Assume.assumeTrue(ErasureCodeNative.isNativeCodeLoaded());
}
setAllowDump(false);
}
/**
* Test if the same validator can process direct and non-direct buffers.
*/
@Test
public void testValidate() {
prepare(null, numDataUnits, numParityUnits, erasedDataIndexes,
erasedParityIndexes);
testValidate(true);
testValidate(false);
}
/**
* Test if the same validator can process variable widths of data for
* inputs and outputs.
*/
protected void testValidate(boolean usingDirectBuffer) {
this.usingDirectBuffer = usingDirectBuffer;
prepareCoders(false);
prepareValidator(false);
performTestValidate(baseChunkSize);
performTestValidate(baseChunkSize - 17);
performTestValidate(baseChunkSize + 18);
}
protected void prepareValidator(boolean recreate) {
if (validator == null || recreate) {
validator = new DecodingValidator(decoder);
}
}
protected void performTestValidate(int chunkSize) {
setChunkSize(chunkSize);
prepareBufferAllocator(false);
// encode
ECChunk[] dataChunks = prepareDataChunksForEncoding();
ECChunk[] parityChunks = prepareParityChunksForEncoding();
ECChunk[] clonedDataChunks = cloneChunksWithData(dataChunks);
try {
encoder.encode(dataChunks, parityChunks);
} catch (Exception e) {
Assert.fail("Should not get Exception: " + e.getMessage());
}
// decode
backupAndEraseChunks(clonedDataChunks, parityChunks);
ECChunk[] inputChunks =
prepareInputChunksForDecoding(clonedDataChunks, parityChunks);
markChunks(inputChunks);
ensureOnlyLeastRequiredChunks(inputChunks);
ECChunk[] recoveredChunks = prepareOutputChunksForDecoding();
int[] erasedIndexes = getErasedIndexesForDecoding();
try {
decoder.decode(inputChunks, erasedIndexes, recoveredChunks);
} catch (Exception e) {
Assert.fail("Should not get Exception: " + e.getMessage());
}
// validate
restoreChunksFromMark(inputChunks);
ECChunk[] clonedInputChunks = cloneChunksWithData(inputChunks);
ECChunk[] clonedRecoveredChunks = cloneChunksWithData(recoveredChunks);
int[] clonedErasedIndexes = erasedIndexes.clone();
try {
validator.validate(clonedInputChunks, clonedErasedIndexes,
clonedRecoveredChunks);
} catch (Exception e) {
Assert.fail("Should not get Exception: " + e.getMessage());
}
// Check if input buffers' positions are moved to the end
verifyBufferPositionAtEnd(clonedInputChunks);
// Check if validator does not change recovered chunks and erased indexes
verifyChunksEqual(recoveredChunks, clonedRecoveredChunks);
Assert.assertArrayEquals("Erased indexes should not be changed",
erasedIndexes, clonedErasedIndexes);
// Check if validator uses correct indexes for validation
List<Integer> validIndexesList =
IntStream.of(CoderUtil.getValidIndexes(inputChunks)).boxed()
.collect(Collectors.toList());
List<Integer> newValidIndexesList =
IntStream.of(validator.getNewValidIndexes()).boxed()
.collect(Collectors.toList());
List<Integer> erasedIndexesList =
IntStream.of(erasedIndexes).boxed().collect(Collectors.toList());
int newErasedIndex = validator.getNewErasedIndex();
Assert.assertTrue(
"Valid indexes for validation should contain"
+ " erased indexes for decoding",
newValidIndexesList.containsAll(erasedIndexesList));
Assert.assertTrue(
"An erased index for validation should be contained"
+ " in valid indexes for decoding",
validIndexesList.contains(newErasedIndex));
Assert.assertFalse(
"An erased index for validation should not be contained"
+ " in valid indexes for validation",
newValidIndexesList.contains(newErasedIndex));
}
private void verifyChunksEqual(ECChunk[] chunks1, ECChunk[] chunks2) {
boolean result = Arrays.deepEquals(toArrays(chunks1), toArrays(chunks2));
assertTrue("Recovered chunks should not be changed", result);
}
/**
* Test if validator throws {@link InvalidDecodingException} when
* a decoded output buffer is polluted.
*/
@Test
public void testValidateWithBadDecoding() throws IOException {
prepare(null, numDataUnits, numParityUnits, erasedDataIndexes,
erasedParityIndexes);
this.usingDirectBuffer = true;
prepareCoders(true);
prepareValidator(true);
prepareBufferAllocator(false);
// encode
ECChunk[] dataChunks = prepareDataChunksForEncoding();
ECChunk[] parityChunks = prepareParityChunksForEncoding();
ECChunk[] clonedDataChunks = cloneChunksWithData(dataChunks);
try {
encoder.encode(dataChunks, parityChunks);
} catch (Exception e) {
Assert.fail("Should not get Exception: " + e.getMessage());
}
// decode
backupAndEraseChunks(clonedDataChunks, parityChunks);
ECChunk[] inputChunks =
prepareInputChunksForDecoding(clonedDataChunks, parityChunks);
markChunks(inputChunks);
ensureOnlyLeastRequiredChunks(inputChunks);
ECChunk[] recoveredChunks = prepareOutputChunksForDecoding();
int[] erasedIndexes = getErasedIndexesForDecoding();
try {
decoder.decode(inputChunks, erasedIndexes, recoveredChunks);
} catch (Exception e) {
Assert.fail("Should not get Exception: " + e.getMessage());
}
// validate
restoreChunksFromMark(inputChunks);
polluteSomeChunk(recoveredChunks);
try {
validator.validate(inputChunks, erasedIndexes, recoveredChunks);
Assert.fail("Validation should fail due to bad decoding");
} catch (InvalidDecodingException e) {
String expected = "Failed to validate decoding";
GenericTestUtils.assertExceptionContains(expected, e);
}
}
}

TestRawCoderBase.java
@@ -334,7 +334,7 @@ public abstract class TestRawCoderBase extends TestCoderBase {
verifyBufferPositionAtEnd(inputChunks);
}
void verifyBufferPositionAtEnd(ECChunk[] inputChunks) {
for (ECChunk chunk : inputChunks) {
if (chunk != null) {
Assert.assertEquals(0, chunk.getBuffer().remaining());

DFSConfigKeys.java
@@ -764,6 +764,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.datanode.ec.reconstruction.xmits.weight"; "dfs.datanode.ec.reconstruction.xmits.weight";
public static final float DFS_DN_EC_RECONSTRUCTION_XMITS_WEIGHT_DEFAULT = public static final float DFS_DN_EC_RECONSTRUCTION_XMITS_WEIGHT_DEFAULT =
0.5f; 0.5f;
public static final String DFS_DN_EC_RECONSTRUCTION_VALIDATION_KEY =
"dfs.datanode.ec.reconstruction.validation";
public static final boolean DFS_DN_EC_RECONSTRUCTION_VALIDATION_VALUE = false;
public static final String
DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY =

DataNodeFaultInjector.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
* Used for injecting faults in DFSClient and DFSOutputStream tests.
@@ -127,4 +128,10 @@ public class DataNodeFaultInjector {
* Just delay a while.
*/
public void delay() {}
/**
* Used as a hook to inject data pollution
* into an erasure coding reconstruction.
*/
public void badDecoding(ByteBuffer[] outputs) {}
}

StripedBlockChecksumReconstructor.java
@@ -57,6 +57,7 @@ public abstract class StripedBlockChecksumReconstructor
private void init() throws IOException {
initDecoderIfNecessary();
initDecodingValidatorIfNecessary();
getStripedReader().init();
// allocate buffer to keep the reconstructed block data
targetBuffer = allocateBuffer(getBufferSize());
@@ -192,7 +193,16 @@ public abstract class StripedBlockChecksumReconstructor
for (int i = 0; i < targetIndices.length; i++) {
tarIndices[i] = targetIndices[i];
}
if (isValidationEnabled()) {
markBuffers(inputs);
getDecoder().decode(inputs, tarIndices, outputs);
resetBuffers(inputs);
getValidator().validate(inputs, tarIndices, outputs);
} else {
getDecoder().decode(inputs, tarIndices, outputs);
}
}
/**

StripedBlockReconstructor.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.io.erasurecode.rawcoder.InvalidDecodingException;
import org.apache.hadoop.util.Time;
/**
@@ -53,6 +54,8 @@ class StripedBlockReconstructor extends StripedReconstructor
try {
initDecoderIfNecessary();
initDecodingValidatorIfNecessary();
getStripedReader().init();
stripedWriter.init();
@@ -126,12 +129,31 @@ class StripedBlockReconstructor extends StripedReconstructor
int[] erasedIndices = stripedWriter.getRealTargetIndices();
ByteBuffer[] outputs = stripedWriter.getRealTargetBuffers(toReconstructLen);
if (isValidationEnabled()) {
markBuffers(inputs);
decode(inputs, erasedIndices, outputs);
resetBuffers(inputs);
DataNodeFaultInjector.get().badDecoding(outputs);
try {
getValidator().validate(inputs, erasedIndices, outputs);
} catch (InvalidDecodingException e) {
getDatanode().getMetrics().incrECInvalidReconstructionTasks();
throw e;
}
} else {
decode(inputs, erasedIndices, outputs);
}
stripedWriter.updateRealTargetBuffers(toReconstructLen);
}
private void decode(ByteBuffer[] inputs, int[] erasedIndices,
ByteBuffer[] outputs) throws IOException {
long start = System.nanoTime();
getDecoder().decode(inputs, erasedIndices, outputs);
long end = System.nanoTime();
this.getDatanode().getMetrics().incrECDecodingTime(end - start);
}
/**

StripedReconstructor.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.server.datanode.erasurecode;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.erasurecode.rawcoder.DecodingValidator;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -103,10 +105,14 @@ abstract class StripedReconstructor {
private final Configuration conf;
private final DataNode datanode;
private final ErasureCodingPolicy ecPolicy;
private final ErasureCoderOptions coderOptions;
private RawErasureDecoder decoder;
private final ExtendedBlock blockGroup;
private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
private final boolean isValidationEnabled;
private DecodingValidator validator;
// position in striped internal block
private long positionInBlock;
private StripedReader stripedReader;
@@ -136,6 +142,13 @@ abstract class StripedReconstructor {
cachingStrategy = CachingStrategy.newDefaultStrategy();
positionInBlock = 0L;
coderOptions = new ErasureCoderOptions(
ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits());
isValidationEnabled = conf.getBoolean(
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_VALIDATION_KEY,
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_VALIDATION_VALUE)
&& !coderOptions.allowChangeInputs();
}
public void incrBytesRead(boolean local, long delta) {
@@ -196,13 +209,18 @@ abstract class StripedReconstructor {
// Initialize decoder
protected void initDecoderIfNecessary() {
if (decoder == null) {
decoder = CodecUtil.createRawDecoder(conf, ecPolicy.getCodecName(),
coderOptions);
}
}
// Initialize decoding validator
protected void initDecodingValidatorIfNecessary() {
if (isValidationEnabled && validator == null) {
validator = new DecodingValidator(decoder);
}
}
long getPositionInBlock() {
return positionInBlock;
}
@@ -285,4 +303,28 @@ abstract class StripedReconstructor {
static ByteBufferPool getBufferPool() {
return BUFFER_POOL;
}
boolean isValidationEnabled() {
return isValidationEnabled;
}
DecodingValidator getValidator() {
return validator;
}
protected static void markBuffers(ByteBuffer[] buffers) {
for (ByteBuffer buffer: buffers) {
if (buffer != null) {
buffer.mark();
}
}
}
protected static void resetBuffers(ByteBuffer[] buffers) {
for (ByteBuffer buffer: buffers) {
if (buffer != null) {
buffer.reset();
}
}
}
}

DataNodeMetrics.java
@@ -145,6 +145,8 @@ public class DataNodeMetrics {
MutableCounterLong ecReconstructionTasks;
@Metric("Count of erasure coding failed reconstruction tasks")
MutableCounterLong ecFailedReconstructionTasks;
@Metric("Count of erasure coding invalidated reconstruction tasks")
private MutableCounterLong ecInvalidReconstructionTasks;
@Metric("Nanoseconds spent by decoding tasks") @Metric("Nanoseconds spent by decoding tasks")
MutableCounterLong ecDecodingTimeNanos; MutableCounterLong ecDecodingTimeNanos;
@Metric("Bytes read by erasure coding worker") @Metric("Bytes read by erasure coding worker")
@ -486,6 +488,14 @@ public class DataNodeMetrics {
ecFailedReconstructionTasks.incr(); ecFailedReconstructionTasks.incr();
} }
public void incrECInvalidReconstructionTasks() {
ecInvalidReconstructionTasks.incr();
}
public long getECInvalidReconstructionTasks() {
return ecInvalidReconstructionTasks.value();
}
public void incrDataNodeActiveXceiversCount() {
dataNodeActiveXceiversCount.incr();
}

hdfs-default.xml
@@ -3317,6 +3317,16 @@
</description>
</property>
<property>
<name>dfs.datanode.ec.reconstruction.validation</name>
<value>false</value>
<description>
Decide whether the datanode validates that EC reconstruction tasks
reconstruct target blocks correctly. When validation fails, the
reconstruction task fails and is retried by the namenode.
</description>
</property>
<property>
<name>dfs.namenode.quota.init-threads</name>
<value>4</value>
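
The new dfs.datanode.ec.reconstruction.validation property above can also be set programmatically before the DataNode (or a MiniDFSCluster) starts, as the tests in this commit do. A hedged sketch, with an illustrative class name:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class EnableEcValidationSketch {
  // Equivalent to setting dfs.datanode.ec.reconstruction.validation=true in
  // hdfs-site.xml; must be applied before the DataNode is started.
  static Configuration withEcReconstructionValidation() {
    Configuration conf = new HdfsConfiguration();
    conf.setBoolean(DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_VALIDATION_KEY, true);
    return conf;
  }
}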

TestReconstructStripedFile.java
@@ -107,6 +107,23 @@ public class TestReconstructStripedFile {
return StripedFileTestUtil.getDefaultECPolicy();
}
public boolean isValidationEnabled() {
return false;
}
public int getPendingTimeout() {
return DFSConfigKeys
.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT;
}
public int getBlockSize() {
return blockSize;
}
public MiniDFSCluster getCluster() {
return cluster;
}
@Before
public void setup() throws IOException {
ecPolicy = getEcPolicy();
@@ -130,6 +147,11 @@ public class TestReconstructStripedFile {
CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY,
NativeRSRawErasureCoderFactory.CODER_NAME);
}
conf.setInt(
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
getPendingTimeout());
conf.setBoolean(DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_VALIDATION_KEY,
isValidationEnabled());
File basedir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(conf, basedir).numDataNodes(dnNum)
.build();
@@ -305,7 +327,7 @@ public class TestReconstructStripedFile {
* and verify the block replica length, generationStamp and content.
* 2. Read the file and verify content.
*/
void assertFileBlocksReconstruction(String fileName, int fileLen,
ReconstructionType type, int toRecoverBlockNum) throws Exception {
if (toRecoverBlockNum < 1 || toRecoverBlockNum > parityBlkNum) {
Assert.fail("toRecoverBlockNum should be between 1 ~ " + parityBlkNum);

TestReconstructStripedFileWithValidator.java
@@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* This test extends {@link TestReconstructStripedFile} to test
* EC reconstruction validation.
*/
public class TestReconstructStripedFileWithValidator
extends TestReconstructStripedFile {
private static final Logger LOG =
LoggerFactory.getLogger(TestReconstructStripedFileWithValidator.class);
public TestReconstructStripedFileWithValidator() {
LOG.info("run {} with validator.",
TestReconstructStripedFileWithValidator.class.getSuperclass()
.getSimpleName());
}
/**
* This test injects data pollution into decoded outputs once.
* When validation is enabled, the first reconstruction task should fail
* in the validation, but the data will be recovered correctly
* by the next task.
* On the other hand, when validation is disabled, the first reconstruction task
* will succeed and then lead to data corruption.
*/
@Test(timeout = 120000)
public void testValidatorWithBadDecoding()
throws Exception {
MiniDFSCluster cluster = getCluster();
cluster.getDataNodes().stream()
.map(DataNode::getMetrics)
.map(DataNodeMetrics::getECInvalidReconstructionTasks)
.forEach(n -> Assert.assertEquals(0, (long) n));
DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
DataNodeFaultInjector badDecodingInjector = new DataNodeFaultInjector() {
private final AtomicBoolean flag = new AtomicBoolean(false);
@Override
public void badDecoding(ByteBuffer[] outputs) {
if (!flag.get()) {
for (ByteBuffer output : outputs) {
output.mark();
output.put((byte) (output.get(output.position()) + 1));
output.reset();
}
}
flag.set(true);
}
};
DataNodeFaultInjector.set(badDecodingInjector);
int fileLen =
(getEcPolicy().getNumDataUnits() + getEcPolicy().getNumParityUnits())
* getBlockSize() + getBlockSize() / 10;
try {
assertFileBlocksReconstruction(
"/testValidatorWithBadDecoding",
fileLen,
ReconstructionType.DataOnly,
getEcPolicy().getNumParityUnits());
long sum = cluster.getDataNodes().stream()
.map(DataNode::getMetrics)
.mapToLong(DataNodeMetrics::getECInvalidReconstructionTasks)
.sum();
Assert.assertEquals(1, sum);
} finally {
DataNodeFaultInjector.set(oldInjector);
}
}
@Override
public boolean isValidationEnabled() {
return true;
}
/**
* Set a small pending timeout so that a failed reconstruction task is
* rescheduled in a short period of time.
*/
@Override
public int getPendingTimeout() {
return 10;
}
}