From 98a692fd6361365db4afb9523a5d83ee32774112 Mon Sep 17 00:00:00 2001 From: Kihwal Lee Date: Tue, 21 May 2013 13:42:23 +0000 Subject: [PATCH] HDFS-3875. Issue handling checksum errors in write pipeline. Contributed by Kihwal Lee. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1484808 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../hadoop/hdfs/DFSClientFaultInjector.java | 45 ++++++++++ .../apache/hadoop/hdfs/DFSOutputStream.java | 30 +++++++ .../hdfs/server/datanode/BlockReceiver.java | 89 ++++++++++++++----- .../fsdataset/impl/FsDatasetImpl.java | 18 +++- .../apache/hadoop/hdfs/TestCrcCorruption.java | 80 +++++++++++++++++ 6 files changed, 239 insertions(+), 25 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index f429329d4d1..52c084dc2fa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -3064,6 +3064,8 @@ Release 0.23.8 - UNRELEASED HDFS-4805. Webhdfs client is fragile to token renewal errors (daryn via kihwal) + HDFS-3875. Issue handling checksum errors in write pipeline. (kihwal) + Release 0.23.7 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java new file mode 100644 index 00000000000..1738a4c1bc7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; +import java.io.IOException; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Used for injecting faults in DFSClient and DFSOutputStream tests. + * Calls into this are a no-op in production code. + */ +@VisibleForTesting +@InterfaceAudience.Private +public class DFSClientFaultInjector { + public static DFSClientFaultInjector instance = new DFSClientFaultInjector(); + + public static DFSClientFaultInjector get() { + return instance; + } + + public boolean corruptPacket() { + return false; + } + + public boolean uncorruptPacket() { + return false; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 2d0be1b2110..2b7edee1a24 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -254,8 +254,18 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable { System.arraycopy(header.getBytes(), 0, buf, headerStart, header.getSerializedSize()); + // corrupt the data for testing. + if (DFSClientFaultInjector.get().corruptPacket()) { + buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff; + } + // Write the now contiguous full packet to the output stream. stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen); + + // undo corruption. + if (DFSClientFaultInjector.get().uncorruptPacket()) { + buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff; + } } // get the packet's last byte's offset in the block @@ -323,6 +333,9 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable { /** Nodes have been used in the pipeline before and have failed. */ private final List failed = new ArrayList(); + /** The last ack sequence number before pipeline failure. */ + private long lastAckedSeqnoBeforeFailure = -1; + private int pipelineRecoveryCount = 0; /** Has the current block been hflushed? */ private boolean isHflushed = false; /** Append on an existing block? */ @@ -779,6 +792,23 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable { ackQueue.clear(); } + // Record the new pipeline failure recovery. + if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) { + lastAckedSeqnoBeforeFailure = lastAckedSeqno; + pipelineRecoveryCount = 1; + } else { + // If we had to recover the pipeline five times in a row for the + // same packet, this client likely has corrupt data or corrupting + // during transmission. + if (++pipelineRecoveryCount > 5) { + DFSClient.LOG.warn("Error recovering pipeline for writing " + + block + ". Already retried 5 times for the same packet."); + lastException = new IOException("Failing write. Tried pipeline " + + "recovery 5 times without success."); + streamerClosed = true; + return false; + } + } boolean doSleep = setupPipelineForAppendOrRecovery(); if (!streamerClosed && dfsClient.clientRunning) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 3cf1679b6ec..3990de06e12 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -377,7 +377,8 @@ class BlockReceiver implements Closeable { clientChecksum.verifyChunkedSums(dataBuf, checksumBuf, clientname, 0); } catch (ChecksumException ce) { LOG.warn("Checksum error in block " + block + " from " + inAddr, ce); - if (srcDataNode != null) { + // No need to report to namenode when client is writing. + if (srcDataNode != null && isDatanode) { try { LOG.info("report corrupt " + block + " from datanode " + srcDataNode + " to namenode"); @@ -404,6 +405,19 @@ class BlockReceiver implements Closeable { diskChecksum.calculateChunkedSums(dataBuf, checksumBuf); } + /** + * Check whether checksum needs to be verified. + * Skip verifying checksum iff this is not the last one in the + * pipeline and clientName is non-null. i.e. Checksum is verified + * on all the datanodes when the data is being written by a + * datanode rather than a client. Whe client is writing the data, + * protocol includes acks and only the last datanode needs to verify + * checksum. + * @return true if checksum verification is needed, otherwise false. + */ + private boolean shouldVerifyChecksum() { + return (mirrorOut == null || isDatanode || needsChecksumTranslation); + } /** * Receives and processes a packet. It can contain many chunks. @@ -451,9 +465,9 @@ class BlockReceiver implements Closeable { } // put in queue for pending acks, unless sync was requested - if (responder != null && !syncBlock) { + if (responder != null && !syncBlock && !shouldVerifyChecksum()) { ((PacketResponder) responder.getRunnable()).enqueue(seqno, - lastPacketInBlock, offsetInBlock); + lastPacketInBlock, offsetInBlock, Status.SUCCESS); } //First write the packet to the mirror: @@ -485,17 +499,26 @@ class BlockReceiver implements Closeable { throw new IOException("Length of checksums in packet " + checksumBuf.capacity() + " does not match calculated checksum " + "length " + checksumLen); - } + } - /* skip verifying checksum iff this is not the last one in the - * pipeline and clientName is non-null. i.e. Checksum is verified - * on all the datanodes when the data is being written by a - * datanode rather than a client. Whe client is writing the data, - * protocol includes acks and only the last datanode needs to verify - * checksum. - */ - if (mirrorOut == null || isDatanode || needsChecksumTranslation) { - verifyChunks(dataBuf, checksumBuf); + if (shouldVerifyChecksum()) { + try { + verifyChunks(dataBuf, checksumBuf); + } catch (IOException ioe) { + // checksum error detected locally. there is no reason to continue. + if (responder != null) { + try { + ((PacketResponder) responder.getRunnable()).enqueue(seqno, + lastPacketInBlock, offsetInBlock, + Status.ERROR_CHECKSUM); + // Wait until the responder sends back the response + // and interrupt this thread. + Thread.sleep(3000); + } catch (InterruptedException e) { } + } + throw new IOException("Terminating due to a checksum error." + ioe); + } + if (needsChecksumTranslation) { // overwrite the checksums in the packet buffer with the // appropriate polynomial for the disk storage. @@ -584,9 +607,9 @@ class BlockReceiver implements Closeable { // if sync was requested, put in queue for pending acks here // (after the fsync finished) - if (responder != null && syncBlock) { + if (responder != null && (syncBlock || shouldVerifyChecksum())) { ((PacketResponder) responder.getRunnable()).enqueue(seqno, - lastPacketInBlock, offsetInBlock); + lastPacketInBlock, offsetInBlock, Status.SUCCESS); } if (throttler != null) { // throttle I/O @@ -783,7 +806,7 @@ class BlockReceiver implements Closeable { private static Status[] MIRROR_ERROR_STATUS = {Status.SUCCESS, Status.ERROR}; /** - * Processed responses from downstream datanodes in the pipeline + * Processes responses from downstream datanodes in the pipeline * and sends back replies to the originator. */ class PacketResponder implements Runnable, Closeable { @@ -836,9 +859,9 @@ class BlockReceiver implements Closeable { * @param offsetInBlock */ void enqueue(final long seqno, final boolean lastPacketInBlock, - final long offsetInBlock) { + final long offsetInBlock, final Status ackStatus) { final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock, - System.nanoTime()); + System.nanoTime(), ackStatus); if(LOG.isDebugEnabled()) { LOG.debug(myString + ": enqueue " + p); } @@ -976,7 +999,8 @@ class BlockReceiver implements Closeable { } sendAckUpstream(ack, expected, totalAckTimeNanos, - (pkt != null ? pkt.offsetInBlock : 0)); + (pkt != null ? pkt.offsetInBlock : 0), + (pkt != null ? pkt.ackStatus : Status.SUCCESS)); if (pkt != null) { // remove the packet from the ack queue removeAckHead(); @@ -1038,7 +1062,8 @@ class BlockReceiver implements Closeable { * @param offsetInBlock offset in block for the data in packet */ private void sendAckUpstream(PipelineAck ack, long seqno, - long totalAckTimeNanos, long offsetInBlock) throws IOException { + long totalAckTimeNanos, long offsetInBlock, + Status myStatus) throws IOException { Status[] replies = null; if (mirrorError) { // ack read error replies = MIRROR_ERROR_STATUS; @@ -1046,10 +1071,19 @@ class BlockReceiver implements Closeable { short ackLen = type == PacketResponderType.LAST_IN_PIPELINE ? 0 : ack .getNumOfReplies(); replies = new Status[1 + ackLen]; - replies[0] = Status.SUCCESS; + replies[0] = myStatus; for (int i = 0; i < ackLen; i++) { replies[i + 1] = ack.getReply(i); } + // If the mirror has reported that it received a corrupt packet, + // do self-destruct to mark myself bad, instead of making the + // mirror node bad. The mirror is guaranteed to be good without + // corrupt data on disk. + if (ackLen > 0 && replies[1] == Status.ERROR_CHECKSUM) { + throw new IOException("Shutting down writer and responder " + + "since the down streams reported the data sent by this " + + "thread is corrupt"); + } } PipelineAck replyAck = new PipelineAck(seqno, replies, totalAckTimeNanos); @@ -1064,6 +1098,14 @@ class BlockReceiver implements Closeable { if (LOG.isDebugEnabled()) { LOG.debug(myString + ", replyAck=" + replyAck); } + + // If a corruption was detected in the received data, terminate after + // sending ERROR_CHECKSUM back. + if (myStatus == Status.ERROR_CHECKSUM) { + throw new IOException("Shutting down writer and responder " + + "due to a checksum error in received data. The error " + + "response has been sent upstream."); + } } /** @@ -1085,13 +1127,15 @@ class BlockReceiver implements Closeable { final boolean lastPacketInBlock; final long offsetInBlock; final long ackEnqueueNanoTime; + final Status ackStatus; Packet(long seqno, boolean lastPacketInBlock, long offsetInBlock, - long ackEnqueueNanoTime) { + long ackEnqueueNanoTime, Status ackStatus) { this.seqno = seqno; this.lastPacketInBlock = lastPacketInBlock; this.offsetInBlock = offsetInBlock; this.ackEnqueueNanoTime = ackEnqueueNanoTime; + this.ackStatus = ackStatus; } @Override @@ -1100,6 +1144,7 @@ class BlockReceiver implements Closeable { + ", lastPacketInBlock=" + lastPacketInBlock + ", offsetInBlock=" + offsetInBlock + ", ackEnqueueNanoTime=" + ackEnqueueNanoTime + + ", ackStatus=" + ackStatus + ")"; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index d82f32eadcd..524fb4b8800 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -749,13 +749,25 @@ class FsDatasetImpl implements FsDatasetSpi { } // check replica length - if (rbw.getBytesAcked() < minBytesRcvd || rbw.getNumBytes() > maxBytesRcvd){ + long bytesAcked = rbw.getBytesAcked(); + long numBytes = rbw.getNumBytes(); + if (bytesAcked < minBytesRcvd || numBytes > maxBytesRcvd){ throw new ReplicaNotFoundException("Unmatched length replica " + - replicaInfo + ": BytesAcked = " + rbw.getBytesAcked() + - " BytesRcvd = " + rbw.getNumBytes() + " are not in the range of [" + + replicaInfo + ": BytesAcked = " + bytesAcked + + " BytesRcvd = " + numBytes + " are not in the range of [" + minBytesRcvd + ", " + maxBytesRcvd + "]."); } + // Truncate the potentially corrupt portion. + // If the source was client and the last node in the pipeline was lost, + // any corrupt data written after the acked length can go unnoticed. + if (numBytes > bytesAcked) { + final File replicafile = rbw.getBlockFile(); + truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked); + rbw.setNumBytes(bytesAcked); + rbw.setLastChecksumAndDataLen(bytesAcked, null); + } + // bump the replica's generation stamp to newGS bumpReplicaGS(rbw, newGS); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestCrcCorruption.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestCrcCorruption.java index 15a26d3e437..1f667be3b40 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestCrcCorruption.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestCrcCorruption.java @@ -31,11 +31,18 @@ import java.util.Random; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSClientFaultInjector; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.io.IOUtils; +import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.stubbing.Answer; + /** * A JUnit test for corrupted file handling. * This test creates a bunch of files/directories with replication @@ -64,6 +71,79 @@ import org.junit.Test; * replica was created from the non-corrupted replica. */ public class TestCrcCorruption { + + private DFSClientFaultInjector faultInjector; + + @Before + public void setUp() throws IOException { + faultInjector = Mockito.mock(DFSClientFaultInjector.class); + DFSClientFaultInjector.instance = faultInjector; + } + + /** + * Test case for data corruption during data transmission for + * create/write. To recover from corruption while writing, at + * least two replicas are needed. + */ + @Test(timeout=50000) + public void testCorruptionDuringWrt() throws Exception { + Configuration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = null; + + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(10).build(); + cluster.waitActive(); + FileSystem fs = cluster.getFileSystem(); + Path file = new Path("/test_corruption_file"); + FSDataOutputStream out = fs.create(file, true, 8192, (short)3, (long)(128*1024*1024)); + byte[] data = new byte[65536]; + for (int i=0; i < 65536; i++) { + data[i] = (byte)(i % 256); + } + + for (int i = 0; i < 5; i++) { + out.write(data, 0, 65535); + } + out.hflush(); + // corrupt the packet once + Mockito.when(faultInjector.corruptPacket()).thenReturn(true, false); + Mockito.when(faultInjector.uncorruptPacket()).thenReturn(true, false); + + for (int i = 0; i < 5; i++) { + out.write(data, 0, 65535); + } + out.close(); + // read should succeed + FSDataInputStream in = fs.open(file); + for(int c; (c = in.read()) != -1; ); + in.close(); + + // test the retry limit + out = fs.create(file, true, 8192, (short)3, (long)(128*1024*1024)); + + // corrupt the packet once and never fix it. + Mockito.when(faultInjector.corruptPacket()).thenReturn(true, false); + Mockito.when(faultInjector.uncorruptPacket()).thenReturn(false); + + // the client should give up pipeline reconstruction after retries. + try { + for (int i = 0; i < 5; i++) { + out.write(data, 0, 65535); + } + out.close(); + fail("Write did not fail"); + } catch (IOException ioe) { + // we should get an ioe + DFSClient.LOG.info("Got expected exception", ioe); + } + } finally { + if (cluster != null) { cluster.shutdown(); } + Mockito.when(faultInjector.corruptPacket()).thenReturn(false); + Mockito.when(faultInjector.uncorruptPacket()).thenReturn(false); + } + } + + /** * check if DFS can handle corrupted CRC blocks */