From c619e9b43fd00ba0e59a98ae09685ff719bb722b Mon Sep 17 00:00:00 2001
From: Wei-Chiu Chuang
Date: Wed, 9 Nov 2016 09:15:51 -0800
Subject: [PATCH] HDFS-11056. Concurrent append and read operations lead to
 checksum error. Contributed by Wei-Chiu Chuang.

---
 .../datanode/fsdataset/impl/FsVolumeImpl.java | 41 +++++++++++
 .../apache/hadoop/hdfs/TestFileAppend.java    | 71 +++++++++++++++++++
 .../impl/FsDatasetImplTestUtils.java          | 14 ++++
 3 files changed, 126 insertions(+)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 16278656490..5880b3e598c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -23,6 +23,7 @@ import java.io.FileOutputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
+import java.io.RandomAccessFile;
 import java.net.URI;
 import java.nio.channels.ClosedChannelException;
 import java.nio.file.Files;
@@ -47,6 +48,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
@@ -59,6 +61,7 @@ import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
 import org.apache.hadoop.hdfs.server.datanode.LocalReplicaInPipeline;
@@ -1102,6 +1105,28 @@ public class FsVolumeImpl implements FsVolumeSpi {
 
   }
 
+  private byte[] loadLastPartialChunkChecksum(
+      File blockFile, File metaFile) throws IOException {
+    DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();
+    final int checksumSize = dcs.getChecksumSize();
+    final long onDiskLen = blockFile.length();
+    final int bytesPerChecksum = dcs.getBytesPerChecksum();
+
+    if (onDiskLen % bytesPerChecksum == 0) {
+      // the last chunk is a complete one. No need to preserve its checksum
+      // because it will not be modified.
+      return null;
+    }
+
+    int offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
+        (int)(onDiskLen / bytesPerChecksum * checksumSize);
+    byte[] lastChecksum = new byte[checksumSize];
+    RandomAccessFile raf = new RandomAccessFile(metaFile, "r");
+    raf.seek(offsetInChecksum);
+    raf.read(lastChecksum, 0, checksumSize);
+    return lastChecksum;
+  }
+
   public ReplicaInPipeline append(String bpid, ReplicaInfo replicaInfo,
       long newGS, long estimateBlockLen) throws IOException {
 
@@ -1126,6 +1151,13 @@ public class FsVolumeImpl implements FsVolumeSpi {
         .setBytesToReserve(bytesReserved)
         .buildLocalReplicaInPipeline();
 
+    // load last checksum and datalen
+    LocalReplica localReplica = (LocalReplica)replicaInfo;
+    byte[] lastChunkChecksum = loadLastPartialChunkChecksum(
+        localReplica.getBlockFile(), localReplica.getMetaFile());
+    newReplicaInfo.setLastChecksumAndDataLen(
+        replicaInfo.getNumBytes(), lastChunkChecksum);
+
     // rename meta file to rbw directory
     // rename block file to rbw directory
     newReplicaInfo.moveReplicaFrom(replicaInfo, newBlkFile);
@@ -1170,6 +1202,12 @@ public class FsVolumeImpl implements FsVolumeSpi {
         .setBytesToReserve(0)
         .buildLocalReplicaInPipeline();
     rbw.setBytesAcked(visible);
+
+    // load last checksum and datalen
+    final File destMeta = FsDatasetUtil.getMetaFile(dest,
+        b.getGenerationStamp());
+    byte[] lastChunkChecksum = loadLastPartialChunkChecksum(dest, destMeta);
+    rbw.setLastChecksumAndDataLen(numBytes, lastChunkChecksum);
     return rbw;
   }
 
@@ -1206,6 +1244,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
         .setDirectoryToUse(blockFile.getParentFile())
         .setBytesToReserve(newlength)
         .buildLocalReplicaInPipeline();
+    // In theory, this rbw replica needs to reload last chunk checksum,
+    // but it is immediately converted to finalized state within the same lock,
+    // so no need to update it.
     return newReplicaInfo;
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
index 3e8561f1572..20cec6a2c8f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
@@ -24,10 +24,12 @@ import static org.junit.Assert.fail;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -44,10 +46,16 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Time;
 import org.junit.Assert;
 import org.junit.Test;
@@ -61,6 +69,8 @@ public class TestFileAppend{
 
   private static byte[] fileContents = null;
 
+  static final DataChecksum DEFAULT_CHECKSUM =
+      DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
   //
   // writes to file but does not close it
   //
@@ -655,4 +665,65 @@ public class TestFileAppend{
       cluster.shutdown();
     }
   }
+
+  @Test(timeout = 10000)
+  public void testConcurrentAppendRead()
+      throws IOException, TimeoutException, InterruptedException {
+    // Create a finalized replica and append to it
+    // Read block data and checksum. Verify checksum.
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    conf.setInt("dfs.min.replication", 1);
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    try {
+      cluster.waitActive();
+      DataNode dn = cluster.getDataNodes().get(0);
+      FsDatasetSpi dataSet = DataNodeTestUtils.getFSDataset(dn);
+
+      // create a file with 1 byte of data.
+      long initialFileLength = 1;
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path fileName = new Path("/appendCorruptBlock");
+      DFSTestUtil.createFile(fs, fileName, initialFileLength, (short) 1, 0);
+      DFSTestUtil.waitReplication(fs, fileName, (short) 1);
+      Assert.assertTrue("File not created", fs.exists(fileName));
+
+      // Call FsDatasetImpl#append to append the block file,
+      // which converts it to a rbw replica.
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, fileName);
+      long newGS = block.getGenerationStamp()+1;
+      ReplicaHandler
+          replicaHandler = dataSet.append(block, newGS, initialFileLength);
+
+      // write data to block file
+      ReplicaBeingWritten rbw =
+          (ReplicaBeingWritten)replicaHandler.getReplica();
+      ReplicaOutputStreams
+          outputStreams = rbw.createStreams(false, DEFAULT_CHECKSUM);
+      OutputStream dataOutput = outputStreams.getDataOut();
+
+      byte[] appendBytes = new byte[1];
+      dataOutput.write(appendBytes, 0, 1);
+      dataOutput.flush();
+      dataOutput.close();
+
+      // update checksum file
+      final int smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
+      FsDatasetUtil.computeChecksum(
+          rbw.getMetaFile(), rbw.getMetaFile(), rbw.getBlockFile(),
+          smallBufferSize, conf);
+
+      // read the block
+      // the DataNode BlockSender should read from the rbw replica's in-memory
+      // checksum, rather than on-disk checksum. Otherwise it will see a
+      // checksum mismatch error.
+      final byte[] readBlock = DFSTestUtil.readFileBuffer(fs, fileName);
+      assertEquals("should have read only one byte!", 1, readBlock.length);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
index 07ddb5984b9..05b41fb6322 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
@@ -43,10 +44,13 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.log4j.Level;
 
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
@@ -67,6 +71,8 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
       LogFactory.getLog(FsDatasetImplTestUtils.class);
   private final FsDatasetImpl dataset;
+  private static final DataChecksum DEFAULT_CHECKSUM =
+      DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
 
   /**
    * By default we assume 2 data directories (volumes) per DataNode.
    */
@@ -245,9 +251,17 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
     dataset.volumeMap.add(block.getBlockPoolId(), info);
     info.getBlockFile().createNewFile();
     info.getMetaFile().createNewFile();
+    saveMetaFileHeader(info.getMetaFile());
     return info;
   }
 
+  private void saveMetaFileHeader(File metaFile) throws IOException {
+    DataOutputStream metaOut = new DataOutputStream(
+        new FileOutputStream(metaFile));
+    BlockMetadataHeader.writeHeader(metaOut, DEFAULT_CHECKSUM);
+    metaOut.close();
+  }
+
   @Override
   public Replica createReplicaInPipeline(ExtendedBlock block)
       throws IOException {
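
For reference, the arithmetic that loadLastPartialChunkChecksum performs on the block meta file can be illustrated outside the DataNode. The sketch below is not part of the patch: the class name is made up, and the header size (7 bytes), bytes-per-checksum (512) and checksum width (4 bytes, CRC32C) are hard-coded assumptions that the real code reads from BlockMetadataHeader and DataChecksum. It only demonstrates how the checksum of the last, partially filled chunk is located so it can be cached in memory before an append reopens the replica.

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

/** Illustrative only; the names and constants below are assumptions, not HDFS API. */
public class LastPartialChunkChecksumSketch {
  // Assumed layout: 7-byte block meta header, CRC32C over 512-byte chunks,
  // 4 checksum bytes per chunk.
  private static final int HEADER_SIZE = 7;
  private static final int BYTES_PER_CHECKSUM = 512;
  private static final int CHECKSUM_SIZE = 4;

  static byte[] loadLastPartialChunkChecksum(File blockFile, File metaFile)
      throws IOException {
    final long onDiskLen = blockFile.length();
    if (onDiskLen % BYTES_PER_CHECKSUM == 0) {
      // The last chunk is complete; an append starts a new chunk, so there is
      // no partially covered checksum to preserve.
      return null;
    }
    // Offset of the last chunk's checksum: the header, plus one checksum for
    // every complete chunk that precedes the partial one.
    final long offset = HEADER_SIZE
        + (onDiskLen / BYTES_PER_CHECKSUM) * CHECKSUM_SIZE;
    final byte[] lastChecksum = new byte[CHECKSUM_SIZE];
    try (RandomAccessFile raf = new RandomAccessFile(metaFile, "r")) {
      raf.seek(offset);
      raf.readFully(lastChecksum);
    }
    return lastChecksum;
  }

  public static void main(String[] args) throws IOException {
    // Usage (hypothetical paths): java LastPartialChunkChecksumSketch blk_x blk_x.meta
    byte[] crc = loadLastPartialChunkChecksum(new File(args[0]), new File(args[1]));
    System.out.println(crc == null
        ? "last chunk complete; nothing to preserve"
        : "cached " + crc.length + " checksum bytes for the partial last chunk");
  }
}

Keeping this value in the in-memory replica is what allows a concurrent reader to be served a consistent data/checksum pair while the appender is still rewriting the last CRC on disk, which is the race the test above exercises.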