diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DecodingValidator.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DecodingValidator.java new file mode 100644 index 00000000000..9597058b6fe --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/DecodingValidator.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.rawcoder; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.io.erasurecode.ECChunk; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * A utility class to validate decoding. + */ +@InterfaceAudience.Private +public class DecodingValidator { + + private final RawErasureDecoder decoder; + private ByteBuffer buffer; + private int[] newValidIndexes; + private int newErasedIndex; + + public DecodingValidator(RawErasureDecoder decoder) { + this.decoder = decoder; + } + + /** + * Validate outputs decoded from inputs, by decoding an input back from + * the outputs and comparing it with the original one. + * + * For instance, in RS (6, 3), let (d0, d1, d2, d3, d4, d5) be sources + * and (p0, p1, p2) be parities, and assume + * inputs = [d0, null (d1), d2, d3, d4, d5, null (p0), p1, null (p2)]; + * erasedIndexes = [1, 6]; + * outputs = [d1, p0]. + * Then + * 1. Create new inputs, erasedIndexes and outputs for validation so that + * the inputs could contain the decoded outputs, and decode them: + * newInputs = [d1, d2, d3, d4, d5, p0] + * newErasedIndexes = [0] + * newOutputs = [d0'] + * 2. Compare d0 and d0'. The comparison will fail with high probability + * when the initial outputs are wrong. + * + * Note that the input buffers' positions must be the ones where data are + * read: If the input buffers have been processed by a decoder, the buffers' + * positions must be reset before being passed into this method. + * + * This method does not change outputs and erasedIndexes. + * + * @param inputs input buffers used for decoding. The buffers' position + * are moved to the end after this method. + * @param erasedIndexes indexes of erased units used for decoding + * @param outputs decoded output buffers, which are ready to be read after + * the call + * @throws IOException + */ + public void validate(ByteBuffer[] inputs, int[] erasedIndexes, + ByteBuffer[] outputs) throws IOException { + markBuffers(outputs); + + try { + ByteBuffer validInput = CoderUtil.findFirstValidInput(inputs); + boolean isDirect = validInput.isDirect(); + int capacity = validInput.capacity(); + int remaining = validInput.remaining(); + + // Init buffer + if (buffer == null || buffer.isDirect() != isDirect + || buffer.capacity() < remaining) { + buffer = allocateBuffer(isDirect, capacity); + } + buffer.clear().limit(remaining); + + // Create newInputs and newErasedIndex for validation + ByteBuffer[] newInputs = new ByteBuffer[inputs.length]; + int count = 0; + for (int i = 0; i < erasedIndexes.length; i++) { + newInputs[erasedIndexes[i]] = outputs[i]; + count++; + } + newErasedIndex = -1; + boolean selected = false; + int numValidIndexes = CoderUtil.getValidIndexes(inputs).length; + for (int i = 0; i < newInputs.length; i++) { + if (count == numValidIndexes) { + break; + } else if (!selected && inputs[i] != null) { + newErasedIndex = i; + newInputs[i] = null; + selected = true; + } else if (newInputs[i] == null) { + newInputs[i] = inputs[i]; + if (inputs[i] != null) { + count++; + } + } + } + + // Keep it for testing + newValidIndexes = CoderUtil.getValidIndexes(newInputs); + + decoder.decode(newInputs, new int[]{newErasedIndex}, + new ByteBuffer[]{buffer}); + + if (!buffer.equals(inputs[newErasedIndex])) { + throw new InvalidDecodingException("Failed to validate decoding"); + } + } finally { + toLimits(inputs); + resetBuffers(outputs); + } + } + + /** + * Validate outputs decoded from inputs, by decoding an input back from + * those outputs and comparing it with the original one. + * @param inputs input buffers used for decoding + * @param erasedIndexes indexes of erased units used for decoding + * @param outputs decoded output buffers + * @throws IOException + */ + public void validate(ECChunk[] inputs, int[] erasedIndexes, ECChunk[] outputs) + throws IOException { + ByteBuffer[] newInputs = CoderUtil.toBuffers(inputs); + ByteBuffer[] newOutputs = CoderUtil.toBuffers(outputs); + validate(newInputs, erasedIndexes, newOutputs); + } + + private ByteBuffer allocateBuffer(boolean direct, int capacity) { + if (direct) { + buffer = ByteBuffer.allocateDirect(capacity); + } else { + buffer = ByteBuffer.allocate(capacity); + } + return buffer; + } + + private static void markBuffers(ByteBuffer[] buffers) { + for (ByteBuffer buffer: buffers) { + if (buffer != null) { + buffer.mark(); + } + } + } + + private static void resetBuffers(ByteBuffer[] buffers) { + for (ByteBuffer buffer: buffers) { + if (buffer != null) { + buffer.reset(); + } + } + } + + private static void toLimits(ByteBuffer[] buffers) { + for (ByteBuffer buffer: buffers) { + if (buffer != null) { + buffer.position(buffer.limit()); + } + } + } + + @VisibleForTesting + protected int[] getNewValidIndexes() { + return newValidIndexes; + } + + @VisibleForTesting + protected int getNewErasedIndex() { + return newErasedIndex; + } +} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/InvalidDecodingException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/InvalidDecodingException.java new file mode 100644 index 00000000000..37869f8eede --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/InvalidDecodingException.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.rawcoder; + +import org.apache.hadoop.classification.InterfaceAudience; + +import java.io.IOException; + +/** + * Thrown for invalid decoding. + */ +@InterfaceAudience.Private +public class InvalidDecodingException + extends IOException { + private static final long serialVersionUID = 0L; + + public InvalidDecodingException(String description) { + super(description); + } +} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md index b93a11ce1c9..cc80e15ed2e 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md @@ -449,6 +449,7 @@ Each metrics record contains tags such as SessionId and Hostname as additional i | `BlocksDeletedInPendingIBR` | Number of blocks at deleted status in pending incremental block report (IBR) | | `EcReconstructionTasks` | Total number of erasure coding reconstruction tasks | | `EcFailedReconstructionTasks` | Total number of erasure coding failed reconstruction tasks | +| `EcInvalidReconstructionTasks` | Total number of erasure coding invalidated reconstruction tasks | | `EcDecodingTimeNanos` | Total number of nanoseconds spent by decoding tasks | | `EcReconstructionBytesRead` | Total number of bytes read by erasure coding worker | | `EcReconstructionBytesWritten` | Total number of bytes written by erasure coding worker | diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java index caab72ceff6..811148464b7 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java @@ -519,4 +519,16 @@ protected void corruptSomeChunk(ECChunk[] chunks) { buffer.position(buffer.position() + 1); } } + + /** + * Pollute some chunk. + * @param chunks + */ + protected void polluteSomeChunk(ECChunk[] chunks) { + int idx = new Random().nextInt(chunks.length); + ByteBuffer buffer = chunks[idx].getBuffer(); + buffer.mark(); + buffer.put((byte) ((buffer.get(buffer.position()) + 1))); + buffer.reset(); + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestDecodingValidator.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestDecodingValidator.java new file mode 100644 index 00000000000..06744cccc0a --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestDecodingValidator.java @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.erasurecode.rawcoder; + +import org.apache.hadoop.io.erasurecode.ECChunk; +import org.apache.hadoop.io.erasurecode.ErasureCodeNative; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.Assert.assertTrue; + +/** + * Test {@link DecodingValidator} under various decoders. + */ +@RunWith(Parameterized.class) +public class TestDecodingValidator extends TestRawCoderBase { + + private DecodingValidator validator; + + @Parameterized.Parameters + public static Collection data() { + return Arrays.asList(new Object[][] { + {RSRawErasureCoderFactory.class, 6, 3, new int[]{1}, new int[]{}}, + {RSRawErasureCoderFactory.class, 6, 3, new int[]{3}, new int[]{0}}, + {RSRawErasureCoderFactory.class, 6, 3, new int[]{2, 4}, new int[]{1}}, + {NativeRSRawErasureCoderFactory.class, 6, 3, new int[]{0}, new int[]{}}, + {XORRawErasureCoderFactory.class, 10, 1, new int[]{0}, new int[]{}}, + {NativeXORRawErasureCoderFactory.class, 10, 1, new int[]{0}, + new int[]{}} + }); + } + + public TestDecodingValidator( + Class factoryClass, int numDataUnits, + int numParityUnits, int[] erasedDataIndexes, int[] erasedParityIndexes) { + this.encoderFactoryClass = factoryClass; + this.decoderFactoryClass = factoryClass; + this.numDataUnits = numDataUnits; + this.numParityUnits = numParityUnits; + this.erasedDataIndexes = erasedDataIndexes; + this.erasedParityIndexes = erasedParityIndexes; + } + + @Before + public void setup() { + if (encoderFactoryClass == NativeRSRawErasureCoderFactory.class + || encoderFactoryClass == NativeXORRawErasureCoderFactory.class) { + Assume.assumeTrue(ErasureCodeNative.isNativeCodeLoaded()); + } + setAllowDump(false); + } + + /** + * Test if the same validator can process direct and non-direct buffers. + */ + @Test + public void testValidate() { + prepare(null, numDataUnits, numParityUnits, erasedDataIndexes, + erasedParityIndexes); + testValidate(true); + testValidate(false); + } + + /** + * Test if the same validator can process variable width of data for + * inputs and outputs. + */ + protected void testValidate(boolean usingDirectBuffer) { + this.usingDirectBuffer = usingDirectBuffer; + prepareCoders(false); + prepareValidator(false); + + performTestValidate(baseChunkSize); + performTestValidate(baseChunkSize - 17); + performTestValidate(baseChunkSize + 18); + } + + protected void prepareValidator(boolean recreate) { + if (validator == null || recreate) { + validator = new DecodingValidator(decoder); + } + } + + protected void performTestValidate(int chunkSize) { + setChunkSize(chunkSize); + prepareBufferAllocator(false); + + // encode + ECChunk[] dataChunks = prepareDataChunksForEncoding(); + ECChunk[] parityChunks = prepareParityChunksForEncoding(); + ECChunk[] clonedDataChunks = cloneChunksWithData(dataChunks); + try { + encoder.encode(dataChunks, parityChunks); + } catch (Exception e) { + Assert.fail("Should not get Exception: " + e.getMessage()); + } + + // decode + backupAndEraseChunks(clonedDataChunks, parityChunks); + ECChunk[] inputChunks = + prepareInputChunksForDecoding(clonedDataChunks, parityChunks); + markChunks(inputChunks); + ensureOnlyLeastRequiredChunks(inputChunks); + ECChunk[] recoveredChunks = prepareOutputChunksForDecoding(); + int[] erasedIndexes = getErasedIndexesForDecoding(); + try { + decoder.decode(inputChunks, erasedIndexes, recoveredChunks); + } catch (Exception e) { + Assert.fail("Should not get Exception: " + e.getMessage()); + } + + // validate + restoreChunksFromMark(inputChunks); + ECChunk[] clonedInputChunks = cloneChunksWithData(inputChunks); + ECChunk[] clonedRecoveredChunks = cloneChunksWithData(recoveredChunks); + int[] clonedErasedIndexes = erasedIndexes.clone(); + + try { + validator.validate(clonedInputChunks, clonedErasedIndexes, + clonedRecoveredChunks); + } catch (Exception e) { + Assert.fail("Should not get Exception: " + e.getMessage()); + } + + // Check if input buffers' positions are moved to the end + verifyBufferPositionAtEnd(clonedInputChunks); + + // Check if validator does not change recovered chunks and erased indexes + verifyChunksEqual(recoveredChunks, clonedRecoveredChunks); + Assert.assertArrayEquals("Erased indexes should not be changed", + erasedIndexes, clonedErasedIndexes); + + // Check if validator uses correct indexes for validation + List validIndexesList = + IntStream.of(CoderUtil.getValidIndexes(inputChunks)).boxed() + .collect(Collectors.toList()); + List newValidIndexesList = + IntStream.of(validator.getNewValidIndexes()).boxed() + .collect(Collectors.toList()); + List erasedIndexesList = + IntStream.of(erasedIndexes).boxed().collect(Collectors.toList()); + int newErasedIndex = validator.getNewErasedIndex(); + Assert.assertTrue( + "Valid indexes for validation should contain" + + " erased indexes for decoding", + newValidIndexesList.containsAll(erasedIndexesList)); + Assert.assertTrue( + "An erased index for validation should be contained" + + " in valid indexes for decoding", + validIndexesList.contains(newErasedIndex)); + Assert.assertFalse( + "An erased index for validation should not be contained" + + " in valid indexes for validation", + newValidIndexesList.contains(newErasedIndex)); + } + + private void verifyChunksEqual(ECChunk[] chunks1, ECChunk[] chunks2) { + boolean result = Arrays.deepEquals(toArrays(chunks1), toArrays(chunks2)); + assertTrue("Recovered chunks should not be changed", result); + } + + /** + * Test if validator throws {@link InvalidDecodingException} when + * a decoded output buffer is polluted. + */ + @Test + public void testValidateWithBadDecoding() throws IOException { + prepare(null, numDataUnits, numParityUnits, erasedDataIndexes, + erasedParityIndexes); + this.usingDirectBuffer = true; + prepareCoders(true); + prepareValidator(true); + prepareBufferAllocator(false); + + // encode + ECChunk[] dataChunks = prepareDataChunksForEncoding(); + ECChunk[] parityChunks = prepareParityChunksForEncoding(); + ECChunk[] clonedDataChunks = cloneChunksWithData(dataChunks); + try { + encoder.encode(dataChunks, parityChunks); + } catch (Exception e) { + Assert.fail("Should not get Exception: " + e.getMessage()); + } + + // decode + backupAndEraseChunks(clonedDataChunks, parityChunks); + ECChunk[] inputChunks = + prepareInputChunksForDecoding(clonedDataChunks, parityChunks); + markChunks(inputChunks); + ensureOnlyLeastRequiredChunks(inputChunks); + ECChunk[] recoveredChunks = prepareOutputChunksForDecoding(); + int[] erasedIndexes = getErasedIndexesForDecoding(); + try { + decoder.decode(inputChunks, erasedIndexes, recoveredChunks); + } catch (Exception e) { + Assert.fail("Should not get Exception: " + e.getMessage()); + } + + // validate + restoreChunksFromMark(inputChunks); + polluteSomeChunk(recoveredChunks); + try { + validator.validate(inputChunks, erasedIndexes, recoveredChunks); + Assert.fail("Validation should fail due to bad decoding"); + } catch (InvalidDecodingException e) { + String expected = "Failed to validate decoding"; + GenericTestUtils.assertExceptionContains(expected, e); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java index 4519e357bd1..eb63494507e 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java @@ -334,7 +334,7 @@ protected void testInputPosition(boolean usingDirectBuffer) { verifyBufferPositionAtEnd(inputChunks); } - private void verifyBufferPositionAtEnd(ECChunk[] inputChunks) { + void verifyBufferPositionAtEnd(ECChunk[] inputChunks) { for (ECChunk chunk : inputChunks) { if (chunk != null) { Assert.assertEquals(0, chunk.getBuffer().remaining()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index c29d91c421a..b2c94544811 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -867,6 +867,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "dfs.datanode.ec.reconstruction.xmits.weight"; public static final float DFS_DN_EC_RECONSTRUCTION_XMITS_WEIGHT_DEFAULT = 0.5f; + public static final String DFS_DN_EC_RECONSTRUCTION_VALIDATION_KEY = + "dfs.datanode.ec.reconstruction.validation"; + public static final boolean DFS_DN_EC_RECONSTRUCTION_VALIDATION_VALUE = false; public static final String DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java index 58c589e4d25..e3b9cf0399c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java @@ -23,6 +23,7 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.ByteBuffer; /** * Used for injecting faults in DFSClient and DFSOutputStream tests. @@ -142,4 +143,10 @@ public void blockUtilSendFullBlockReport() {} * Just delay a while. */ public void delay() {} + + /** + * Used as a hook to inject data pollution + * into an erasure coding reconstruction. + */ + public void badDecoding(ByteBuffer[] outputs) {} } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java index e28d6c556b8..a196935219e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java @@ -57,6 +57,7 @@ protected StripedBlockChecksumReconstructor(ErasureCodingWorker worker, private void init() throws IOException { initDecoderIfNecessary(); + initDecodingValidatorIfNecessary(); getStripedReader().init(); // allocate buffer to keep the reconstructed block data targetBuffer = allocateBuffer(getBufferSize()); @@ -192,7 +193,16 @@ private void reconstructTargets(int toReconstructLen) throws IOException { for (int i = 0; i < targetIndices.length; i++) { tarIndices[i] = targetIndices[i]; } - getDecoder().decode(inputs, tarIndices, outputs); + + if (isValidationEnabled()) { + markBuffers(inputs); + getDecoder().decode(inputs, tarIndices, outputs); + resetBuffers(inputs); + + getValidator().validate(inputs, tarIndices, outputs); + } else { + getDecoder().decode(inputs, tarIndices, outputs); + } } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java index 1af2380886a..cd59f515283 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java @@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; +import org.apache.hadoop.io.erasurecode.rawcoder.InvalidDecodingException; import org.apache.hadoop.util.Time; /** @@ -53,6 +54,8 @@ public void run() { try { initDecoderIfNecessary(); + initDecodingValidatorIfNecessary(); + getStripedReader().init(); stripedWriter.init(); @@ -126,12 +129,31 @@ private void reconstructTargets(int toReconstructLen) throws IOException { int[] erasedIndices = stripedWriter.getRealTargetIndices(); ByteBuffer[] outputs = stripedWriter.getRealTargetBuffers(toReconstructLen); + if (isValidationEnabled()) { + markBuffers(inputs); + decode(inputs, erasedIndices, outputs); + resetBuffers(inputs); + + DataNodeFaultInjector.get().badDecoding(outputs); + try { + getValidator().validate(inputs, erasedIndices, outputs); + } catch (InvalidDecodingException e) { + getDatanode().getMetrics().incrECInvalidReconstructionTasks(); + throw e; + } + } else { + decode(inputs, erasedIndices, outputs); + } + + stripedWriter.updateRealTargetBuffers(toReconstructLen); + } + + private void decode(ByteBuffer[] inputs, int[] erasedIndices, + ByteBuffer[] outputs) throws IOException { long start = System.nanoTime(); getDecoder().decode(inputs, erasedIndices, outputs); long end = System.nanoTime(); this.getDatanode().getMetrics().incrECDecodingTime(end - start); - - stripedWriter.updateRealTargetBuffers(toReconstructLen); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java index 851f6956620..06346c6b324 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs.server.datanode.erasurecode; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.io.erasurecode.rawcoder.DecodingValidator; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; @@ -103,10 +105,14 @@ abstract class StripedReconstructor { private final Configuration conf; private final DataNode datanode; private final ErasureCodingPolicy ecPolicy; + private final ErasureCoderOptions coderOptions; private RawErasureDecoder decoder; private final ExtendedBlock blockGroup; private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool(); + private final boolean isValidationEnabled; + private DecodingValidator validator; + // position in striped internal block private long positionInBlock; private StripedReader stripedReader; @@ -136,6 +142,13 @@ abstract class StripedReconstructor { cachingStrategy = CachingStrategy.newDefaultStrategy(); positionInBlock = 0L; + + coderOptions = new ErasureCoderOptions( + ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits()); + isValidationEnabled = conf.getBoolean( + DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_VALIDATION_KEY, + DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_VALIDATION_VALUE) + && !coderOptions.allowChangeInputs(); } public void incrBytesRead(boolean local, long delta) { @@ -196,13 +209,18 @@ long getBlockLen(int i) { // Initialize decoder protected void initDecoderIfNecessary() { if (decoder == null) { - ErasureCoderOptions coderOptions = new ErasureCoderOptions( - ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits()); decoder = CodecUtil.createRawDecoder(conf, ecPolicy.getCodecName(), coderOptions); } } + // Initialize decoding validator + protected void initDecodingValidatorIfNecessary() { + if (isValidationEnabled && validator == null) { + validator = new DecodingValidator(decoder); + } + } + long getPositionInBlock() { return positionInBlock; } @@ -285,4 +303,28 @@ public ErasureCodingWorker getErasureCodingWorker() { static ByteBufferPool getBufferPool() { return BUFFER_POOL; } + + boolean isValidationEnabled() { + return isValidationEnabled; + } + + DecodingValidator getValidator() { + return validator; + } + + protected static void markBuffers(ByteBuffer[] buffers) { + for (ByteBuffer buffer: buffers) { + if (buffer != null) { + buffer.mark(); + } + } + } + + protected static void resetBuffers(ByteBuffer[] buffers) { + for (ByteBuffer buffer: buffers) { + if (buffer != null) { + buffer.reset(); + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java index 712d8f46ef7..2debc3ee0a7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java @@ -152,6 +152,8 @@ public class DataNodeMetrics { MutableCounterLong ecReconstructionTasks; @Metric("Count of erasure coding failed reconstruction tasks") MutableCounterLong ecFailedReconstructionTasks; + @Metric("Count of erasure coding invalidated reconstruction tasks") + private MutableCounterLong ecInvalidReconstructionTasks; @Metric("Nanoseconds spent by decoding tasks") MutableCounterLong ecDecodingTimeNanos; @Metric("Bytes read by erasure coding worker") @@ -543,6 +545,14 @@ public void incrECFailedReconstructionTasks() { ecFailedReconstructionTasks.incr(); } + public void incrECInvalidReconstructionTasks() { + ecInvalidReconstructionTasks.incr(); + } + + public long getECInvalidReconstructionTasks() { + return ecInvalidReconstructionTasks.value(); + } + public void incrDataNodeActiveXceiversCount() { dataNodeActiveXceiversCount.incr(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index c3080581de9..df4df489bfa 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -3693,6 +3693,16 @@ + + dfs.datanode.ec.reconstruction.validation + false + + Decide if datanode validates that EC reconstruction tasks reconstruct + target blocks correctly. When validation fails, reconstruction tasks + will fail and be retried by namenode. + + + dfs.namenode.quota.init-threads 4 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java index 902b8992773..67df6d8a8db 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java @@ -107,6 +107,23 @@ public ErasureCodingPolicy getEcPolicy() { return StripedFileTestUtil.getDefaultECPolicy(); } + public boolean isValidationEnabled() { + return false; + } + + public int getPendingTimeout() { + return DFSConfigKeys + .DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT; + } + + public int getBlockSize() { + return blockSize; + } + + public MiniDFSCluster getCluster() { + return cluster; + } + @Before public void setup() throws IOException { ecPolicy = getEcPolicy(); @@ -128,6 +145,11 @@ public void setup() throws IOException { CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY, NativeRSRawErasureCoderFactory.CODER_NAME); } + conf.setInt( + DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, + getPendingTimeout()); + conf.setBoolean(DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_VALIDATION_KEY, + isValidationEnabled()); File basedir = new File(GenericTestUtils.getRandomizedTempPath()); cluster = new MiniDFSCluster.Builder(conf, basedir).numDataNodes(dnNum) .build(); @@ -303,7 +325,7 @@ private static void writeFile(DistributedFileSystem fs, String fileName, * and verify the block replica length, generationStamp and content. * 2. Read the file and verify content. */ - private void assertFileBlocksReconstruction(String fileName, int fileLen, + void assertFileBlocksReconstruction(String fileName, int fileLen, ReconstructionType type, int toRecoverBlockNum) throws Exception { if (toRecoverBlockNum < 1 || toRecoverBlockNum > parityBlkNum) { Assert.fail("toRecoverBlockNum should be between 1 ~ " + parityBlkNum); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFileWithValidator.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFileWithValidator.java new file mode 100644 index 00000000000..00749efa4d0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFileWithValidator.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector; +import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This test extends {@link TestReconstructStripedFile} to test + * ec reconstruction validation. + */ +public class TestReconstructStripedFileWithValidator + extends TestReconstructStripedFile { + private static final Logger LOG = + LoggerFactory.getLogger(TestReconstructStripedFileWithValidator.class); + + public TestReconstructStripedFileWithValidator() { + LOG.info("run {} with validator.", + TestReconstructStripedFileWithValidator.class.getSuperclass() + .getSimpleName()); + } + + /** + * This test injects data pollution into decoded outputs once. + * When validation enabled, the first reconstruction task should fail + * in the validation, but the data will be recovered correctly + * by the next task. + * On the other hand, when validation disabled, the first reconstruction task + * will succeed and then lead to data corruption. + */ + @Test(timeout = 120000) + public void testValidatorWithBadDecoding() + throws Exception { + MiniDFSCluster cluster = getCluster(); + + cluster.getDataNodes().stream() + .map(DataNode::getMetrics) + .map(DataNodeMetrics::getECInvalidReconstructionTasks) + .forEach(n -> Assert.assertEquals(0, (long) n)); + + DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get(); + DataNodeFaultInjector badDecodingInjector = new DataNodeFaultInjector() { + private final AtomicBoolean flag = new AtomicBoolean(false); + + @Override + public void badDecoding(ByteBuffer[] outputs) { + if (!flag.get()) { + for (ByteBuffer output : outputs) { + output.mark(); + output.put((byte) (output.get(output.position()) + 1)); + output.reset(); + } + } + flag.set(true); + } + }; + DataNodeFaultInjector.set(badDecodingInjector); + + int fileLen = + (getEcPolicy().getNumDataUnits() + getEcPolicy().getNumParityUnits()) + * getBlockSize() + getBlockSize() / 10; + try { + assertFileBlocksReconstruction( + "/testValidatorWithBadDecoding", + fileLen, + ReconstructionType.DataOnly, + getEcPolicy().getNumParityUnits()); + + long sum = cluster.getDataNodes().stream() + .map(DataNode::getMetrics) + .mapToLong(DataNodeMetrics::getECInvalidReconstructionTasks) + .sum(); + Assert.assertEquals(1, sum); + } finally { + DataNodeFaultInjector.set(oldInjector); + } + } + + @Override + public boolean isValidationEnabled() { + return true; + } + + /** + * Set a small value for the failed reconstruction task to be + * rescheduled in a short period of time. + */ + @Override + public int getPendingTimeout() { + return 10; + } +}