diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index be5d197b7e4..e6de8b958d6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -1078,6 +1078,9 @@ Release 2.7.0 - UNRELEASED HDFS-8051. FsVolumeList#addVolume should release volume reference if not put it into BlockScanner. (Lei (Eddy) Xu via Colin P. McCabe) + HDFS-7999. FsDatasetImpl#createTemporary sometimes holds the FSDatasetImpl + lock for a very long time (sinago via cmccabe) + BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS HDFS-7720. Quota by Storage Type API, tools and ClientNameNode diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 2cc67798cec..6af7d925bce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -1414,38 +1414,59 @@ public synchronized ReplicaInPipeline convertTemporaryToRbw( } @Override // FsDatasetSpi - public synchronized ReplicaHandler createTemporary( + public ReplicaHandler createTemporary( StorageType storageType, ExtendedBlock b) throws IOException { - ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); - if (replicaInfo != null) { - if (replicaInfo.getGenerationStamp() < b.getGenerationStamp() - && replicaInfo instanceof ReplicaInPipeline) { - // Stop the previous writer - ((ReplicaInPipeline)replicaInfo) - .stopWriter(datanode.getDnConf().getXceiverStopTimeout()); - invalidate(b.getBlockPoolId(), new Block[]{replicaInfo}); - } else { - throw new ReplicaAlreadyExistsException("Block " + b + - " already exists in state " + replicaInfo.getState() + - " and thus cannot be created."); + long startTimeMs = Time.monotonicNow(); + long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout(); + ReplicaInfo lastFoundReplicaInfo = null; + do { + synchronized (this) { + ReplicaInfo currentReplicaInfo = + volumeMap.get(b.getBlockPoolId(), b.getBlockId()); + if (currentReplicaInfo == lastFoundReplicaInfo) { + if (lastFoundReplicaInfo != null) { + invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo }); + } + FsVolumeReference ref = + volumes.getNextVolume(storageType, b.getNumBytes()); + FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); + // create a temporary file to hold block in the designated volume + File f; + try { + f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock()); + } catch (IOException e) { + IOUtils.cleanup(null, ref); + throw e; + } + ReplicaInPipeline newReplicaInfo = + new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v, + f.getParentFile(), 0); + volumeMap.add(b.getBlockPoolId(), newReplicaInfo); + return new ReplicaHandler(newReplicaInfo, ref); + } else { + if (!(currentReplicaInfo.getGenerationStamp() < b + .getGenerationStamp() && currentReplicaInfo instanceof ReplicaInPipeline)) { + throw new ReplicaAlreadyExistsException("Block " + b + + " already exists in state " + currentReplicaInfo.getState() + + " and thus cannot be created."); + } + lastFoundReplicaInfo = currentReplicaInfo; + } } - } - FsVolumeReference ref = volumes.getNextVolume(storageType, b.getNumBytes()); - FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); - // create a temporary file to hold block in the designated volume - File f; - try { - f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock()); - } catch (IOException e) { - IOUtils.cleanup(null, ref); - throw e; - } + // Hang too long, just bail out. This is not supposed to happen. + long writerStopMs = Time.monotonicNow() - startTimeMs; + if (writerStopMs > writerStopTimeoutMs) { + LOG.warn("Unable to stop existing writer for block " + b + " after " + + writerStopMs + " miniseconds."); + throw new IOException("Unable to stop existing writer for block " + b + + " after " + writerStopMs + " miniseconds."); + } - ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), - b.getGenerationStamp(), v, f.getParentFile(), 0); - volumeMap.add(b.getBlockPoolId(), newReplicaInfo); - return new ReplicaHandler(newReplicaInfo, ref); + // Stop the previous writer + ((ReplicaInPipeline) lastFoundReplicaInfo) + .stopWriter(writerStopTimeoutMs); + } while (true); } /**