From d2d038b0a0599175db2e7e28174da6e4edac9a25 Mon Sep 17 00:00:00 2001 From: Kihwal Lee Date: Tue, 16 Jun 2015 15:42:00 -0500 Subject: [PATCH] HDFS-4660. Block corruption can happen during pipeline recovery. Contributed by Kihwal Lee. (cherry picked from commit c74517c46bf00af408ed866b6577623cdec02de1) --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../hdfs/server/datanode/BlockReceiver.java | 128 +++++++++++++----- 2 files changed, 97 insertions(+), 33 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 16f880dbfea..dd6aff77cd6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -689,6 +689,8 @@ Release 2.7.1 - UNRELEASED HDFS-8597. Fix TestFSImage#testZeroBlockSize on Windows. (Xiaoyu Yao) + HDFS-4660. Block corruption can happen during pipeline recovery (kihwal) + Release 2.7.0 - 2015-04-20 INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index c46892d3e2e..2468f432e28 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -588,29 +588,59 @@ class BlockReceiver implements Closeable { try { long onDiskLen = replicaInfo.getBytesOnDisk(); if (onDiskLen bytesPerChecksum) { + throw new IOException("Unexpected packet data length for " + + block + " from " + inAddr + ": a partial chunk must be " + + " sent in an individual packet (data length = " + len + + " > bytesPerChecksum = " + bytesPerChecksum + ")"); } - - // If this is a partial chunk, then read in pre-existing checksum + + // If the last portion of the block file is not a full chunk, + // then read in pre-existing partial data chunk and recalculate + // the checksum so that the checksum calculation can continue + // from the right state. Checksum partialCrc = null; - if (!shouldNotWriteChecksum && firstByteInBlock % bytesPerChecksum != 0) { + if (doPartialCrc) { if (LOG.isDebugEnabled()) { LOG.debug("receivePacket for " + block - + ": bytesPerChecksum=" + bytesPerChecksum - + " does not divide firstByteInBlock=" + firstByteInBlock); + + ": previous write did not end at the chunk boundary." + + " onDiskLen=" + onDiskLen); } long offsetInChecksum = BlockMetadataHeader.getHeaderSize() + onDiskLen / bytesPerChecksum * checksumSize; partialCrc = computePartialChunkCrc(onDiskLen, offsetInChecksum); } + // The data buffer position where write will begin. If the packet + // data and on-disk data have no overlap, this will not be at the + // beginning of the buffer. int startByteToDisk = (int)(onDiskLen-firstByteInBlock) + dataBuf.arrayOffset() + dataBuf.position(); + // Actual number of data bytes to write. int numBytesToDisk = (int)(offsetInBlock-onDiskLen); // Write data to disk. @@ -625,31 +655,63 @@ class BlockReceiver implements Closeable { final byte[] lastCrc; if (shouldNotWriteChecksum) { lastCrc = null; - } else if (partialCrc != null) { - // If this is a partial chunk, then verify that this is the only - // chunk in the packet. Calculate new crc for this chunk. - if (len > bytesPerChecksum) { - throw new IOException("Unexpected packet data length for " - + block + " from " + inAddr + ": a partial chunk must be " - + " sent in an individual packet (data length = " + len - + " > bytesPerChecksum = " + bytesPerChecksum + ")"); - } - partialCrc.update(dataBuf.array(), startByteToDisk, numBytesToDisk); - byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize); - lastCrc = copyLastChunkChecksum(buf, checksumSize, buf.length); - checksumOut.write(buf); - if(LOG.isDebugEnabled()) { - LOG.debug("Writing out partial crc for data len " + len); - } - partialCrc = null; } else { - // write checksum + int skip = 0; + byte[] crcBytes = null; + + // First, overwrite the partial crc at the end, if necessary. + if (doPartialCrc) { // not chunk-aligned on disk + // Calculate new crc for this chunk. + int bytesToReadForRecalc = + (int)(bytesPerChecksum - partialChunkSizeOnDisk); + if (numBytesToDisk < bytesToReadForRecalc) { + bytesToReadForRecalc = numBytesToDisk; + } + + partialCrc.update(dataBuf.array(), startByteToDisk, + bytesToReadForRecalc); + byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, + checksumSize); + crcBytes = copyLastChunkChecksum(buf, checksumSize, buf.length); + // prepare to overwrite last checksum + adjustCrcFilePosition(); + checksumOut.write(buf); + if(LOG.isDebugEnabled()) { + LOG.debug("Writing out partial crc for data len " + len + + ", skip=" + skip); + } + skip++; // For the partial chunk that was just read. + } + + // Determine how many checksums need to be skipped up to the last + // boundary. The checksum after the boundary was already counted + // above. Only count the number of checksums skipped up to the + // boundary here. + long lastChunkBoundary = onDiskLen - (onDiskLen%bytesPerChecksum); + long skippedDataBytes = lastChunkBoundary - firstByteInBlock; + + if (skippedDataBytes > 0) { + skip += (int)(skippedDataBytes / bytesPerChecksum) + + ((skippedDataBytes % bytesPerChecksum == 0) ? 0 : 1); + } + skip *= checksumSize; // Convert to number of bytes + + // write the rest of checksum final int offset = checksumBuf.arrayOffset() + - checksumBuf.position(); - final int end = offset + checksumLen; - lastCrc = copyLastChunkChecksum(checksumBuf.array(), checksumSize, - end); - checksumOut.write(checksumBuf.array(), offset, checksumLen); + checksumBuf.position() + skip; + final int end = offset + checksumLen - skip; + // If offset > end, there is no more checksum to write. + // I.e. a partial chunk checksum rewrite happened and there is no + // more to write after that. + if (offset > end) { + assert crcBytes != null; + lastCrc = crcBytes; + } else { + final int remainingBytes = checksumLen - skip; + lastCrc = copyLastChunkChecksum(checksumBuf.array(), + checksumSize, end); + checksumOut.write(checksumBuf.array(), offset, remainingBytes); + } } /// flush entire packet, sync if requested