diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 210dbdde368..ff81b5aae70 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -175,8 +175,13 @@ class BlockSender implements java.io.Closeable {
    * See {{@link BlockSender#isLongRead()}
    */
   private static final long LONG_READ_THRESHOLD_BYTES = 256 * 1024;
-
+  // The number of bytes per checksum here determines the alignment
+  // of reads: we always start reading at a checksum chunk boundary,
+  // even if the checksum type is NULL. So, choosing too big of a value
+  // would risk sending too much unnecessary data. 512 (1 disk sector)
+  // is likely to result in minimal extra IO.
+  private static final long CHUNK_SIZE = 512;
 
   /**
    * Constructor
    * 
@@ -250,12 +255,6 @@ class BlockSender implements java.io.Closeable {
       try(AutoCloseableLock lock = datanode.data.acquireDatasetLock()) {
         replica = getReplica(block, datanode);
         replicaVisibleLength = replica.getVisibleLength();
-        if (replica instanceof FinalizedReplica) {
-          // Load last checksum in case the replica is being written
-          // concurrently
-          final FinalizedReplica frep = (FinalizedReplica) replica;
-          chunkChecksum = frep.getLastChecksumAndDataLen();
-        }
       }
       // if there is a write in progress
       if (replica instanceof ReplicaBeingWritten) {
@@ -263,6 +262,10 @@
         waitForMinLength(rbw, startOffset + length);
         chunkChecksum = rbw.getLastChecksumAndDataLen();
       }
+      if (replica instanceof FinalizedReplica) {
+        chunkChecksum = getPartialChunkChecksumForFinalized(
+            (FinalizedReplica)replica);
+      }
 
       if (replica.getGenerationStamp() < block.getGenerationStamp()) {
         throw new IOException("Replica gen stamp < block genstamp, block="
@@ -349,12 +352,8 @@
         }
       }
       if (csum == null) {
-        // The number of bytes per checksum here determines the alignment
-        // of reads: we always start reading at a checksum chunk boundary,
-        // even if the checksum type is NULL. So, choosing too big of a value
-        // would risk sending too much unnecessary data. 512 (1 disk sector)
-        // is likely to result in minimal extra IO.
-        csum = DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 512);
+        csum = DataChecksum.newDataChecksum(DataChecksum.Type.NULL,
+            (int)CHUNK_SIZE);
       }
 
       /*
@@ -428,6 +427,37 @@
     }
   }
 
+  private ChunkChecksum getPartialChunkChecksumForFinalized(
+      FinalizedReplica finalized) throws IOException {
+    // There are a number of places in the code base where a finalized replica
+    // object is created. If last partial checksum is loaded whenever a
+    // finalized replica is created, it would increase latency in DataNode
+    // initialization. Therefore, the last partial chunk checksum is loaded
+    // lazily.
+
+    // Load last checksum in case the replica is being written concurrently
+    final long replicaVisibleLength = replica.getVisibleLength();
+    if (replicaVisibleLength % CHUNK_SIZE != 0 &&
+        finalized.getLastPartialChunkChecksum() == null) {
+      // the finalized replica does not have precomputed last partial
+      // chunk checksum. Recompute now.
+      try {
+        finalized.loadLastPartialChunkChecksum();
+        return new ChunkChecksum(finalized.getVisibleLength(),
+            finalized.getLastPartialChunkChecksum());
+      } catch (FileNotFoundException e) {
+        // meta file is lost. Continue anyway to preserve existing behavior.
+        DataNode.LOG.warn(
+            "meta file " + finalized.getMetaFile() + " is missing!");
+        return null;
+      }
+    } else {
+      // If the checksum is null, BlockSender will use on-disk checksum.
+      return new ChunkChecksum(finalized.getVisibleLength(),
+          finalized.getLastPartialChunkChecksum());
+    }
+  }
+
   /**
    * close opened files.
    */
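Note: the new `getPartialChunkChecksumForFinalized()` only goes back to the meta file when the replica's visible length does not end on a `CHUNK_SIZE` boundary and no checksum has been cached yet. A minimal, self-contained sketch of that alignment test follows; the class and method names are illustrative, not part of the patch.

```java
// Illustrative only: mirrors the `replicaVisibleLength % CHUNK_SIZE != 0`
// condition above. A replica whose length is not a multiple of the 512-byte
// chunk size ends in a partial chunk, so BlockSender needs the checksum of
// that last partial chunk (cached or lazily loaded) to serve reads of it.
public class PartialChunkCheck {
  private static final long CHUNK_SIZE = 512; // same value as BlockSender.CHUNK_SIZE

  static boolean endsInPartialChunk(long visibleLength) {
    return visibleLength % CHUNK_SIZE != 0;
  }

  public static void main(String[] args) {
    System.out.println(endsInPartialChunk(1024)); // false: two full chunks
    System.out.println(endsInPartialChunk(1000)); // true: last chunk holds 488 bytes
  }
}
```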
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
index da211913b3e..3f239bd61de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -29,7 +28,7 @@
  * This class describes a replica that has been finalized.
  */
 public class FinalizedReplica extends ReplicaInfo {
-
+  private byte[] lastPartialChunkChecksum;
   /**
    * Constructor
    * @param blockId block id
@@ -40,9 +39,24 @@ public class FinalizedReplica extends ReplicaInfo {
    */
   public FinalizedReplica(long blockId, long len, long genStamp,
       FsVolumeSpi vol, File dir) {
-    super(blockId, len, genStamp, vol, dir);
+    this(blockId, len, genStamp, vol, dir, null);
   }
-
+
+  /**
+   * Constructor.
+   * @param blockId block id
+   * @param len replica length
+   * @param genStamp replica generation stamp
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   * @param checksum the last partial chunk checksum
+   */
+  public FinalizedReplica(long blockId, long len, long genStamp,
+      FsVolumeSpi vol, File dir, byte[] checksum) {
+    super(blockId, len, genStamp, vol, dir);
+    this.setLastPartialChunkChecksum(checksum);
+  }
+
   /**
    * Constructor
    * @param block a block
@@ -50,7 +64,20 @@ public FinalizedReplica(long blockId, long len, long genStamp,
    * @param dir directory path where block and meta files are located
    */
   public FinalizedReplica(Block block, FsVolumeSpi vol, File dir) {
+    this(block, vol, dir, null);
+  }
+
+  /**
+   * Constructor.
+   * @param block a block
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   * @param checksum the last partial chunk checksum
+   */
+  public FinalizedReplica(Block block, FsVolumeSpi vol, File dir,
+      byte[] checksum) {
     super(block, vol, dir);
+    this.setLastPartialChunkChecksum(checksum);
   }
 
   /**
@@ -59,6 +86,7 @@ public FinalizedReplica(Block block, FsVolumeSpi vol, File dir) {
    */
   public FinalizedReplica(FinalizedReplica from) {
     super(from);
+    this.setLastPartialChunkChecksum(from.getLastPartialChunkChecksum());
   }
 
   @Override  // ReplicaInfo
@@ -91,30 +119,18 @@ public String toString() {
     return super.toString();
   }
 
-  /**
-   * gets the last chunk checksum and the length of the block corresponding
-   * to that checksum.
-   * Note, need to be called with the FsDataset lock acquired. May improve to
-   * lock only the FsVolume in the future.
-   * @throws IOException
-   */
-  public ChunkChecksum getLastChecksumAndDataLen() throws IOException {
-    ChunkChecksum chunkChecksum = null;
-    try {
-      byte[] lastChecksum = getVolume().loadLastPartialChunkChecksum(
-          getBlockFile(), getMetaFile());
-      if (lastChecksum != null) {
-        chunkChecksum =
-            new ChunkChecksum(getVisibleLength(), lastChecksum);
-      }
-    } catch (FileNotFoundException e) {
-      // meta file is lost. Try to continue anyway.
-      DataNode.LOG.warn("meta file " + getMetaFile() +
-          " is missing!");
-    } catch (IOException ioe) {
-      DataNode.LOG.warn("Unable to read checksum from meta file " +
-          getMetaFile(), ioe);
-    }
-    return chunkChecksum;
+  public byte[] getLastPartialChunkChecksum() {
+    return lastPartialChunkChecksum;
+  }
+
+  public void setLastPartialChunkChecksum(byte[] checksum) {
+    lastPartialChunkChecksum = checksum;
+  }
+
+  public void loadLastPartialChunkChecksum()
+      throws IOException {
+    byte[] lastChecksum = getVolume().loadLastPartialChunkChecksum(
+        getBlockFile(), getMetaFile());
+    setLastPartialChunkChecksum(lastChecksum);
   }
 }
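Note: `FinalizedReplica` now caches the last partial chunk checksum in a field and exposes it through getters/setters plus an explicit `loadLastPartialChunkChecksum()`, instead of re-reading the meta file on every call as the removed `getLastChecksumAndDataLen()` did. A hedged sketch of the intended calling pattern is below; it uses only methods introduced by this patch, but the helper class itself is hypothetical and would live in the same `org.apache.hadoop.hdfs.server.datanode` package.

```java
import java.io.IOException;

// Hypothetical helper in org.apache.hadoop.hdfs.server.datanode:
// load the cached last partial chunk checksum lazily, on first use.
class LazyChecksumExample {
  static ChunkChecksum lastChunkChecksum(FinalizedReplica finalized)
      throws IOException {
    if (finalized.getLastPartialChunkChecksum() == null) {
      // Not cached yet; read it from the meta file once.
      finalized.loadLastPartialChunkChecksum();
    }
    return new ChunkChecksum(finalized.getVisibleLength(),
        finalized.getLastPartialChunkChecksum());
  }
}
```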
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index d47f22901c9..4486b734ce3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1244,10 +1244,8 @@ private ReplicaBeingWritten append(String bpid,
         v, newBlkFile.getParentFile(), Thread.currentThread(), bytesReserved);
 
     // load last checksum and datalen
-    byte[] lastChunkChecksum = v.loadLastPartialChunkChecksum(
-        replicaInfo.getBlockFile(), replicaInfo.getMetaFile());
     newReplicaInfo.setLastChecksumAndDataLen(
-        replicaInfo.getNumBytes(), lastChunkChecksum);
+        replicaInfo.getNumBytes(), replicaInfo.getLastPartialChunkChecksum());
 
     File newmeta = newReplicaInfo.getMetaFile();
 
@@ -1805,6 +1803,7 @@ private FinalizedReplica finalizeReplica(String bpid, ReplicaInfo replicaInfo)
         == ReplicaState.FINALIZED) {
       newReplicaInfo = (FinalizedReplica)
              ((ReplicaUnderRecovery) replicaInfo).getOriginalReplica();
+      newReplicaInfo.loadLastPartialChunkChecksum();
     } else {
       FsVolumeImpl v = (FsVolumeImpl) replicaInfo.getVolume();
       File f = replicaInfo.getBlockFile();
@@ -1817,6 +1816,19 @@ private FinalizedReplica finalizeReplica(String bpid, ReplicaInfo replicaInfo)
           bpid, replicaInfo, f, replicaInfo.getBytesReserved());
       newReplicaInfo =
           new FinalizedReplica(replicaInfo, v, dest.getParentFile());
+
+      byte[] checksum = null;
+      // copy the last partial checksum if the replica is originally
+      // in finalized or rbw state.
+      if (replicaInfo.getState() == ReplicaState.FINALIZED) {
+        FinalizedReplica finalized = (FinalizedReplica)replicaInfo;
+        checksum = finalized.getLastPartialChunkChecksum();
+      } else if (replicaInfo.getState() == ReplicaState.RBW) {
+        ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
+        checksum = rbw.getLastChecksumAndDataLen().getChecksum();
+      }
+      newReplicaInfo.setLastPartialChunkChecksum(checksum);
+
       if (v.isTransientStorage()) {
         releaseLockedMemory(
             replicaInfo.getOriginalBytesReserved()
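Note: in `finalizeReplica()`, the last partial chunk checksum is now carried over from the pre-finalize replica instead of being read back from disk: a replica that was already FINALIZED keeps its cached value, while an RBW replica hands over the checksum it accumulated while being written. The same decision, pulled out as a standalone helper for readability (hypothetical code, not part of the patch):

```java
// Hypothetical restatement of the new logic in finalizeReplica(): reuse
// whatever checksum the replica already holds in memory; for any other
// state return null and let BlockSender load it lazily from the meta file.
static byte[] checksumToCarryOver(ReplicaInfo replicaInfo) {
  if (replicaInfo.getState() == ReplicaState.FINALIZED) {
    return ((FinalizedReplica) replicaInfo).getLastPartialChunkChecksum();
  } else if (replicaInfo.getState() == ReplicaState.RBW) {
    return ((ReplicaBeingWritten) replicaInfo)
        .getLastChecksumAndDataLen().getChecksum();
  }
  return null;
}
```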
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index be3394ec67c..887c635c74c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -2871,6 +2871,29 @@ public static File getBlockFile(File storageDir, ExtendedBlock blk) {
         blk.getBlockPoolId()), blk.getBlockId()), blk.getBlockName());
   }
 
+  /**
+   * Return all block files in given directory (recursive search).
+   */
+  public static List<File> getAllBlockFiles(File storageDir) {
+    List<File> results = new ArrayList<File>();
+    File[] files = storageDir.listFiles();
+    if (files == null) {
+      return null;
+    }
+    for (File f : files) {
+      if (f.getName().startsWith(Block.BLOCK_FILE_PREFIX) &&
+          !f.getName().endsWith(Block.METADATA_EXTENSION)) {
+        results.add(f);
+      } else if (f.isDirectory()) {
+        List<File> subdirResults = getAllBlockFiles(f);
+        if (subdirResults != null) {
+          results.addAll(subdirResults);
+        }
+      }
+    }
+    return results;
+  }
+
   /**
    * Get the latest metadata file correpsonding to a block
    * @param storageDir storage directory
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
index 0b273dfa0ac..1f31bdc88ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
@@ -92,7 +92,7 @@ public void testListCorruptFilesCorruptedBlock() throws Exception {
       File storageDir = cluster.getInstanceStorageDir(0, 1);
       File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
       assertTrue("data directory does not exist", data_dir.exists());
-      List<File> metaFiles = MiniDFSCluster.getAllBlockMetadataFiles(data_dir);
+      List<File> metaFiles = MiniDFSCluster.getAllBlockFiles(data_dir);
       assertTrue("Data directory does not contain any blocks or there was an " +
           "IO error", metaFiles != null && !metaFiles.isEmpty());
       File metaFile = metaFiles.get(0);
@@ -172,7 +172,7 @@ public void testListCorruptFileBlocksInSafeMode() throws Exception {
       File data_dir = MiniDFSCluster.getFinalizedDir(storageDir,
           cluster.getNamesystem().getBlockPoolId());
       assertTrue("data directory does not exist", data_dir.exists());
-      List<File> metaFiles = MiniDFSCluster.getAllBlockMetadataFiles(data_dir);
+      List<File> metaFiles = MiniDFSCluster.getAllBlockFiles(data_dir);
       assertTrue("Data directory does not contain any blocks or there was an " +
           "IO error", metaFiles != null && !metaFiles.isEmpty());
       File metaFile = metaFiles.get(0);
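Note: on the test side, `MiniDFSCluster.getAllBlockFiles()` complements the existing `getAllBlockMetadataFiles()` by returning the block files themselves, so the updated `TestListCorruptFileBlocks` cases now corrupt block data rather than the `.meta` files. A hedged usage fragment is shown below; it assumes a running `MiniDFSCluster` named `cluster` and a block pool id `bpid`, as in the tests above, and is not part of the patch.

```java
// Illustrative test fragment only.
File storageDir = cluster.getInstanceStorageDir(0, 1);
File dataDir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
List<File> blockFiles = MiniDFSCluster.getAllBlockFiles(dataDir);
assertTrue("no block files found",
    blockFiles != null && !blockFiles.isEmpty());
File blockFile = blockFiles.get(0); // overwrite a few bytes of this file to corrupt the block
```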