HDFS-12157. Do fsyncDirectory(..) outside of FSDataset lock. Contributed by inayakumar B.

This commit is contained in:
Kihwal Lee 2017-08-09 09:03:51 -05:00
parent 1a18d5e514
commit 69afa26f19
1 changed files with 24 additions and 22 deletions

View File

@ -991,8 +991,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
replicaInfo, smallBufferSize, conf); replicaInfo, smallBufferSize, conf);
// Finalize the copied files // Finalize the copied files
newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo, newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
false);
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetLock.acquire()) {
// Increment numBlocks here as this block moved without knowing to BPS // Increment numBlocks here as this block moved without knowing to BPS
FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume(); FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
@ -1295,7 +1294,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
replicaInfo.bumpReplicaGS(newGS); replicaInfo.bumpReplicaGS(newGS);
// finalize the replica if RBW // finalize the replica if RBW
if (replicaInfo.getState() == ReplicaState.RBW) { if (replicaInfo.getState() == ReplicaState.RBW) {
finalizeReplica(b.getBlockPoolId(), replicaInfo, false); finalizeReplica(b.getBlockPoolId(), replicaInfo);
} }
return replicaInfo; return replicaInfo;
} }
@ -1625,23 +1624,39 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi @Override // FsDatasetSpi
public void finalizeBlock(ExtendedBlock b, boolean fsyncDir) public void finalizeBlock(ExtendedBlock b, boolean fsyncDir)
throws IOException { throws IOException {
ReplicaInfo replicaInfo = null;
ReplicaInfo finalizedReplicaInfo = null;
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetLock.acquire()) {
if (Thread.interrupted()) { if (Thread.interrupted()) {
// Don't allow data modifications from interrupted threads // Don't allow data modifications from interrupted threads
throw new IOException("Cannot finalize block from Interrupted Thread"); throw new IOException("Cannot finalize block from Interrupted Thread");
} }
ReplicaInfo replicaInfo = getReplicaInfo(b); replicaInfo = getReplicaInfo(b);
if (replicaInfo.getState() == ReplicaState.FINALIZED) { if (replicaInfo.getState() == ReplicaState.FINALIZED) {
// this is legal, when recovery happens on a file that has // this is legal, when recovery happens on a file that has
// been opened for append but never modified // been opened for append but never modified
return; return;
} }
finalizeReplica(b.getBlockPoolId(), replicaInfo, fsyncDir); finalizedReplicaInfo = finalizeReplica(b.getBlockPoolId(), replicaInfo);
}
/*
* Sync the directory after rename from tmp/rbw to Finalized if
* configured. Though rename should be atomic operation, sync on both
* dest and src directories are done because IOUtils.fsync() calls
* directory's channel sync, not the journal itself.
*/
if (fsyncDir && finalizedReplicaInfo instanceof FinalizedReplica
&& replicaInfo instanceof LocalReplica) {
FinalizedReplica finalizedReplica =
(FinalizedReplica) finalizedReplicaInfo;
finalizedReplica.fsyncDirectory();
LocalReplica localReplica = (LocalReplica) replicaInfo;
localReplica.fsyncDirectory();
} }
} }
private ReplicaInfo finalizeReplica(String bpid, private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
ReplicaInfo replicaInfo, boolean fsyncDir) throws IOException { throws IOException {
try (AutoCloseableLock lock = datasetLock.acquire()) { try (AutoCloseableLock lock = datasetLock.acquire()) {
ReplicaInfo newReplicaInfo = null; ReplicaInfo newReplicaInfo = null;
if (replicaInfo.getState() == ReplicaState.RUR && if (replicaInfo.getState() == ReplicaState.RUR &&
@ -1656,19 +1671,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
newReplicaInfo = v.addFinalizedBlock( newReplicaInfo = v.addFinalizedBlock(
bpid, replicaInfo, replicaInfo, replicaInfo.getBytesReserved()); bpid, replicaInfo, replicaInfo, replicaInfo.getBytesReserved());
/*
* Sync the directory after rename from tmp/rbw to Finalized if
* configured. Though rename should be atomic operation, sync on both
* dest and src directories are done because IOUtils.fsync() calls
* directory's channel sync, not the journal itself.
*/
if (fsyncDir && newReplicaInfo instanceof FinalizedReplica
&& replicaInfo instanceof LocalReplica) {
FinalizedReplica finalizedReplica = (FinalizedReplica) newReplicaInfo;
finalizedReplica.fsyncDirectory();
LocalReplica localReplica = (LocalReplica) replicaInfo;
localReplica.fsyncDirectory();
}
if (v.isTransientStorage()) { if (v.isTransientStorage()) {
releaseLockedMemory( releaseLockedMemory(
replicaInfo.getOriginalBytesReserved() replicaInfo.getOriginalBytesReserved()
@ -2634,11 +2636,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
newReplicaInfo.setNumBytes(newlength); newReplicaInfo.setNumBytes(newlength);
volumeMap.add(bpid, newReplicaInfo.getReplicaInfo()); volumeMap.add(bpid, newReplicaInfo.getReplicaInfo());
finalizeReplica(bpid, newReplicaInfo.getReplicaInfo(), false); finalizeReplica(bpid, newReplicaInfo.getReplicaInfo());
} }
} }
// finalize the block // finalize the block
return finalizeReplica(bpid, rur, false); return finalizeReplica(bpid, rur);
} }
@Override // FsDatasetSpi @Override // FsDatasetSpi