diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 812e1c47107..767d3ec1f92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1162,7 +1162,8 @@ private synchronized ReplicaBeingWritten append(String bpid,
     // construct a RBW replica with the new GS
     File blkfile = replicaInfo.getBlockFile();
     FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
-    if (v.getAvailable() < estimateBlockLen - replicaInfo.getNumBytes()) {
+    long bytesReserved = estimateBlockLen - replicaInfo.getNumBytes();
+    if (v.getAvailable() < bytesReserved) {
       throw new DiskOutOfSpaceException("Insufficient space for appending to "
           + replicaInfo);
     }
@@ -1170,7 +1171,7 @@ private synchronized ReplicaBeingWritten append(String bpid,
     File oldmeta = replicaInfo.getMetaFile();
     ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
         replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS,
-        v, newBlkFile.getParentFile(), Thread.currentThread(), estimateBlockLen);
+        v, newBlkFile.getParentFile(), Thread.currentThread(), bytesReserved);
     File newmeta = newReplicaInfo.getMetaFile();
 
     // rename meta file to rbw directory
@@ -1206,7 +1207,7 @@ private synchronized ReplicaBeingWritten append(String bpid,
 
     // Replace finalized replica by a RBW replica in replicas map
     volumeMap.add(bpid, newReplicaInfo);
-    v.reserveSpaceForReplica(estimateBlockLen - replicaInfo.getNumBytes());
+    v.reserveSpaceForReplica(bytesReserved);
     return newReplicaInfo;
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestSpaceReservation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestSpaceReservation.java
index 49e585d3542..6dbd299e9e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestSpaceReservation.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestSpaceReservation.java
@@ -572,4 +572,64 @@ public int getNumFailures() {
       return numFailures;
     }
   }
+
+  @Test(timeout = 30000)
+  public void testReservedSpaceForAppend() throws Exception {
+    final short replication = 3;
+    startCluster(BLOCK_SIZE, replication, -1);
+    final String methodName = GenericTestUtils.getMethodName();
+    final Path file = new Path("/" + methodName + ".01.dat");
+
+    // Write 1024 bytes to the file and close it.
+    FSDataOutputStream os = fs.create(file, replication);
+    os.write(new byte[1024]);
+    os.close();
+
+    final Path file2 = new Path("/" + methodName + ".02.dat");
+
+    // Write 1 byte to the file and keep it open.
+    FSDataOutputStream os2 = fs.create(file2, replication);
+    os2.write(new byte[1]);
+    os2.hflush();
+    int expectedFile2Reserved = BLOCK_SIZE - 1;
+    checkReservedSpace(expectedFile2Reserved);
+
+    // append one byte and verify reservedspace before and after closing
+    os = fs.append(file);
+    os.write(new byte[1]);
+    os.hflush();
+    int expectedFile1Reserved = BLOCK_SIZE - 1025;
+    checkReservedSpace(expectedFile2Reserved + expectedFile1Reserved);
+    os.close();
+    checkReservedSpace(expectedFile2Reserved);
+
+    // append one byte and verify reservedspace before and after abort
+    os = fs.append(file);
+    os.write(new byte[1]);
+    os.hflush();
+    expectedFile1Reserved--;
+    checkReservedSpace(expectedFile2Reserved + expectedFile1Reserved);
+    DFSTestUtil.abortStream(((DFSOutputStream) os.getWrappedStream()));
+    checkReservedSpace(expectedFile2Reserved);
+  }
+
+  private void checkReservedSpace(final long expectedReserved) throws TimeoutException,
+      InterruptedException, IOException {
+    for (final DataNode dn : cluster.getDataNodes()) {
+      try (FsDatasetSpi.FsVolumeReferences volumes = dn.getFSDataset()
+          .getFsVolumeReferences()) {
+        final FsVolumeImpl volume = (FsVolumeImpl) volumes.get(0);
+        GenericTestUtils.waitFor(new Supplier<Boolean>() {
+          @Override
+          public Boolean get() {
+            LOG.info(
+                "dn " + dn.getDisplayName() + " space : " + volume
+                    .getReservedForReplicas() + ", Expected ReservedSpace :"
+                + expectedReserved);
+            return (volume.getReservedForReplicas() == expectedReserved);
+          }
+        }, 100, 3000);
+      }
+    }
+  }
 }