From 623be287b7517b78bb97a6c406257ec6302bfc9b Mon Sep 17 00:00:00 2001
From: Kihwal Lee
Date: Tue, 21 May 2013 13:45:14 +0000
Subject: [PATCH] HDFS-3875. Issue handling checksum errors in write pipeline.
 Contributed by Kihwal Lee.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1484809 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |  2 +
 .../hadoop/hdfs/DFSClientFaultInjector.java   | 45 ++++++++++
 .../apache/hadoop/hdfs/DFSOutputStream.java   | 30 +++++++
 .../hdfs/server/datanode/BlockReceiver.java   | 88 ++++++++++++++-----
 .../fsdataset/impl/FsDatasetImpl.java         | 18 +++-
 .../apache/hadoop/hdfs/TestCrcCorruption.java | 80 +++++++++++++++++
 6 files changed, 239 insertions(+), 24 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 2030df121c0..d4ee06cf9d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -2406,6 +2406,8 @@ Release 0.23.8 - UNRELEASED
     HDFS-4805. Webhdfs client is fragile to token renewal errors
     (daryn via kihwal)
 
+    HDFS-3875. Issue handling checksum errors in write pipeline. (kihwal)
+
 Release 0.23.7 - UNRELEASED
 
   INCOMPATIBLE CHANGES

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
new file mode 100644
index 00000000000..1738a4c1bc7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+import java.io.IOException;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Used for injecting faults in DFSClient and DFSOutputStream tests.
+ * Calls into this are a no-op in production code.
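+ * Tests can replace the default {@link #instance} with a Mockito mock or a
+ * subclass to simulate packet corruption, as TestCrcCorruption does below.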
+ */
+@VisibleForTesting
+@InterfaceAudience.Private
+public class DFSClientFaultInjector {
+  public static DFSClientFaultInjector instance = new DFSClientFaultInjector();
+
+  public static DFSClientFaultInjector get() {
+    return instance;
+  }
+
+  public boolean corruptPacket() {
+    return false;
+  }
+
+  public boolean uncorruptPacket() {
+    return false;
+  }
+}

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 63544913794..cd5dace5a6f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -262,8 +262,18 @@ void writeTo(DataOutputStream stm) throws IOException {
       System.arraycopy(header.getBytes(), 0, buf, headerStart, header.getSerializedSize());
 
+      // corrupt the data for testing.
+      if (DFSClientFaultInjector.get().corruptPacket()) {
+        buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
+      }
+
       // Write the now contiguous full packet to the output stream.
       stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen);
+
+      // undo corruption.
+      if (DFSClientFaultInjector.get().uncorruptPacket()) {
+        buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
+      }
     }
 
     // get the packet's last byte's offset in the block
@@ -331,6 +341,9 @@ public DatanodeInfo load(DatanodeInfo key) throws Exception {
     /** Nodes have been used in the pipeline before and have failed. */
     private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
+    /** The last ack sequence number before pipeline failure. */
+    private long lastAckedSeqnoBeforeFailure = -1;
+    private int pipelineRecoveryCount = 0;
     /** Has the current block been hflushed? */
     private boolean isHflushed = false;
     /** Append on an existing block? */
@@ -783,6 +796,23 @@ private boolean processDatanodeError() throws IOException {
         ackQueue.clear();
       }
 
+      // Record the new pipeline failure recovery.
+      if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) {
+        lastAckedSeqnoBeforeFailure = lastAckedSeqno;
+        pipelineRecoveryCount = 1;
+      } else {
+        // If we had to recover the pipeline five times in a row for the
+        // same packet, this client likely has corrupt data or is
+        // corrupting it during transmission.
+        if (++pipelineRecoveryCount > 5) {
+          DFSClient.LOG.warn("Error recovering pipeline for writing " +
+              block + ". Already retried 5 times for the same packet.");
+          lastException = new IOException("Failing write. Tried pipeline " +
+              "recovery 5 times without success.");
+          streamerClosed = true;
+          return false;
+        }
+      }
 
       boolean doSleep = setupPipelineForAppendOrRecovery();
 
       if (!streamerClosed && dfsClient.clientRunning) {

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 6995fd2d4ed..7dc2684569a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -377,7 +377,8 @@ private void verifyChunks(ByteBuffer dataBuf, ByteBuffer checksumBuf)
       clientChecksum.verifyChunkedSums(dataBuf, checksumBuf, clientname, 0);
     } catch (ChecksumException ce) {
       LOG.warn("Checksum error in block " + block + " from " + inAddr, ce);
-      if (srcDataNode != null) {
+      // No need to report to namenode when client is writing.
+      if (srcDataNode != null && isDatanode) {
         try {
           LOG.info("report corrupt " + block + " from datanode " +
               srcDataNode + " to namenode");
@@ -404,6 +405,19 @@ private void translateChunks(ByteBuffer dataBuf, ByteBuffer checksumBuf) {
     diskChecksum.calculateChunkedSums(dataBuf, checksumBuf);
   }
 
+  /**
+   * Check whether checksum needs to be verified.
+   * Skip verifying checksum iff this is not the last one in the
+   * pipeline and clientName is non-null. i.e. Checksum is verified
+   * on all the datanodes when the data is being written by a
+   * datanode rather than a client. When the client is writing the data,
+   * the protocol includes acks and only the last datanode needs to verify
+   * the checksum.
+   * @return true if checksum verification is needed, otherwise false.
+   */
+  private boolean shouldVerifyChecksum() {
+    return (mirrorOut == null || isDatanode || needsChecksumTranslation);
+  }
 
   /**
    * Receives and processes a packet. It can contain many chunks.
@@ -451,9 +465,9 @@ private int receivePacket() throws IOException {
     }
 
     // put in queue for pending acks, unless sync was requested
-    if (responder != null && !syncBlock) {
+    if (responder != null && !syncBlock && !shouldVerifyChecksum()) {
       ((PacketResponder) responder.getRunnable()).enqueue(seqno,
-          lastPacketInBlock, offsetInBlock);
+          lastPacketInBlock, offsetInBlock, Status.SUCCESS);
     }
 
     //First write the packet to the mirror:
@@ -485,17 +499,26 @@ private int receivePacket() throws IOException {
       throw new IOException("Length of checksums in packet " +
           checksumBuf.capacity() + " does not match calculated checksum " +
           "length " + checksumLen);
-    } 
+    }
 
-    /* skip verifying checksum iff this is not the last one in the
-     * pipeline and clientName is non-null. i.e. Checksum is verified
-     * on all the datanodes when the data is being written by a
-     * datanode rather than a client. Whe client is writing the data,
-     * protocol includes acks and only the last datanode needs to verify
-     * checksum.
-     */
-    if (mirrorOut == null || isDatanode || needsChecksumTranslation) {
-      verifyChunks(dataBuf, checksumBuf);
+    if (shouldVerifyChecksum()) {
+      try {
+        verifyChunks(dataBuf, checksumBuf);
+      } catch (IOException ioe) {
+        // checksum error detected locally. there is no reason to continue.
+        if (responder != null) {
+          try {
+            ((PacketResponder) responder.getRunnable()).enqueue(seqno,
+                lastPacketInBlock, offsetInBlock,
+                Status.ERROR_CHECKSUM);
+            // Wait until the responder sends back the response
+            // and interrupt this thread.
+            Thread.sleep(3000);
+          } catch (InterruptedException e) { }
+        }
+        throw new IOException("Terminating due to a checksum error." + ioe);
+      }
+      if (needsChecksumTranslation) {
         // overwrite the checksums in the packet buffer with the
         // appropriate polynomial for the disk storage.
@@ -584,9 +607,9 @@ private int receivePacket() throws IOException {
 
     // if sync was requested, put in queue for pending acks here
     // (after the fsync finished)
-    if (responder != null && syncBlock) {
+    if (responder != null && (syncBlock || shouldVerifyChecksum())) {
       ((PacketResponder) responder.getRunnable()).enqueue(seqno,
-          lastPacketInBlock, offsetInBlock);
+          lastPacketInBlock, offsetInBlock, Status.SUCCESS);
     }
 
     if (throttler != null) { // throttle I/O
@@ -784,7 +807,7 @@ private static enum PacketResponderType {
   }
 
   /**
-   * Processed responses from downstream datanodes in the pipeline
+   * Processes responses from downstream datanodes in the pipeline
    * and sends back replies to the originator.
    */
   class PacketResponder implements Runnable, Closeable {
@@ -837,10 +860,11 @@ public String toString() {
      * @param offsetInBlock
      */
     synchronized void enqueue(final long seqno,
-        final boolean lastPacketInBlock, final long offsetInBlock) {
+        final boolean lastPacketInBlock,
+        final long offsetInBlock, final Status ackStatus) {
       if (running) {
         final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock,
-            System.nanoTime());
+            System.nanoTime(), ackStatus);
         if(LOG.isDebugEnabled()) {
           LOG.debug(myString + ": enqueue " + p);
         }
@@ -986,20 +1010,31 @@ public void run() {
             }
           }
 
+          Status myStatus = pkt == null ? Status.SUCCESS : pkt.ackStatus;
           // construct my ack message
           Status[] replies = null;
           if (mirrorError) { // ack read error
             replies = new Status[2];
-            replies[0] = Status.SUCCESS;
+            replies[0] = myStatus;
             replies[1] = Status.ERROR;
           } else {
             short ackLen = type == PacketResponderType.LAST_IN_PIPELINE? 0
                 : ack.getNumOfReplies();
             replies = new Status[1+ackLen];
-            replies[0] = Status.SUCCESS;
+            replies[0] = myStatus;
             for (int i=0; i<ackLen; i++) {
               replies[i+1] = ack.getReply(i);
             }
+            if (ackLen > 0 && replies[1] == Status.ERROR_CHECKSUM) {
+              running = false;
+              removeAckHead();
+              LOG.warn("Shutting down writer and responder due to a checksum error.");
+              receiverThread.interrupt();
+              continue;
+            }
           }
           PipelineAck replyAck = new PipelineAck(expected, replies, totalAckTimeNanos);
@@ -1018,6 +1053,14 @@ public void run() {
               removeAckHead();
               // update bytes acked
             }
+            // terminate after sending response if this node detected
+            // a checksum error
+            if (myStatus == Status.ERROR_CHECKSUM) {
+              running = false;
+              LOG.warn("Shutting down writer and responder due to a checksum error.");
+              receiverThread.interrupt();
+              continue;
+            }
           } catch (IOException e) {
             LOG.warn("IOException in BlockReceiver.run(): ", e);
             if (running) {
@@ -1062,13 +1105,15 @@ private static class Packet {
     final boolean lastPacketInBlock;
     final long offsetInBlock;
     final long ackEnqueueNanoTime;
+    final Status ackStatus;
 
     Packet(long seqno, boolean lastPacketInBlock, long offsetInBlock,
-        long ackEnqueueNanoTime) {
+        long ackEnqueueNanoTime, Status ackStatus) {
       this.seqno = seqno;
       this.lastPacketInBlock = lastPacketInBlock;
       this.offsetInBlock = offsetInBlock;
       this.ackEnqueueNanoTime = ackEnqueueNanoTime;
+      this.ackStatus = ackStatus;
     }
 
     @Override
@@ -1077,6 +1122,7 @@ public String toString() {
         + ", lastPacketInBlock=" + lastPacketInBlock
         + ", offsetInBlock=" + offsetInBlock
         + ", ackEnqueueNanoTime=" + ackEnqueueNanoTime
+        + ", ackStatus=" + ackStatus
         + ")";
     }
   }

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 90cdd5888fe..fd5be0f2556 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -730,13 +730,25 @@ public synchronized ReplicaInPipeline recoverRbw(ExtendedBlock b,
     }
 
     // check replica length
-    if (rbw.getBytesAcked() < minBytesRcvd || rbw.getNumBytes() > maxBytesRcvd){
+    long bytesAcked = rbw.getBytesAcked();
+    long numBytes = rbw.getNumBytes();
+    if (bytesAcked < minBytesRcvd || numBytes > maxBytesRcvd){
       throw new ReplicaNotFoundException("Unmatched length replica " +
-          replicaInfo + ": BytesAcked = " + rbw.getBytesAcked() +
-          " BytesRcvd = " + rbw.getNumBytes() + " are not in the range of [" +
+          replicaInfo + ": BytesAcked = " + bytesAcked +
+          " BytesRcvd = " + numBytes + " are not in the range of [" +
          minBytesRcvd + ", " + maxBytesRcvd + "].");
     }
+
+    // Truncate the potentially corrupt portion.
+    // If the source was a client and the last node in the pipeline was lost,
+    // any corrupt data written after the acked length can go unnoticed.
+    if (numBytes > bytesAcked) {
+      final File replicafile = rbw.getBlockFile();
+      truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked);
+      rbw.setNumBytes(bytesAcked);
+      rbw.setLastChecksumAndDataLen(bytesAcked, null);
+    }
+
     // bump the replica's generation stamp to newGS
     bumpReplicaGS(rbw, newGS);

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestCrcCorruption.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestCrcCorruption.java
index 15a26d3e437..1f667be3b40 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestCrcCorruption.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestCrcCorruption.java
@@ -31,11 +31,18 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClientFaultInjector;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.io.IOUtils;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+
 /**
  * A JUnit test for corrupted file handling.
  * This test creates a bunch of files/directories with replication
@@ -64,6 +71,79 @@
  * replica was created from the non-corrupted replica.
  */
 public class TestCrcCorruption {
+
+  private DFSClientFaultInjector faultInjector;
+
+  @Before
+  public void setUp() throws IOException {
+    faultInjector = Mockito.mock(DFSClientFaultInjector.class);
+    DFSClientFaultInjector.instance = faultInjector;
+  }
+
+  /**
+   * Test case for data corruption during data transmission for
+   * create/write. To recover from corruption while writing, at
+   * least two replicas are needed.
+   */
+  @Test(timeout=50000)
+  public void testCorruptionDuringWrt() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(10).build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      Path file = new Path("/test_corruption_file");
+      FSDataOutputStream out = fs.create(file, true, 8192, (short)3, (long)(128*1024*1024));
+      byte[] data = new byte[65536];
+      for (int i=0; i < 65536; i++) {
+        data[i] = (byte)(i % 256);
+      }
+
+      for (int i = 0; i < 5; i++) {
+        out.write(data, 0, 65535);
+      }
+      out.hflush();
+      // corrupt the packet once
+      Mockito.when(faultInjector.corruptPacket()).thenReturn(true, false);
+      Mockito.when(faultInjector.uncorruptPacket()).thenReturn(true, false);
+
+      for (int i = 0; i < 5; i++) {
+        out.write(data, 0, 65535);
+      }
+      out.close();
+      // read should succeed
+      FSDataInputStream in = fs.open(file);
+      for(int c; (c = in.read()) != -1; );
+      in.close();
+
+      // test the retry limit
+      out = fs.create(file, true, 8192, (short)3, (long)(128*1024*1024));
+
+      // corrupt the packet once and never fix it.
+      Mockito.when(faultInjector.corruptPacket()).thenReturn(true, false);
+      Mockito.when(faultInjector.uncorruptPacket()).thenReturn(false);
+
+      // the client should give up pipeline reconstruction after retries.
+      try {
+        for (int i = 0; i < 5; i++) {
+          out.write(data, 0, 65535);
+        }
+        out.close();
+        fail("Write did not fail");
+      } catch (IOException ioe) {
+        // we should get an ioe
+        DFSClient.LOG.info("Got expected exception", ioe);
+      }
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
+      Mockito.when(faultInjector.corruptPacket()).thenReturn(false);
+      Mockito.when(faultInjector.uncorruptPacket()).thenReturn(false);
+    }
+  }
+
+
   /**
    * check if DFS can handle corrupted CRC blocks
    */