diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 5363aa8ed0f..fa1e872e093 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -188,6 +188,9 @@ Release 2.7.4 - UNRELEASED
     HDFS-9500. Fix software version counts for DataNodes during rolling
     upgrade. (Erik Krogen via shv)
 
+    HDFS-11056. Concurrent append and read operations lead to checksum error.
+    (Wei-Chiu Chuang)
+
 Release 2.7.3 - 2016-08-25
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 400a778cad4..4ad863ee787 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -942,7 +942,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * @param blockFile block file for which the checksum will be computed
    * @throws IOException
    */
-  private static void computeChecksum(File srcMeta, File dstMeta, File blockFile)
+  static void computeChecksum(File srcMeta, File dstMeta, File blockFile)
       throws IOException {
     final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta);
     final byte[] data = new byte[1 << 16];
@@ -1079,7 +1079,30 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
     return new ReplicaHandler(replica, ref);
   }
-
+
+
+  private byte[] loadLastPartialChunkChecksum(
+      File blockFile, File metaFile) throws IOException {
+    DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();
+    final int checksumSize = dcs.getChecksumSize();
+    final long onDiskLen = blockFile.length();
+    final int bytesPerChecksum = dcs.getBytesPerChecksum();
+
+    if (onDiskLen % bytesPerChecksum == 0) {
+      // the last chunk is a complete one. No need to preserve its checksum
+      // because it will not be modified.
+      return null;
+    }
+
+    int offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
+        (int)(onDiskLen / bytesPerChecksum * checksumSize);
+    byte[] lastChecksum = new byte[checksumSize];
+    RandomAccessFile raf = new RandomAccessFile(metaFile, "r");
+    raf.seek(offsetInChecksum);
+    raf.read(lastChecksum, 0, checksumSize);
+    return lastChecksum;
+  }
+
   /** Append to a finalized replica
    * Change a finalized replica to be a RBW replica and
    * bump its generation stamp to be the newGS
@@ -1113,6 +1136,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
         replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS, v,
         newBlkFile.getParentFile(), Thread.currentThread(), bytesReserved);
+
+    // load last checksum and datalen
+    byte[] lastChunkChecksum = loadLastPartialChunkChecksum(
+        replicaInfo.getBlockFile(), replicaInfo.getMetaFile());
+    newReplicaInfo.setLastChecksumAndDataLen(
+        replicaInfo.getNumBytes(), lastChunkChecksum);
+
     File newmeta = newReplicaInfo.getMetaFile();
 
     // rename meta file to rbw directory
@@ -1435,6 +1465,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         blockId, numBytes, expectedGs,
         v, dest.getParentFile(), Thread.currentThread(), 0);
     rbw.setBytesAcked(visible);
+
+    // load last checksum and datalen
+    final File destMeta = FsDatasetUtil.getMetaFile(dest,
+        b.getGenerationStamp());
+    byte[] lastChunkChecksum = loadLastPartialChunkChecksum(dest, destMeta);
+    rbw.setLastChecksumAndDataLen(numBytes, lastChunkChecksum);
     // overwrite the RBW in the volume map
     volumeMap.add(b.getBlockPoolId(), rbw);
     return rbw;
@@ -2466,6 +2502,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         newBlockId, recoveryId, volume, blockFile.getParentFile(),
         newlength);
     newReplicaInfo.setNumBytes(newlength);
+    // In theory, this rbw replica needs to reload last chunk checksum,
+    // but it is immediately converted to finalized state within the same
+    // lock, so no need to update it.
     volumeMap.add(bpid, newReplicaInfo);
     finalizeReplica(bpid, newReplicaInfo);
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
index adefbdb56f1..6db034e841a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
@@ -22,6 +22,7 @@ import java.io.FilenameFilter;
 import java.io.IOException;
 import java.util.Arrays;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
@@ -104,4 +105,16 @@ public class FsDatasetUtil {
           + blockFile + ", metaFile=" + metaFile, nfe);
     }
   }
+
+  /**
+   * Compute the checksum for a block file that does not already have
+   * its checksum computed, and save it to dstMeta file.
+   */
+  public static void computeChecksum(File srcMeta, File dstMeta, File blockFile)
+      throws IOException {
+    Preconditions.checkNotNull(srcMeta);
+    Preconditions.checkNotNull(dstMeta);
+    Preconditions.checkNotNull(blockFile);
+    FsDatasetImpl.computeChecksum(srcMeta, dstMeta, blockFile);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
index ff0b9d7c245..c196aae6ac1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
@@ -24,9 +24,11 @@ import static org.junit.Assert.fail;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -42,9 +44,15 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.DataChecksum;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -57,6 +65,8 @@ public class TestFileAppend{
 
   private static byte[] fileContents = null;
 
+  static final DataChecksum DEFAULT_CHECKSUM =
+      DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
   //
   // writes to file but does not close it
   //
@@ -603,4 +613,62 @@ public class TestFileAppend{
       cluster.shutdown();
     }
   }
+
+  @Test(timeout = 10000)
+  public void testConcurrentAppendRead()
+      throws IOException, TimeoutException, InterruptedException {
+    // Create a finalized replica and append to it
+    // Read block data and checksum. Verify checksum.
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    conf.setInt("dfs.min.replication", 1);
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    try {
+      cluster.waitActive();
+      DataNode dn = cluster.getDataNodes().get(0);
+      FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
+
+      // create a file with 1 byte of data.
+      long initialFileLength = 1;
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path fileName = new Path("/appendCorruptBlock");
+      DFSTestUtil.createFile(fs, fileName, initialFileLength, (short) 1, 0);
+      DFSTestUtil.waitReplication(fs, fileName, (short) 1);
+      Assert.assertTrue("File not created", fs.exists(fileName));
+
+      // Call FsDatasetImpl#append to append the block file,
+      // which converts it to a rbw replica.
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, fileName);
+      long newGS = block.getGenerationStamp() + 1;
+      ReplicaHandler replicaHandler =
+          dataSet.append(block, newGS, initialFileLength);
+
+      // write data to block file
+      ReplicaBeingWritten rbw =
+          (ReplicaBeingWritten) replicaHandler.getReplica();
+      ReplicaOutputStreams outputStreams =
+          rbw.createStreams(false, DEFAULT_CHECKSUM);
+      OutputStream dataOutput = outputStreams.getDataOut();
+
+      byte[] appendBytes = new byte[1];
+      dataOutput.write(appendBytes, 0, 1);
+      dataOutput.flush();
+      dataOutput.close();
+
+      // update checksum file
+      FsDatasetUtil.computeChecksum(rbw.getMetaFile(), rbw.getMetaFile(),
+          rbw.getBlockFile());
+
+      // read the block
+      // the DataNode BlockSender should read from the rbw replica's in-memory
+      // checksum, rather than on-disk checksum. Otherwise it will see a
+      // checksum mismatch error.
+      final byte[] readBlock = DFSTestUtil.readFileBuffer(fs, fileName);
+      assertEquals("should have read only one byte!", 1, readBlock.length);
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
index 9325cdcf733..17558f12a70 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
@@ -17,12 +17,16 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
@@ -34,6 +38,7 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.junit.Assert;
 import org.junit.Test;
@@ -47,7 +52,9 @@ public class TestWriteToReplica {
   final private static int RWR = 3;
   final private static int RUR = 4;
   final private static int NON_EXISTENT = 5;
-
+
+  private static final DataChecksum DEFAULT_CHECKSUM =
+      DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
   // test close
   @Test
   public void testClose() throws Exception {
@@ -129,6 +136,13 @@ public class TestWriteToReplica {
       cluster.shutdown();
     }
   }
+
+  private void saveMetaFileHeader(File metaFile) throws IOException {
+    DataOutputStream metaOut = new DataOutputStream(
+        new FileOutputStream(metaFile));
+    BlockMetadataHeader.writeHeader(metaOut, DEFAULT_CHECKSUM);
+    metaOut.close();
+  }
 
   /**
    * Generate testing environment and return a collection of blocks
@@ -156,6 +170,7 @@ public class TestWriteToReplica {
     replicasMap.add(bpid, replicaInfo);
     replicaInfo.getBlockFile().createNewFile();
     replicaInfo.getMetaFile().createNewFile();
+    saveMetaFileHeader(replicaInfo.getMetaFile());
     replicasMap.add(bpid, new ReplicaInPipeline(
         blocks[TEMPORARY].getBlockId(),