diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 5bf56123d79..7abccb986df 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1170,7 +1170,30 @@ public ReplicaHandler append(ExtendedBlock b,
       return new ReplicaHandler(replica, ref);
     }
   }
-
+
+
+  private byte[] loadLastPartialChunkChecksum(
+      File blockFile, File metaFile) throws IOException {
+    DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();
+    final int checksumSize = dcs.getChecksumSize();
+    final long onDiskLen = blockFile.length();
+    final int bytesPerChecksum = dcs.getBytesPerChecksum();
+
+    if (onDiskLen % bytesPerChecksum == 0) {
+      // the last chunk is a complete one. No need to preserve its checksum
+      // because it will not be modified.
+      return null;
+    }
+
+    int offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
+        (int)(onDiskLen / bytesPerChecksum * checksumSize);
+    byte[] lastChecksum = new byte[checksumSize];
+    RandomAccessFile raf = new RandomAccessFile(metaFile, "r");
+    raf.seek(offsetInChecksum);
+    raf.read(lastChecksum, 0, checksumSize);
+    return lastChecksum;
+  }
+
   /** Append to a finalized replica
    * Change a finalized replica to be a RBW replica and
    * bump its generation stamp to be the newGS
@@ -1207,6 +1230,13 @@ private ReplicaBeingWritten append(String bpid,
     ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
         replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS, v,
         newBlkFile.getParentFile(), Thread.currentThread(), bytesReserved);
+
+    // load last checksum and datalen
+    byte[] lastChunkChecksum = loadLastPartialChunkChecksum(
+        replicaInfo.getBlockFile(), replicaInfo.getMetaFile());
+    newReplicaInfo.setLastChecksumAndDataLen(
+        replicaInfo.getNumBytes(), lastChunkChecksum);
+
     File newmeta = newReplicaInfo.getMetaFile();
 
     // rename meta file to rbw directory
@@ -1586,6 +1616,12 @@ public ReplicaInPipeline convertTemporaryToRbw(
         blockId, numBytes, expectedGs, v, dest.getParentFile(),
         Thread.currentThread(), 0);
     rbw.setBytesAcked(visible);
+
+    // load last checksum and datalen
+    final File destMeta = FsDatasetUtil.getMetaFile(dest,
+        b.getGenerationStamp());
+    byte[] lastChunkChecksum = loadLastPartialChunkChecksum(dest, destMeta);
+    rbw.setLastChecksumAndDataLen(numBytes, lastChunkChecksum);
     // overwrite the RBW in the volume map
     volumeMap.add(b.getBlockPoolId(), rbw);
     return rbw;
@@ -2668,6 +2704,9 @@ private FinalizedReplica updateReplicaUnderRecovery(
           newBlockId, recoveryId, volume, blockFile.getParentFile(),
           newlength);
       newReplicaInfo.setNumBytes(newlength);
+      // In theory, this rbw replica needs to reload last chunk checksum,
+      // but it is immediately converted to finalized state within the same
+      // lock, so no need to update it.
       volumeMap.add(bpid, newReplicaInfo);
       finalizeReplica(bpid, newReplicaInfo);
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
index ada4224b7f3..3035bbbd149 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
@@ -24,10 +24,12 @@
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -44,10 +46,16 @@
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Time;
 import org.junit.Assert;
 import org.junit.Test;
@@ -61,6 +69,8 @@
 public class TestFileAppend{
   private static byte[] fileContents = null;
 
+  static final DataChecksum DEFAULT_CHECKSUM =
+      DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
   //
   // writes to file but does not close it
   //
@@ -656,4 +666,63 @@ public void testAppendCorruptedBlock() throws Exception {
       cluster.shutdown();
     }
   }
+
+  @Test(timeout = 10000)
+  public void testConcurrentAppendRead()
+      throws IOException, TimeoutException, InterruptedException {
+    // Create a finalized replica and append to it
+    // Read block data and checksum. Verify checksum.
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    conf.setInt("dfs.min.replication", 1);
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    try {
+      cluster.waitActive();
+      DataNode dn = cluster.getDataNodes().get(0);
+      FsDatasetSpi dataSet = DataNodeTestUtils.getFSDataset(dn);
+
+      // create a file with 1 byte of data.
+      long initialFileLength = 1;
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path fileName = new Path("/appendCorruptBlock");
+      DFSTestUtil.createFile(fs, fileName, initialFileLength, (short) 1, 0);
+      DFSTestUtil.waitReplication(fs, fileName, (short) 1);
+      Assert.assertTrue("File not created", fs.exists(fileName));
+
+      // Call FsDatasetImpl#append to append the block file,
+      // which converts it to a rbw replica.
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, fileName);
+      long newGS = block.getGenerationStamp() + 1;
+      ReplicaHandler replicaHandler =
+          dataSet.append(block, newGS, initialFileLength);
+
+      // write data to block file
+      ReplicaBeingWritten rbw =
+          (ReplicaBeingWritten) replicaHandler.getReplica();
+      ReplicaOutputStreams outputStreams =
+          rbw.createStreams(false, DEFAULT_CHECKSUM);
+      OutputStream dataOutput = outputStreams.getDataOut();
+
+      byte[] appendBytes = new byte[1];
+      dataOutput.write(appendBytes, 0, 1);
+      dataOutput.flush();
+      dataOutput.close();
+
+      // update checksum file
+      final int smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
+      FsDatasetUtil.computeChecksum(rbw.getMetaFile(), rbw.getMetaFile(),
+          rbw.getBlockFile(), smallBufferSize, conf);
+
+      // read the block
+      // the DataNode BlockSender should read from the rbw replica's in-memory
+      // checksum, rather than on-disk checksum. Otherwise it will see a
+      // checksum mismatch error.
+      final byte[] readBlock = DFSTestUtil.readFileBuffer(fs, fileName);
+      assertEquals("should have read only one byte!", 1, readBlock.length);
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
index 116a6af1fb0..803bcb16ca7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
@@ -43,10 +44,13 @@
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.log4j.Level;
 
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
@@ -67,6 +71,8 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
       LogFactory.getLog(FsDatasetImplTestUtils.class);
   private final FsDatasetImpl dataset;
+  private static final DataChecksum DEFAULT_CHECKSUM =
+      DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
 
   /**
    * By default we assume 2 data directories (volumes) per DataNode.
   */
@@ -245,9 +251,18 @@ public Replica createFinalizedReplica(FsVolumeSpi volume, ExtendedBlock block)
     dataset.volumeMap.add(block.getBlockPoolId(), info);
     info.getBlockFile().createNewFile();
     info.getMetaFile().createNewFile();
+    saveMetaFileHeader(info.getMetaFile());
     return info;
   }
 
+  private void saveMetaFileHeader(File metaFile) throws IOException {
+    DataOutputStream metaOut = new DataOutputStream(
+        new FileOutputStream(metaFile));
+    BlockMetadataHeader.writeHeader(metaOut, DEFAULT_CHECKSUM);
+    metaOut.close();
+  }
+
+
   @Override
   public Replica createReplicaInPipeline(ExtendedBlock block)
       throws IOException {
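
Note (illustrative, not part of the patch): the fix works because loadLastPartialChunkChecksum reads the checksum of the last partial chunk into memory before the block file is appended to, so a concurrent BlockSender can serve a consistent checksum while the on-disk meta file is being rewritten. The offset arithmetic it relies on can be sketched in isolation as below; the header size, chunk size, and checksum width are assumptions standing in for the values the real code reads from BlockMetadataHeader and DataChecksum.

// Standalone sketch of the meta-file offset arithmetic assumed above.
// All constants are assumptions for illustration; the patch reads the real
// values from the replica's BlockMetadataHeader / DataChecksum instances.
class LastPartialChunkChecksumSketch {
  static final int HEADER_SIZE = 7;          // assumed meta-file header size
  static final int BYTES_PER_CHECKSUM = 512; // assumed chunk size
  static final int CHECKSUM_SIZE = 4;        // CRC32C is 4 bytes

  // Returns the offset of the last (partial) chunk's checksum in the meta
  // file, or -1 if the last chunk is complete and need not be preserved.
  static long lastPartialChunkChecksumOffset(long onDiskBlockLen) {
    if (onDiskBlockLen % BYTES_PER_CHECKSUM == 0) {
      return -1;
    }
    long fullChunks = onDiskBlockLen / BYTES_PER_CHECKSUM;
    return HEADER_SIZE + fullChunks * CHECKSUM_SIZE;
  }

  public static void main(String[] args) {
    // A 1-byte replica, as created by testConcurrentAppendRead: the only
    // chunk is partial, so its checksum sits right after the header.
    System.out.println(lastPartialChunkChecksumOffset(1));    // prints 7
    System.out.println(lastPartialChunkChecksumOffset(1024)); // prints -1
  }
}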