HDFS-3875. Issue handling checksum errors in write pipeline. Contributed by Kihwal Lee.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1484809 13f79535-47bb-0310-9956-ffa450edef68
Kihwal Lee 2013-05-21 13:45:14 +00:00
parent 35b0a8d69b
commit 623be287b7
6 changed files with 239 additions and 24 deletions


@ -2406,6 +2406,8 @@ Release 0.23.8 - UNRELEASED
HDFS-4805. Webhdfs client is fragile to token renewal errors
(daryn via kihwal)
HDFS-3875. Issue handling checksum errors in write pipeline. (kihwal)
Release 0.23.7 - UNRELEASED
INCOMPATIBLE CHANGES


@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.IOException;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Used for injecting faults in DFSClient and DFSOutputStream tests.
* Calls into this are a no-op in production code.
*/
@VisibleForTesting
@InterfaceAudience.Private
public class DFSClientFaultInjector {
public static DFSClientFaultInjector instance = new DFSClientFaultInjector();
public static DFSClientFaultInjector get() {
return instance;
}
public boolean corruptPacket() {
return false;
}
public boolean uncorruptPacket() {
return false;
}
}
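
For reference, a minimal sketch of how a test hooks the class above, assuming JUnit 4 and Mockito as the TestCrcCorruption changes later in this commit do; the class name here is hypothetical. It corrupts exactly one packet on the wire and then restores the shared buffer so later packets go out clean.

import org.apache.hadoop.hdfs.DFSClientFaultInjector;
import org.junit.Before;
import org.mockito.Mockito;

// Hypothetical test base class; mirrors the setUp() added to TestCrcCorruption below.
public abstract class FaultInjectingWriteTest {
  @Before
  public void installFaultInjector() {
    DFSClientFaultInjector injector = Mockito.mock(DFSClientFaultInjector.class);
    // First call returns true (corrupt the packet), every later call false.
    Mockito.when(injector.corruptPacket()).thenReturn(true, false);
    // Restore the shared packet buffer after it has been written to the stream.
    Mockito.when(injector.uncorruptPacket()).thenReturn(true, false);
    DFSClientFaultInjector.instance = injector; // read by DFSOutputStream via get()
  }
}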


@ -262,8 +262,18 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
System.arraycopy(header.getBytes(), 0, buf, headerStart,
header.getSerializedSize());
// corrupt the data for testing.
if (DFSClientFaultInjector.get().corruptPacket()) {
buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
}
// Write the now contiguous full packet to the output stream.
stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen);
// undo corruption.
if (DFSClientFaultInjector.get().uncorruptPacket()) {
buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
}
}
// get the packet's last byte's offset in the block
@ -331,6 +341,9 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
/** Nodes have been used in the pipeline before and have failed. */
private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
/** The last ack sequence number before pipeline failure. */
private long lastAckedSeqnoBeforeFailure = -1;
private int pipelineRecoveryCount = 0;
/** Has the current block been hflushed? */
private boolean isHflushed = false;
/** Append on an existing block? */
@ -783,6 +796,23 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
ackQueue.clear();
}
// Record the new pipeline failure recovery.
if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) {
lastAckedSeqnoBeforeFailure = lastAckedSeqno;
pipelineRecoveryCount = 1;
} else {
// If we had to recover the pipeline five times in a row for the
// same packet, this client likely has corrupt data or the data is
// being corrupted during transmission.
if (++pipelineRecoveryCount > 5) {
DFSClient.LOG.warn("Error recovering pipeline for writing " +
block + ". Already retried 5 times for the same packet.");
lastException = new IOException("Failing write. Tried pipeline " +
"recovery 5 times without success.");
streamerClosed = true;
return false;
}
}
boolean doSleep = setupPipelineForAppendOrRecovery();
if (!streamerClosed && dfsClient.clientRunning) {


@ -377,7 +377,8 @@ class BlockReceiver implements Closeable {
clientChecksum.verifyChunkedSums(dataBuf, checksumBuf, clientname, 0);
} catch (ChecksumException ce) {
LOG.warn("Checksum error in block " + block + " from " + inAddr, ce);
if (srcDataNode != null) {
// No need to report to the namenode when a client is writing.
if (srcDataNode != null && isDatanode) {
try {
LOG.info("report corrupt " + block + " from datanode " +
srcDataNode + " to namenode");
@ -404,6 +405,19 @@ class BlockReceiver implements Closeable {
diskChecksum.calculateChunkedSums(dataBuf, checksumBuf);
}
/**
* Check whether checksum needs to be verified.
* Skip verifying checksum iff this is not the last one in the
* pipeline and clientName is non-null. i.e. Checksum is verified
* on all the datanodes when the data is being written by a
* datanode rather than a client. When the client is writing the data,
* the protocol includes acks and only the last datanode needs to verify
* checksum.
* @return true if checksum verification is needed, otherwise false.
*/
private boolean shouldVerifyChecksum() {
return (mirrorOut == null || isDatanode || needsChecksumTranslation);
}
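// Illustration, assuming a three-datanode client write with no checksum
// translation: only the last datanode has mirrorOut == null, so only it
// verifies client checksums; the upstream datanodes learn of corruption
// from the ERROR_CHECKSUM ack travelling back up the pipeline. For
// datanode-initiated replication (isDatanode == true) there are no client
// acks, so every datanode in the chain verifies.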
/**
* Receives and processes a packet. It can contain many chunks.
@ -451,9 +465,9 @@ class BlockReceiver implements Closeable {
}
// put in queue for pending acks, unless sync was requested
if (responder != null && !syncBlock) {
if (responder != null && !syncBlock && !shouldVerifyChecksum()) {
((PacketResponder) responder.getRunnable()).enqueue(seqno,
lastPacketInBlock, offsetInBlock);
lastPacketInBlock, offsetInBlock, Status.SUCCESS);
}
//First write the packet to the mirror:
@ -485,17 +499,26 @@ class BlockReceiver implements Closeable {
throw new IOException("Length of checksums in packet " +
checksumBuf.capacity() + " does not match calculated checksum " +
"length " + checksumLen);
}
}
/* skip verifying checksum iff this is not the last one in the
* pipeline and clientName is non-null. i.e. Checksum is verified
* on all the datanodes when the data is being written by a
* datanode rather than a client. When the client is writing the data,
* the protocol includes acks and only the last datanode needs to verify
* checksum.
*/
if (mirrorOut == null || isDatanode || needsChecksumTranslation) {
verifyChunks(dataBuf, checksumBuf);
if (shouldVerifyChecksum()) {
try {
verifyChunks(dataBuf, checksumBuf);
} catch (IOException ioe) {
// checksum error detected locally. there is no reason to continue.
if (responder != null) {
try {
((PacketResponder) responder.getRunnable()).enqueue(seqno,
lastPacketInBlock, offsetInBlock,
Status.ERROR_CHECKSUM);
// Wait until the responder sends back the response
// and interrupt this thread.
Thread.sleep(3000);
} catch (InterruptedException e) { }
}
throw new IOException("Terminating due to a checksum error." + ioe);
}
if (needsChecksumTranslation) {
// overwrite the checksums in the packet buffer with the
// appropriate polynomial for the disk storage.
@ -584,9 +607,9 @@ class BlockReceiver implements Closeable {
// if sync was requested, put in queue for pending acks here
// (after the fsync finished)
if (responder != null && syncBlock) {
if (responder != null && (syncBlock || shouldVerifyChecksum())) {
((PacketResponder) responder.getRunnable()).enqueue(seqno,
lastPacketInBlock, offsetInBlock);
lastPacketInBlock, offsetInBlock, Status.SUCCESS);
}
if (throttler != null) { // throttle I/O
@ -784,7 +807,7 @@ class BlockReceiver implements Closeable {
}
/**
* Processed responses from downstream datanodes in the pipeline
* Processes responses from downstream datanodes in the pipeline
* and sends back replies to the originator.
*/
class PacketResponder implements Runnable, Closeable {
@ -837,10 +860,11 @@ class BlockReceiver implements Closeable {
* @param offsetInBlock
*/
synchronized void enqueue(final long seqno,
final boolean lastPacketInBlock, final long offsetInBlock) {
final boolean lastPacketInBlock,
final long offsetInBlock, final Status ackStatus) {
if (running) {
final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock,
System.nanoTime());
System.nanoTime(), ackStatus);
if(LOG.isDebugEnabled()) {
LOG.debug(myString + ": enqueue " + p);
}
@ -986,20 +1010,31 @@ class BlockReceiver implements Closeable {
}
}
Status myStatus = pkt == null ? Status.SUCCESS : pkt.ackStatus;
// construct my ack message
Status[] replies = null;
if (mirrorError) { // ack read error
replies = new Status[2];
replies[0] = Status.SUCCESS;
replies[0] = myStatus;
replies[1] = Status.ERROR;
} else {
short ackLen = type == PacketResponderType.LAST_IN_PIPELINE? 0
: ack.getNumOfReplies();
replies = new Status[1+ackLen];
replies[0] = Status.SUCCESS;
replies[0] = myStatus;
for (int i=0; i<ackLen; i++) {
replies[i+1] = ack.getReply(i);
}
// If the mirror has reported that it received a corrupt packet,
// do self-destruct to mark myself bad, instead of the mirror node.
// The mirror is guaranteed to be good without corrupt data.
if (ackLen > 0 && replies[1] == Status.ERROR_CHECKSUM) {
running = false;
removeAckHead();
LOG.warn("Shutting down writer and responder due to a checksum error.");
receiverThread.interrupt();
continue;
}
}
PipelineAck replyAck = new PipelineAck(expected, replies, totalAckTimeNanos);
@ -1018,6 +1053,14 @@ class BlockReceiver implements Closeable {
removeAckHead();
// update bytes acked
}
// terminate after sending response if this node detected
// a checksum error
if (myStatus == Status.ERROR_CHECKSUM) {
running = false;
LOG.warn("Shutting down writer and responder due to a checksum error.");
receiverThread.interrupt();
continue;
}
} catch (IOException e) {
LOG.warn("IOException in BlockReceiver.run(): ", e);
if (running) {
@ -1062,13 +1105,15 @@ class BlockReceiver implements Closeable {
final boolean lastPacketInBlock;
final long offsetInBlock;
final long ackEnqueueNanoTime;
final Status ackStatus;
Packet(long seqno, boolean lastPacketInBlock, long offsetInBlock,
long ackEnqueueNanoTime) {
long ackEnqueueNanoTime, Status ackStatus) {
this.seqno = seqno;
this.lastPacketInBlock = lastPacketInBlock;
this.offsetInBlock = offsetInBlock;
this.ackEnqueueNanoTime = ackEnqueueNanoTime;
this.ackStatus = ackStatus;
}
@Override
@ -1077,6 +1122,7 @@ class BlockReceiver implements Closeable {
+ ", lastPacketInBlock=" + lastPacketInBlock
+ ", offsetInBlock=" + offsetInBlock
+ ", ackEnqueueNanoTime=" + ackEnqueueNanoTime
+ ", ackStatus=" + ackStatus
+ ")";
}
}


@ -730,13 +730,25 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
// check replica length
if (rbw.getBytesAcked() < minBytesRcvd || rbw.getNumBytes() > maxBytesRcvd){
long bytesAcked = rbw.getBytesAcked();
long numBytes = rbw.getNumBytes();
if (bytesAcked < minBytesRcvd || numBytes > maxBytesRcvd){
throw new ReplicaNotFoundException("Unmatched length replica " +
replicaInfo + ": BytesAcked = " + rbw.getBytesAcked() +
" BytesRcvd = " + rbw.getNumBytes() + " are not in the range of [" +
replicaInfo + ": BytesAcked = " + bytesAcked +
" BytesRcvd = " + numBytes + " are not in the range of [" +
minBytesRcvd + ", " + maxBytesRcvd + "].");
}
// Truncate the potentially corrupt portion.
// If the source was a client and the last node in the pipeline was lost,
// any corrupt data written after the acked length can go unnoticed.
if (numBytes > bytesAcked) {
final File replicafile = rbw.getBlockFile();
truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked);
rbw.setNumBytes(bytesAcked);
rbw.setLastChecksumAndDataLen(bytesAcked, null);
}
// bump the replica's generation stamp to newGS
bumpReplicaGS(rbw, newGS);
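
As a concrete reading of the truncation above (a hypothetical helper, not the FsDatasetImpl API): only data acknowledged end to end has been checksum-verified by the last datanode, so a surviving replica is cut back to the acked length during recovery and the client resends the rest through the rebuilt pipeline.

// Hypothetical sketch of the rule applied above; e.g. bytesAcked = 65536
// and numBytes = 131072 would leave the replica at 65536 bytes.
static long recoveredReplicaLength(long bytesAcked, long numBytes) {
  // Bytes past the acked length were never verified end to end; if the last
  // datanode (the only one checking client checksums) was lost, they may be
  // corrupt, so drop them and let the client resend from the acked offset.
  return Math.min(numBytes, bytesAcked);
}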


@ -31,11 +31,18 @@ import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClientFaultInjector;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.io.IOUtils;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
/**
* A JUnit test for corrupted file handling.
* This test creates a bunch of files/directories with replication
@ -64,6 +71,79 @@ import org.junit.Test;
* replica was created from the non-corrupted replica.
*/
public class TestCrcCorruption {
private DFSClientFaultInjector faultInjector;
@Before
public void setUp() throws IOException {
faultInjector = Mockito.mock(DFSClientFaultInjector.class);
DFSClientFaultInjector.instance = faultInjector;
}
/**
* Test case for data corruption during data transmission for
* create/write. To recover from corruption while writing, at
* least two replicas are needed.
*/
@Test(timeout=50000)
public void testCorruptionDuringWrt() throws Exception {
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(10).build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
Path file = new Path("/test_corruption_file");
FSDataOutputStream out = fs.create(file, true, 8192, (short)3, (long)(128*1024*1024));
byte[] data = new byte[65536];
for (int i=0; i < 65536; i++) {
data[i] = (byte)(i % 256);
}
for (int i = 0; i < 5; i++) {
out.write(data, 0, 65535);
}
out.hflush();
// corrupt the packet once
Mockito.when(faultInjector.corruptPacket()).thenReturn(true, false);
Mockito.when(faultInjector.uncorruptPacket()).thenReturn(true, false);
for (int i = 0; i < 5; i++) {
out.write(data, 0, 65535);
}
out.close();
// read should succeed
FSDataInputStream in = fs.open(file);
for(int c; (c = in.read()) != -1; );
in.close();
// test the retry limit
out = fs.create(file, true, 8192, (short)3, (long)(128*1024*1024));
// corrupt the packet once and never fix it.
Mockito.when(faultInjector.corruptPacket()).thenReturn(true, false);
Mockito.when(faultInjector.uncorruptPacket()).thenReturn(false);
// the client should give up pipeline reconstruction after retries.
try {
for (int i = 0; i < 5; i++) {
out.write(data, 0, 65535);
}
out.close();
fail("Write did not fail");
} catch (IOException ioe) {
// we should get an ioe
DFSClient.LOG.info("Got expected exception", ioe);
}
} finally {
if (cluster != null) { cluster.shutdown(); }
Mockito.when(faultInjector.corruptPacket()).thenReturn(false);
Mockito.when(faultInjector.uncorruptPacket()).thenReturn(false);
}
}
/**
* check if DFS can handle corrupted CRC blocks
*/