HDFS-12157. Do fsyncDirectory(..) outside of FSDataset lock. Contributed by Vinayakumar B.
(cherry picked from commit 74bcc8d1f213ffd18056b7d0590d0cd346abaff9)
This commit is contained in:
parent
68380262ff
commit
5758dad28f
@ -1010,8 +1010,7 @@ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
|
|||||||
targetVolume, blockFiles[0].getParentFile(), 0);
|
targetVolume, blockFiles[0].getParentFile(), 0);
|
||||||
newReplicaInfo.setNumBytes(blockFiles[1].length());
|
newReplicaInfo.setNumBytes(blockFiles[1].length());
|
||||||
// Finalize the copied files
|
// Finalize the copied files
|
||||||
newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo,
|
newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
|
||||||
false);
|
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||||
// Increment numBlocks here as this block moved without knowing to BPS
|
// Increment numBlocks here as this block moved without knowing to BPS
|
||||||
FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
|
FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
|
||||||
@ -1372,7 +1371,7 @@ public Replica recoverClose(ExtendedBlock b, long newGS,
|
|||||||
bumpReplicaGS(replicaInfo, newGS);
|
bumpReplicaGS(replicaInfo, newGS);
|
||||||
// finalize the replica if RBW
|
// finalize the replica if RBW
|
||||||
if (replicaInfo.getState() == ReplicaState.RBW) {
|
if (replicaInfo.getState() == ReplicaState.RBW) {
|
||||||
finalizeReplica(b.getBlockPoolId(), replicaInfo, false);
|
finalizeReplica(b.getBlockPoolId(), replicaInfo);
|
||||||
}
|
}
|
||||||
return replicaInfo;
|
return replicaInfo;
|
||||||
}
|
}
|
||||||
@ -1737,23 +1736,37 @@ public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams strea
|
|||||||
@Override // FsDatasetSpi
|
@Override // FsDatasetSpi
|
||||||
public void finalizeBlock(ExtendedBlock b, boolean fsyncDir)
|
public void finalizeBlock(ExtendedBlock b, boolean fsyncDir)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
ReplicaInfo replicaInfo = null;
|
||||||
|
ReplicaInfo finalizedReplicaInfo = null;
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||||
if (Thread.interrupted()) {
|
if (Thread.interrupted()) {
|
||||||
// Don't allow data modifications from interrupted threads
|
// Don't allow data modifications from interrupted threads
|
||||||
throw new IOException("Cannot finalize block from Interrupted Thread");
|
throw new IOException("Cannot finalize block from Interrupted Thread");
|
||||||
}
|
}
|
||||||
ReplicaInfo replicaInfo = getReplicaInfo(b);
|
replicaInfo = getReplicaInfo(b);
|
||||||
if (replicaInfo.getState() == ReplicaState.FINALIZED) {
|
if (replicaInfo.getState() == ReplicaState.FINALIZED) {
|
||||||
// this is legal, when recovery happens on a file that has
|
// this is legal, when recovery happens on a file that has
|
||||||
// been opened for append but never modified
|
// been opened for append but never modified
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
finalizeReplica(b.getBlockPoolId(), replicaInfo, fsyncDir);
|
finalizedReplicaInfo = finalizeReplica(b.getBlockPoolId(), replicaInfo);
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
* Sync the directory after rename from tmp/rbw to Finalized if
|
||||||
|
* configured. Though rename should be atomic operation, sync on both
|
||||||
|
* dest and src directories are done because IOUtils.fsync() calls
|
||||||
|
* directory's channel sync, not the journal itself.
|
||||||
|
*/
|
||||||
|
if (fsyncDir) {
|
||||||
|
FsVolumeSpi v = replicaInfo.getVolume();
|
||||||
|
File f = replicaInfo.getBlockFile();
|
||||||
|
File dest = finalizedReplicaInfo.getBlockFile();
|
||||||
|
fsyncDirectory(v, dest.getParentFile(), f.getParentFile());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private FinalizedReplica finalizeReplica(String bpid,
|
private FinalizedReplica finalizeReplica(String bpid, ReplicaInfo replicaInfo)
|
||||||
ReplicaInfo replicaInfo, boolean fsyncDir) throws IOException {
|
throws IOException {
|
||||||
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
try(AutoCloseableLock lock = datasetLock.acquire()) {
|
||||||
FinalizedReplica newReplicaInfo = null;
|
FinalizedReplica newReplicaInfo = null;
|
||||||
if (replicaInfo.getState() == ReplicaState.RUR &&
|
if (replicaInfo.getState() == ReplicaState.RUR &&
|
||||||
@ -1773,15 +1786,6 @@ private FinalizedReplica finalizeReplica(String bpid,
|
|||||||
bpid, replicaInfo, f, replicaInfo.getBytesReserved());
|
bpid, replicaInfo, f, replicaInfo.getBytesReserved());
|
||||||
newReplicaInfo =
|
newReplicaInfo =
|
||||||
new FinalizedReplica(replicaInfo, v, dest.getParentFile());
|
new FinalizedReplica(replicaInfo, v, dest.getParentFile());
|
||||||
/*
|
|
||||||
* Sync the directory after rename from tmp/rbw to Finalized if
|
|
||||||
* configured. Though rename should be atomic operation, sync on both
|
|
||||||
* dest and src directories are done because IOUtils.fsync() calls
|
|
||||||
* directory's channel sync, not the journal itself.
|
|
||||||
*/
|
|
||||||
if (fsyncDir) {
|
|
||||||
fsyncDirectory(v, dest.getParentFile(), f.getParentFile());
|
|
||||||
}
|
|
||||||
if (v.isTransientStorage()) {
|
if (v.isTransientStorage()) {
|
||||||
releaseLockedMemory(
|
releaseLockedMemory(
|
||||||
replicaInfo.getOriginalBytesReserved()
|
replicaInfo.getOriginalBytesReserved()
|
||||||
@ -2755,12 +2759,12 @@ private FinalizedReplica updateReplicaUnderRecovery(
|
|||||||
// but it is immediately converted to finalized state within the same
|
// but it is immediately converted to finalized state within the same
|
||||||
// lock, so no need to update it.
|
// lock, so no need to update it.
|
||||||
volumeMap.add(bpid, newReplicaInfo);
|
volumeMap.add(bpid, newReplicaInfo);
|
||||||
finalizeReplica(bpid, newReplicaInfo, false);
|
finalizeReplica(bpid, newReplicaInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// finalize the block
|
// finalize the block
|
||||||
return finalizeReplica(bpid, rur, false);
|
return finalizeReplica(bpid, rur);
|
||||||
}
|
}
|
||||||
|
|
||||||
private File[] copyReplicaWithNewBlockIdAndGS(
|
private File[] copyReplicaWithNewBlockIdAndGS(
|
||||||
|
Loading…
x
Reference in New Issue
Block a user