HDFS-12157. Do fsyncDirectory(..) outside of FSDataset lock. Contributed by Vinayakumar B.

(cherry picked from commit 74bcc8d1f2)
This commit is contained in:
Kihwal Lee 2017-08-09 09:33:07 -05:00
parent 49d459a3ed
commit 639380efff
1 changed files with 22 additions and 18 deletions

View File

@ -1010,8 +1010,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
targetVolume, blockFiles[0].getParentFile(), 0);
newReplicaInfo.setNumBytes(blockFiles[1].length());
// Finalize the copied files
newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo,
false);
newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
try(AutoCloseableLock lock = datasetLock.acquire()) {
// Increment numBlocks here as this block moved without knowing to BPS
FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
@ -1372,7 +1371,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
bumpReplicaGS(replicaInfo, newGS);
// finalize the replica if RBW
if (replicaInfo.getState() == ReplicaState.RBW) {
finalizeReplica(b.getBlockPoolId(), replicaInfo, false);
finalizeReplica(b.getBlockPoolId(), replicaInfo);
}
return replicaInfo;
}
@ -1737,23 +1736,37 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public void finalizeBlock(ExtendedBlock b, boolean fsyncDir)
throws IOException {
ReplicaInfo replicaInfo = null;
ReplicaInfo finalizedReplicaInfo = null;
try(AutoCloseableLock lock = datasetLock.acquire()) {
if (Thread.interrupted()) {
// Don't allow data modifications from interrupted threads
throw new IOException("Cannot finalize block from Interrupted Thread");
}
ReplicaInfo replicaInfo = getReplicaInfo(b);
replicaInfo = getReplicaInfo(b);
if (replicaInfo.getState() == ReplicaState.FINALIZED) {
// this is legal, when recovery happens on a file that has
// been opened for append but never modified
return;
}
finalizeReplica(b.getBlockPoolId(), replicaInfo, fsyncDir);
finalizedReplicaInfo = finalizeReplica(b.getBlockPoolId(), replicaInfo);
}
/*
* Sync the directory after rename from tmp/rbw to Finalized if
* configured. Though rename should be atomic operation, sync on both
* dest and src directories are done because IOUtils.fsync() calls
* directory's channel sync, not the journal itself.
*/
if (fsyncDir) {
FsVolumeSpi v = replicaInfo.getVolume();
File f = replicaInfo.getBlockFile();
File dest = finalizedReplicaInfo.getBlockFile();
fsyncDirectory(v, dest.getParentFile(), f.getParentFile());
}
}
private FinalizedReplica finalizeReplica(String bpid,
ReplicaInfo replicaInfo, boolean fsyncDir) throws IOException {
private FinalizedReplica finalizeReplica(String bpid, ReplicaInfo replicaInfo)
throws IOException {
try(AutoCloseableLock lock = datasetLock.acquire()) {
FinalizedReplica newReplicaInfo = null;
if (replicaInfo.getState() == ReplicaState.RUR &&
@ -1773,15 +1786,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
bpid, replicaInfo, f, replicaInfo.getBytesReserved());
newReplicaInfo =
new FinalizedReplica(replicaInfo, v, dest.getParentFile());
/*
* Sync the directory after rename from tmp/rbw to Finalized if
* configured. Though rename should be atomic operation, sync on both
* dest and src directories are done because IOUtils.fsync() calls
* directory's channel sync, not the journal itself.
*/
if (fsyncDir) {
fsyncDirectory(v, dest.getParentFile(), f.getParentFile());
}
if (v.isTransientStorage()) {
releaseLockedMemory(
replicaInfo.getOriginalBytesReserved()
@ -2755,12 +2759,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// but it is immediately converted to finalized state within the same
// lock, so no need to update it.
volumeMap.add(bpid, newReplicaInfo);
finalizeReplica(bpid, newReplicaInfo, false);
finalizeReplica(bpid, newReplicaInfo);
}
}
// finalize the block
return finalizeReplica(bpid, rur, false);
return finalizeReplica(bpid, rur);
}
private File[] copyReplicaWithNewBlockIdAndGS(