diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index ce3c144c2f9..c2efb4d4232 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1168,7 +1168,30 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       return new ReplicaHandler(replica, ref);
     }
   }
-
+
+
+  private byte[] loadLastPartialChunkChecksum(
+      File blockFile, File metaFile) throws IOException {
+    DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();
+    final int checksumSize = dcs.getChecksumSize();
+    final long onDiskLen = blockFile.length();
+    final int bytesPerChecksum = dcs.getBytesPerChecksum();
+
+    if (onDiskLen % bytesPerChecksum == 0) {
+      // the last chunk is a complete one. No need to preserve its checksum
+      // because it will not be modified.
+      return null;
+    }
+
+    int offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
+        (int)(onDiskLen / bytesPerChecksum * checksumSize);
+    byte[] lastChecksum = new byte[checksumSize];
+    RandomAccessFile raf = new RandomAccessFile(metaFile, "r");
+    raf.seek(offsetInChecksum);
+    raf.read(lastChecksum, 0, checksumSize);
+    return lastChecksum;
+  }
+
   /** Append to a finalized replica
    * Change a finalized replica to be a RBW replica and
    * bump its generation stamp to be the newGS
@@ -1205,6 +1228,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
         replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS, v,
         newBlkFile.getParentFile(), Thread.currentThread(), bytesReserved);
+
+    // load last checksum and datalen
+    byte[] lastChunkChecksum = loadLastPartialChunkChecksum(
+        replicaInfo.getBlockFile(), replicaInfo.getMetaFile());
+    newReplicaInfo.setLastChecksumAndDataLen(
+        replicaInfo.getNumBytes(), lastChunkChecksum);
+
     File newmeta = newReplicaInfo.getMetaFile();
 
     // rename meta file to rbw directory
@@ -1584,6 +1614,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         blockId, numBytes, expectedGs, v, dest.getParentFile(),
         Thread.currentThread(), 0);
     rbw.setBytesAcked(visible);
+
+    // load last checksum and datalen
+    final File destMeta = FsDatasetUtil.getMetaFile(dest,
+        b.getGenerationStamp());
+    byte[] lastChunkChecksum = loadLastPartialChunkChecksum(dest, destMeta);
+    rbw.setLastChecksumAndDataLen(numBytes, lastChunkChecksum);
     // overwrite the RBW in the volume map
     volumeMap.add(b.getBlockPoolId(), rbw);
     return rbw;
@@ -2666,6 +2702,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           newBlockId, recoveryId, volume, blockFile.getParentFile(),
           newlength);
       newReplicaInfo.setNumBytes(newlength);
+      // In theory, this rbw replica needs to reload last chunk checksum,
+      // but it is immediately converted to finalized state within the same
+      // lock, so no need to update it.
       volumeMap.add(bpid, newReplicaInfo);
       finalizeReplica(bpid, newReplicaInfo);
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
index ada4224b7f3..3035bbbd149 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
@@ -24,10 +24,12 @@ import static org.junit.Assert.fail;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -44,10 +46,16 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Time;
 import org.junit.Assert;
 import org.junit.Test;
@@ -61,6 +69,8 @@ public class TestFileAppend{
 
   private static byte[] fileContents = null;
 
+  static final DataChecksum DEFAULT_CHECKSUM =
+      DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
   //
   // writes to file but does not close it
   //
@@ -656,4 +666,63 @@ public class TestFileAppend{
       cluster.shutdown();
     }
   }
+
+  @Test(timeout = 10000)
+  public void testConcurrentAppendRead()
+      throws IOException, TimeoutException, InterruptedException {
+    // Create a finalized replica and append to it
+    // Read block data and checksum. Verify checksum.
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    conf.setInt("dfs.min.replication", 1);
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    try {
+      cluster.waitActive();
+      DataNode dn = cluster.getDataNodes().get(0);
+      FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
+
+      // create a file with 1 byte of data.
+      long initialFileLength = 1;
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path fileName = new Path("/appendCorruptBlock");
+      DFSTestUtil.createFile(fs, fileName, initialFileLength, (short) 1, 0);
+      DFSTestUtil.waitReplication(fs, fileName, (short) 1);
+      Assert.assertTrue("File not created", fs.exists(fileName));
+
+      // Call FsDatasetImpl#append to append the block file,
+      // which converts it to a rbw replica.
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, fileName);
+      long newGS = block.getGenerationStamp() + 1;
+      ReplicaHandler replicaHandler =
+          dataSet.append(block, newGS, initialFileLength);
+
+      // write data to block file
+      ReplicaBeingWritten rbw =
+          (ReplicaBeingWritten) replicaHandler.getReplica();
+      ReplicaOutputStreams outputStreams =
+          rbw.createStreams(false, DEFAULT_CHECKSUM);
+      OutputStream dataOutput = outputStreams.getDataOut();
+
+      byte[] appendBytes = new byte[1];
+      dataOutput.write(appendBytes, 0, 1);
+      dataOutput.flush();
+      dataOutput.close();
+
+      // update checksum file
+      final int smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
+      FsDatasetUtil.computeChecksum(rbw.getMetaFile(), rbw.getMetaFile(),
+          rbw.getBlockFile(), smallBufferSize, conf);
+
+      // read the block
+      // the DataNode BlockSender should read from the rbw replica's in-memory
+      // checksum, rather than on-disk checksum. Otherwise it will see a
+      // checksum mismatch error.
+      final byte[] readBlock = DFSTestUtil.readFileBuffer(fs, fileName);
+      assertEquals("should have read only one byte!", 1, readBlock.length);
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
index f16ca956d10..3df9337671f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
@@ -42,10 +43,13 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.log4j.Level;
 
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
@@ -63,6 +67,8 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
       LogFactory.getLog(FsDatasetImplTestUtils.class);
   private final FsDatasetImpl dataset;
 
+  private static final DataChecksum DEFAULT_CHECKSUM =
+      DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
   /**
    * By default we assume 2 data directories (volumes) per DataNode.
    */
@@ -220,9 +226,18 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
     dataset.volumeMap.add(block.getBlockPoolId(), info);
     info.getBlockFile().createNewFile();
     info.getMetaFile().createNewFile();
+    saveMetaFileHeader(info.getMetaFile());
     return info;
   }
 
+  private void saveMetaFileHeader(File metaFile) throws IOException {
+    DataOutputStream metaOut = new DataOutputStream(
+        new FileOutputStream(metaFile));
+    BlockMetadataHeader.writeHeader(metaOut, DEFAULT_CHECKSUM);
+    metaOut.close();
+  }
+
+
   @Override
   public Replica createReplicaInPipeline(ExtendedBlock block)
       throws IOException {
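
For context on the offset arithmetic in loadLastPartialChunkChecksum above: the meta file holds a small header followed by one checksum per full chunk (bytesPerChecksum bytes) of block data, so the checksum covering a trailing partial chunk starts at headerSize + (blockLength / bytesPerChecksum) * checksumSize. The standalone sketch below is not part of the patch; the header size, chunk size, and checksum width are illustrative assumptions rather than values read from BlockMetadataHeader or DataChecksum.

// Illustrative sketch only -- not part of the patch above.
// Assumed meta-file layout: [header][checksum 0][checksum 1]..., one
// checksum per BYTES_PER_CHECKSUM bytes of block data.
public class PartialChunkChecksumOffset {

  // Hypothetical values; in the patch these come from
  // BlockMetadataHeader.getHeaderSize() and DataChecksum.
  static final int HEADER_SIZE = 7;          // assumed header size in bytes
  static final int BYTES_PER_CHECKSUM = 512; // assumed chunk size
  static final int CHECKSUM_SIZE = 4;        // assumed CRC32C width in bytes

  /**
   * Returns the offset of the checksum covering the last, partial chunk,
   * or -1 when the block length is an exact multiple of the chunk size
   * (the last chunk is complete, so there is nothing to preserve).
   */
  static long offsetOfLastPartialChunkChecksum(long onDiskLen) {
    if (onDiskLen % BYTES_PER_CHECKSUM == 0) {
      return -1;
    }
    long fullChunks = onDiskLen / BYTES_PER_CHECKSUM;
    return HEADER_SIZE + fullChunks * CHECKSUM_SIZE;
  }

  public static void main(String[] args) {
    // A 1-byte block (as in testConcurrentAppendRead): 0 full chunks, so the
    // partial-chunk checksum sits immediately after the header.
    System.out.println(offsetOfLastPartialChunkChecksum(1));    // 7
    // 1300 bytes: 2 full 512-byte chunks plus a partial third chunk, so the
    // offset skips the header and 2 stored checksums.
    System.out.println(offsetOfLastPartialChunkChecksum(1300)); // 7 + 2*4 = 15
    // Exactly 1024 bytes: the last chunk is complete, nothing to reload.
    System.out.println(offsetOfLastPartialChunkChecksum(1024)); // -1
  }
}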