HDFS-7999. FsDatasetImpl#createTemporary sometimes holds the FSDatasetImpl lock for a very long time (sinago via cmccabe)

(cherry picked from commit 28bebc81db)
This commit is contained in:
Colin Patrick Mccabe 2015-04-06 08:54:46 -07:00
parent d2b2d76cce
commit a827089905
2 changed files with 52 additions and 28 deletions

View File

@ -1078,6 +1078,9 @@ Release 2.7.0 - UNRELEASED
HDFS-8051. FsVolumeList#addVolume should release volume reference if not HDFS-8051. FsVolumeList#addVolume should release volume reference if not
put it into BlockScanner. (Lei (Eddy) Xu via Colin P. McCabe) put it into BlockScanner. (Lei (Eddy) Xu via Colin P. McCabe)
HDFS-7999. FsDatasetImpl#createTemporary sometimes holds the FSDatasetImpl
lock for a very long time (sinago via cmccabe)
BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
HDFS-7720. Quota by Storage Type API, tools and ClientNameNode HDFS-7720. Quota by Storage Type API, tools and ClientNameNode

View File

@ -1414,38 +1414,59 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
} }
@Override // FsDatasetSpi @Override // FsDatasetSpi
public synchronized ReplicaHandler createTemporary( public ReplicaHandler createTemporary(
StorageType storageType, ExtendedBlock b) throws IOException { StorageType storageType, ExtendedBlock b) throws IOException {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); long startTimeMs = Time.monotonicNow();
if (replicaInfo != null) { long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout();
if (replicaInfo.getGenerationStamp() < b.getGenerationStamp() ReplicaInfo lastFoundReplicaInfo = null;
&& replicaInfo instanceof ReplicaInPipeline) { do {
// Stop the previous writer synchronized (this) {
((ReplicaInPipeline)replicaInfo) ReplicaInfo currentReplicaInfo =
.stopWriter(datanode.getDnConf().getXceiverStopTimeout()); volumeMap.get(b.getBlockPoolId(), b.getBlockId());
invalidate(b.getBlockPoolId(), new Block[]{replicaInfo}); if (currentReplicaInfo == lastFoundReplicaInfo) {
} else { if (lastFoundReplicaInfo != null) {
throw new ReplicaAlreadyExistsException("Block " + b + invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo });
" already exists in state " + replicaInfo.getState() + }
" and thus cannot be created."); FsVolumeReference ref =
volumes.getNextVolume(storageType, b.getNumBytes());
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
// create a temporary file to hold block in the designated volume
File f;
try {
f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
} catch (IOException e) {
IOUtils.cleanup(null, ref);
throw e;
}
ReplicaInPipeline newReplicaInfo =
new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v,
f.getParentFile(), 0);
volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
return new ReplicaHandler(newReplicaInfo, ref);
} else {
if (!(currentReplicaInfo.getGenerationStamp() < b
.getGenerationStamp() && currentReplicaInfo instanceof ReplicaInPipeline)) {
throw new ReplicaAlreadyExistsException("Block " + b
+ " already exists in state " + currentReplicaInfo.getState()
+ " and thus cannot be created.");
}
lastFoundReplicaInfo = currentReplicaInfo;
}
} }
}
FsVolumeReference ref = volumes.getNextVolume(storageType, b.getNumBytes()); // Hang too long, just bail out. This is not supposed to happen.
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); long writerStopMs = Time.monotonicNow() - startTimeMs;
// create a temporary file to hold block in the designated volume if (writerStopMs > writerStopTimeoutMs) {
File f; LOG.warn("Unable to stop existing writer for block " + b + " after "
try { + writerStopMs + " miniseconds.");
f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock()); throw new IOException("Unable to stop existing writer for block " + b
} catch (IOException e) { + " after " + writerStopMs + " miniseconds.");
IOUtils.cleanup(null, ref); }
throw e;
}
ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), // Stop the previous writer
b.getGenerationStamp(), v, f.getParentFile(), 0); ((ReplicaInPipeline) lastFoundReplicaInfo)
volumeMap.add(b.getBlockPoolId(), newReplicaInfo); .stopWriter(writerStopTimeoutMs);
return new ReplicaHandler(newReplicaInfo, ref); } while (true);
} }
/** /**