HDFS-4660. Block corruption can happen during pipeline recovery. Contributed by Kihwal Lee.
(cherry picked from commit c74517c46b
)
This commit is contained in:
parent
fe35ba1df9
commit
6de4d52c31
|
@ -135,6 +135,8 @@ Release 2.7.1 - UNRELEASED
|
||||||
|
|
||||||
HDFS-8597. Fix TestFSImage#testZeroBlockSize on Windows. (Xiaoyu Yao)
|
HDFS-8597. Fix TestFSImage#testZeroBlockSize on Windows. (Xiaoyu Yao)
|
||||||
|
|
||||||
|
HDFS-4660. Block corruption can happen during pipeline recovery (kihwal)
|
||||||
|
|
||||||
Release 2.7.0 - 2015-04-20
|
Release 2.7.0 - 2015-04-20
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -588,29 +588,59 @@ class BlockReceiver implements Closeable {
|
||||||
try {
|
try {
|
||||||
long onDiskLen = replicaInfo.getBytesOnDisk();
|
long onDiskLen = replicaInfo.getBytesOnDisk();
|
||||||
if (onDiskLen<offsetInBlock) {
|
if (onDiskLen<offsetInBlock) {
|
||||||
//finally write to the disk :
|
// Normally the beginning of an incoming packet is aligned with the
|
||||||
|
// existing data on disk. If the beginning packet data offset is not
|
||||||
|
// checksum chunk aligned, the end of packet will not go beyond the
|
||||||
|
// next chunk boundary.
|
||||||
|
// When a failure-recovery is involved, the client state and the
|
||||||
|
// the datanode state may not exactly agree. I.e. the client may
|
||||||
|
// resend part of data that is already on disk. Correct number of
|
||||||
|
// bytes should be skipped when writing the data and checksum
|
||||||
|
// buffers out to disk.
|
||||||
|
long partialChunkSizeOnDisk = onDiskLen % bytesPerChecksum;
|
||||||
|
boolean alignedOnDisk = partialChunkSizeOnDisk == 0;
|
||||||
|
boolean alignedInPacket = firstByteInBlock % bytesPerChecksum == 0;
|
||||||
|
|
||||||
if (onDiskLen % bytesPerChecksum != 0) {
|
// Since data is always appended, not overwritten, partial CRC
|
||||||
// prepare to overwrite last checksum
|
// recalculation is necessary if the on-disk data is not chunk-
|
||||||
adjustCrcFilePosition();
|
// aligned, regardless of whether the beginning of the data in
|
||||||
|
// the packet is chunk-aligned.
|
||||||
|
boolean doPartialCrc = !alignedOnDisk && !shouldNotWriteChecksum;
|
||||||
|
|
||||||
|
// If this is a partial chunk, then verify that this is the only
|
||||||
|
// chunk in the packet. If the starting offset is not chunk
|
||||||
|
// aligned, the packet should terminate at or before the next
|
||||||
|
// chunk boundary.
|
||||||
|
if (!alignedInPacket && len > bytesPerChecksum) {
|
||||||
|
throw new IOException("Unexpected packet data length for "
|
||||||
|
+ block + " from " + inAddr + ": a partial chunk must be "
|
||||||
|
+ " sent in an individual packet (data length = " + len
|
||||||
|
+ " > bytesPerChecksum = " + bytesPerChecksum + ")");
|
||||||
}
|
}
|
||||||
|
|
||||||
// If this is a partial chunk, then read in pre-existing checksum
|
// If the last portion of the block file is not a full chunk,
|
||||||
|
// then read in pre-existing partial data chunk and recalculate
|
||||||
|
// the checksum so that the checksum calculation can continue
|
||||||
|
// from the right state.
|
||||||
Checksum partialCrc = null;
|
Checksum partialCrc = null;
|
||||||
if (!shouldNotWriteChecksum && firstByteInBlock % bytesPerChecksum != 0) {
|
if (doPartialCrc) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("receivePacket for " + block
|
LOG.debug("receivePacket for " + block
|
||||||
+ ": bytesPerChecksum=" + bytesPerChecksum
|
+ ": previous write did not end at the chunk boundary."
|
||||||
+ " does not divide firstByteInBlock=" + firstByteInBlock);
|
+ " onDiskLen=" + onDiskLen);
|
||||||
}
|
}
|
||||||
long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
|
long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
|
||||||
onDiskLen / bytesPerChecksum * checksumSize;
|
onDiskLen / bytesPerChecksum * checksumSize;
|
||||||
partialCrc = computePartialChunkCrc(onDiskLen, offsetInChecksum);
|
partialCrc = computePartialChunkCrc(onDiskLen, offsetInChecksum);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The data buffer position where write will begin. If the packet
|
||||||
|
// data and on-disk data have no overlap, this will not be at the
|
||||||
|
// beginning of the buffer.
|
||||||
int startByteToDisk = (int)(onDiskLen-firstByteInBlock)
|
int startByteToDisk = (int)(onDiskLen-firstByteInBlock)
|
||||||
+ dataBuf.arrayOffset() + dataBuf.position();
|
+ dataBuf.arrayOffset() + dataBuf.position();
|
||||||
|
|
||||||
|
// Actual number of data bytes to write.
|
||||||
int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
|
int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
|
||||||
|
|
||||||
// Write data to disk.
|
// Write data to disk.
|
||||||
|
@ -625,31 +655,63 @@ class BlockReceiver implements Closeable {
|
||||||
final byte[] lastCrc;
|
final byte[] lastCrc;
|
||||||
if (shouldNotWriteChecksum) {
|
if (shouldNotWriteChecksum) {
|
||||||
lastCrc = null;
|
lastCrc = null;
|
||||||
} else if (partialCrc != null) {
|
} else {
|
||||||
// If this is a partial chunk, then verify that this is the only
|
int skip = 0;
|
||||||
// chunk in the packet. Calculate new crc for this chunk.
|
byte[] crcBytes = null;
|
||||||
if (len > bytesPerChecksum) {
|
|
||||||
throw new IOException("Unexpected packet data length for "
|
// First, overwrite the partial crc at the end, if necessary.
|
||||||
+ block + " from " + inAddr + ": a partial chunk must be "
|
if (doPartialCrc) { // not chunk-aligned on disk
|
||||||
+ " sent in an individual packet (data length = " + len
|
// Calculate new crc for this chunk.
|
||||||
+ " > bytesPerChecksum = " + bytesPerChecksum + ")");
|
int bytesToReadForRecalc =
|
||||||
|
(int)(bytesPerChecksum - partialChunkSizeOnDisk);
|
||||||
|
if (numBytesToDisk < bytesToReadForRecalc) {
|
||||||
|
bytesToReadForRecalc = numBytesToDisk;
|
||||||
}
|
}
|
||||||
partialCrc.update(dataBuf.array(), startByteToDisk, numBytesToDisk);
|
|
||||||
byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
|
partialCrc.update(dataBuf.array(), startByteToDisk,
|
||||||
lastCrc = copyLastChunkChecksum(buf, checksumSize, buf.length);
|
bytesToReadForRecalc);
|
||||||
|
byte[] buf = FSOutputSummer.convertToByteStream(partialCrc,
|
||||||
|
checksumSize);
|
||||||
|
crcBytes = copyLastChunkChecksum(buf, checksumSize, buf.length);
|
||||||
|
// prepare to overwrite last checksum
|
||||||
|
adjustCrcFilePosition();
|
||||||
checksumOut.write(buf);
|
checksumOut.write(buf);
|
||||||
if(LOG.isDebugEnabled()) {
|
if(LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Writing out partial crc for data len " + len);
|
LOG.debug("Writing out partial crc for data len " + len +
|
||||||
|
", skip=" + skip);
|
||||||
}
|
}
|
||||||
partialCrc = null;
|
skip++; // For the partial chunk that was just read.
|
||||||
} else {
|
}
|
||||||
// write checksum
|
|
||||||
|
// Determine how many checksums need to be skipped up to the last
|
||||||
|
// boundary. The checksum after the boundary was already counted
|
||||||
|
// above. Only count the number of checksums skipped up to the
|
||||||
|
// boundary here.
|
||||||
|
long lastChunkBoundary = onDiskLen - (onDiskLen%bytesPerChecksum);
|
||||||
|
long skippedDataBytes = lastChunkBoundary - firstByteInBlock;
|
||||||
|
|
||||||
|
if (skippedDataBytes > 0) {
|
||||||
|
skip += (int)(skippedDataBytes / bytesPerChecksum) +
|
||||||
|
((skippedDataBytes % bytesPerChecksum == 0) ? 0 : 1);
|
||||||
|
}
|
||||||
|
skip *= checksumSize; // Convert to number of bytes
|
||||||
|
|
||||||
|
// write the rest of checksum
|
||||||
final int offset = checksumBuf.arrayOffset() +
|
final int offset = checksumBuf.arrayOffset() +
|
||||||
checksumBuf.position();
|
checksumBuf.position() + skip;
|
||||||
final int end = offset + checksumLen;
|
final int end = offset + checksumLen - skip;
|
||||||
lastCrc = copyLastChunkChecksum(checksumBuf.array(), checksumSize,
|
// If offset > end, there is no more checksum to write.
|
||||||
end);
|
// I.e. a partial chunk checksum rewrite happened and there is no
|
||||||
checksumOut.write(checksumBuf.array(), offset, checksumLen);
|
// more to write after that.
|
||||||
|
if (offset > end) {
|
||||||
|
assert crcBytes != null;
|
||||||
|
lastCrc = crcBytes;
|
||||||
|
} else {
|
||||||
|
final int remainingBytes = checksumLen - skip;
|
||||||
|
lastCrc = copyLastChunkChecksum(checksumBuf.array(),
|
||||||
|
checksumSize, end);
|
||||||
|
checksumOut.write(checksumBuf.array(), offset, remainingBytes);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// flush entire packet, sync if requested
|
/// flush entire packet, sync if requested
|
||||||
|
|
Loading…
Reference in New Issue