HDFS-12157. Do fsyncDirectory(..) outside of FSDataset lock. Contributed by Vinayakumar B.
This commit is contained in:
parent
3e2753daa1
commit
74bcc8d1f2
|
@ -1035,8 +1035,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
targetVolume, blockFiles[0].getParentFile(), 0);
|
||||
newReplicaInfo.setNumBytes(blockFiles[1].length());
|
||||
// Finalize the copied files
|
||||
newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo,
|
||||
false);
|
||||
newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
// Increment numBlocks here as this block moved without knowing to BPS
|
||||
FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
|
||||
|
@ -1409,7 +1408,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
bumpReplicaGS(replicaInfo, newGS);
|
||||
// finalize the replica if RBW
|
||||
if (replicaInfo.getState() == ReplicaState.RBW) {
|
||||
finalizeReplica(b.getBlockPoolId(), replicaInfo, false);
|
||||
finalizeReplica(b.getBlockPoolId(), replicaInfo);
|
||||
}
|
||||
return replicaInfo;
|
||||
}
|
||||
|
@ -1777,23 +1776,37 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
@Override // FsDatasetSpi
|
||||
public void finalizeBlock(ExtendedBlock b, boolean fsyncDir)
|
||||
throws IOException {
|
||||
ReplicaInfo replicaInfo = null;
|
||||
ReplicaInfo finalizedReplicaInfo = null;
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
if (Thread.interrupted()) {
|
||||
// Don't allow data modifications from interrupted threads
|
||||
throw new IOException("Cannot finalize block from Interrupted Thread");
|
||||
}
|
||||
ReplicaInfo replicaInfo = getReplicaInfo(b);
|
||||
replicaInfo = getReplicaInfo(b);
|
||||
if (replicaInfo.getState() == ReplicaState.FINALIZED) {
|
||||
// this is legal, when recovery happens on a file that has
|
||||
// been opened for append but never modified
|
||||
return;
|
||||
}
|
||||
finalizeReplica(b.getBlockPoolId(), replicaInfo, fsyncDir);
|
||||
finalizedReplicaInfo = finalizeReplica(b.getBlockPoolId(), replicaInfo);
|
||||
}
|
||||
/*
|
||||
* Sync the directory after rename from tmp/rbw to Finalized if
|
||||
* configured. Though rename should be atomic operation, sync on both
|
||||
* dest and src directories are done because IOUtils.fsync() calls
|
||||
* directory's channel sync, not the journal itself.
|
||||
*/
|
||||
if (fsyncDir) {
|
||||
FsVolumeSpi v = replicaInfo.getVolume();
|
||||
File f = replicaInfo.getBlockFile();
|
||||
File dest = finalizedReplicaInfo.getBlockFile();
|
||||
fsyncDirectory(v, dest.getParentFile(), f.getParentFile());
|
||||
}
|
||||
}
|
||||
|
||||
private FinalizedReplica finalizeReplica(String bpid,
|
||||
ReplicaInfo replicaInfo, boolean fsyncDir) throws IOException {
|
||||
private FinalizedReplica finalizeReplica(String bpid, ReplicaInfo replicaInfo)
|
||||
throws IOException {
|
||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||
FinalizedReplica newReplicaInfo = null;
|
||||
if (replicaInfo.getState() == ReplicaState.RUR &&
|
||||
|
@ -1813,15 +1826,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
bpid, replicaInfo, f, replicaInfo.getBytesReserved());
|
||||
newReplicaInfo =
|
||||
new FinalizedReplica(replicaInfo, v, dest.getParentFile());
|
||||
/*
|
||||
* Sync the directory after rename from tmp/rbw to Finalized if
|
||||
* configured. Though rename should be atomic operation, sync on both
|
||||
* dest and src directories are done because IOUtils.fsync() calls
|
||||
* directory's channel sync, not the journal itself.
|
||||
*/
|
||||
if (fsyncDir) {
|
||||
fsyncDirectory(v, dest.getParentFile(), f.getParentFile());
|
||||
}
|
||||
if (v.isTransientStorage()) {
|
||||
releaseLockedMemory(
|
||||
replicaInfo.getOriginalBytesReserved()
|
||||
|
@ -2811,12 +2815,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
// but it is immediately converted to finalized state within the same
|
||||
// lock, so no need to update it.
|
||||
volumeMap.add(bpid, newReplicaInfo);
|
||||
finalizeReplica(bpid, newReplicaInfo, false);
|
||||
finalizeReplica(bpid, newReplicaInfo);
|
||||
}
|
||||
}
|
||||
|
||||
// finalize the block
|
||||
return finalizeReplica(bpid, rur, false);
|
||||
return finalizeReplica(bpid, rur);
|
||||
}
|
||||
|
||||
private File[] copyReplicaWithNewBlockIdAndGS(
|
||||
|
|
Loading…
Reference in New Issue