From 639380efff495c17bb19f4e2657d1b51f1f04c4f Mon Sep 17 00:00:00 2001 From: Kihwal Lee Date: Wed, 9 Aug 2017 09:33:07 -0500 Subject: [PATCH] HDFS-12157. Do fsyncDirectory(..) outside of FSDataset lock. Contributed by Vinayakumar B. (cherry picked from commit 74bcc8d1f213ffd18056b7d0590d0cd346abaff9) --- .../fsdataset/impl/FsDatasetImpl.java | 40 ++++++++++--------- 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 6468d65b069..ce0f7c0797f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -1010,8 +1010,7 @@ class FsDatasetImpl implements FsDatasetSpi { targetVolume, blockFiles[0].getParentFile(), 0); newReplicaInfo.setNumBytes(blockFiles[1].length()); // Finalize the copied files - newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo, - false); + newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo); try(AutoCloseableLock lock = datasetLock.acquire()) { // Increment numBlocks here as this block moved without knowing to BPS FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume(); @@ -1372,7 +1371,7 @@ class FsDatasetImpl implements FsDatasetSpi { bumpReplicaGS(replicaInfo, newGS); // finalize the replica if RBW if (replicaInfo.getState() == ReplicaState.RBW) { - finalizeReplica(b.getBlockPoolId(), replicaInfo, false); + finalizeReplica(b.getBlockPoolId(), replicaInfo); } return replicaInfo; } @@ -1737,23 +1736,37 @@ class FsDatasetImpl implements FsDatasetSpi { @Override // FsDatasetSpi public void finalizeBlock(ExtendedBlock b, boolean fsyncDir) throws IOException { + ReplicaInfo replicaInfo = null; + ReplicaInfo finalizedReplicaInfo = null; try(AutoCloseableLock lock = datasetLock.acquire()) { if (Thread.interrupted()) { // Don't allow data modifications from interrupted threads throw new IOException("Cannot finalize block from Interrupted Thread"); } - ReplicaInfo replicaInfo = getReplicaInfo(b); + replicaInfo = getReplicaInfo(b); if (replicaInfo.getState() == ReplicaState.FINALIZED) { // this is legal, when recovery happens on a file that has // been opened for append but never modified return; } - finalizeReplica(b.getBlockPoolId(), replicaInfo, fsyncDir); + finalizedReplicaInfo = finalizeReplica(b.getBlockPoolId(), replicaInfo); + } + /* + * Sync the directory after rename from tmp/rbw to Finalized if + * configured. Though rename should be atomic operation, sync on both + * dest and src directories are done because IOUtils.fsync() calls + * directory's channel sync, not the journal itself. + */ + if (fsyncDir) { + FsVolumeSpi v = replicaInfo.getVolume(); + File f = replicaInfo.getBlockFile(); + File dest = finalizedReplicaInfo.getBlockFile(); + fsyncDirectory(v, dest.getParentFile(), f.getParentFile()); } } - private FinalizedReplica finalizeReplica(String bpid, - ReplicaInfo replicaInfo, boolean fsyncDir) throws IOException { + private FinalizedReplica finalizeReplica(String bpid, ReplicaInfo replicaInfo) + throws IOException { try(AutoCloseableLock lock = datasetLock.acquire()) { FinalizedReplica newReplicaInfo = null; if (replicaInfo.getState() == ReplicaState.RUR && @@ -1773,15 +1786,6 @@ class FsDatasetImpl implements FsDatasetSpi { bpid, replicaInfo, f, replicaInfo.getBytesReserved()); newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile()); - /* - * Sync the directory after rename from tmp/rbw to Finalized if - * configured. Though rename should be atomic operation, sync on both - * dest and src directories are done because IOUtils.fsync() calls - * directory's channel sync, not the journal itself. - */ - if (fsyncDir) { - fsyncDirectory(v, dest.getParentFile(), f.getParentFile()); - } if (v.isTransientStorage()) { releaseLockedMemory( replicaInfo.getOriginalBytesReserved() @@ -2755,12 +2759,12 @@ class FsDatasetImpl implements FsDatasetSpi { // but it is immediately converted to finalized state within the same // lock, so no need to update it. volumeMap.add(bpid, newReplicaInfo); - finalizeReplica(bpid, newReplicaInfo, false); + finalizeReplica(bpid, newReplicaInfo); } } // finalize the block - return finalizeReplica(bpid, rur, false); + return finalizeReplica(bpid, rur); } private File[] copyReplicaWithNewBlockIdAndGS(