diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 84e382a488b..91a16bc2c03 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -1403,6 +1403,9 @@ Release 2.7.0 - UNRELEASED HDFS-8038. PBImageDelimitedTextWriter#getEntry output HDFS path in platform-specific format. (Xiaoyu Yao via cnauroth) + HDFS-8072. Reserved RBW space is not released if client terminates while + writing block. (Arpit Agarwal) + BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS HDFS-7720. Quota by Storage Type API, tools and ClientNameNode diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 58cb8b1a51b..c0be956e8ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -817,6 +817,7 @@ class BlockReceiver implements Closeable { } } catch (IOException ioe) { + replicaInfo.releaseAllBytesReserved(); if (datanode.isRestarting()) { // Do not throw if shutting down for restart. Otherwise, it will cause // premature termination of responder. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java index 6a2664011d7..cc55f85c2ae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java @@ -148,6 +148,12 @@ public class ReplicaInPipeline extends ReplicaInfo return bytesReserved; } + @Override + public void releaseAllBytesReserved() { // ReplicaInPipelineInterface + getVolume().releaseReservedSpace(bytesReserved); + bytesReserved = 0; + } + @Override // ReplicaInPipelineInterface public synchronized void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum) { this.bytesOnDisk = dataLength; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java index 7f08b81b416..0263d0f9f3f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java @@ -44,6 +44,11 @@ public interface ReplicaInPipelineInterface extends Replica { */ void setBytesAcked(long bytesAcked); + /** + * Release any disk space reserved for this replica. + */ + public void releaseAllBytesReserved(); + /** * store the checksum for the last chunk along with the data length * @param dataLength number of bytes on disk diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 160a86c3239..a358e2256fe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -289,6 +289,10 @@ public class SimulatedFSDataset implements FsDatasetSpi { } } + @Override + public void releaseAllBytesReserved() { + } + @Override synchronized public long getBytesOnDisk() { if (finalized) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java index c3c0197ce72..ad44500e502 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java @@ -40,6 +40,10 @@ public class ExternalReplicaInPipeline implements ReplicaInPipelineInterface { public void setBytesAcked(long bytesAcked) { } + @Override + public void releaseAllBytesReserved() { + } + @Override public void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum) { } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java index 487f3ab21ab..ebf2f3b9862 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; +import com.google.common.base.Supplier; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -45,6 +46,7 @@ import java.io.IOException; import java.io.OutputStream; import java.util.List; import java.util.Random; +import java.util.concurrent.TimeoutException; /** * Ensure that the DN reserves disk space equivalent to a full block for @@ -53,7 +55,6 @@ import java.util.Random; public class TestRbwSpaceReservation { static final Log LOG = LogFactory.getLog(TestRbwSpaceReservation.class); - private static final short REPL_FACTOR = 1; private static final int DU_REFRESH_INTERVAL_MSEC = 500; private static final int STORAGES_PER_DATANODE = 1; private static final int BLOCK_SIZE = 1024 * 1024; @@ -83,25 +84,38 @@ public class TestRbwSpaceReservation { ((Log4JLogger) DataNode.LOG).getLogger().setLevel(Level.ALL); } - private void startCluster(int blockSize, long perVolumeCapacity) throws IOException { + /** + * + * @param blockSize + * @param perVolumeCapacity limit the capacity of each volume to the given + * value. If negative, then don't limit. + * @throws IOException + */ + private void startCluster(int blockSize, int numDatanodes, long perVolumeCapacity) throws IOException { initConfig(blockSize); cluster = new MiniDFSCluster .Builder(conf) .storagesPerDatanode(STORAGES_PER_DATANODE) - .numDataNodes(REPL_FACTOR) + .numDataNodes(numDatanodes) .build(); fs = cluster.getFileSystem(); client = fs.getClient(); cluster.waitActive(); if (perVolumeCapacity >= 0) { + for (DataNode dn : cluster.getDataNodes()) { + for (FsVolumeSpi volume : dn.getFSDataset().getVolumes()) { + ((FsVolumeImpl) volume).setCapacityForTesting(perVolumeCapacity); + } + } + } + + if (numDatanodes == 1) { List volumes = cluster.getDataNodes().get(0).getFSDataset().getVolumes(); - assertThat(volumes.size(), is(1)); singletonVolume = ((FsVolumeImpl) volumes.get(0)); - singletonVolume.setCapacityForTesting(perVolumeCapacity); } } @@ -128,7 +142,7 @@ public class TestRbwSpaceReservation { throws IOException, InterruptedException { // Enough for 1 block + meta files + some delta. final long configuredCapacity = fileBlockSize * 2 - 1; - startCluster(BLOCK_SIZE, configuredCapacity); + startCluster(BLOCK_SIZE, 1, configuredCapacity); FSDataOutputStream out = null; Path path = new Path("/" + fileNamePrefix + ".dat"); @@ -195,7 +209,7 @@ public class TestRbwSpaceReservation { @Test (timeout=300000) public void testWithLimitedSpace() throws IOException { // Cluster with just enough space for a full block + meta. - startCluster(BLOCK_SIZE, 2 * BLOCK_SIZE - 1); + startCluster(BLOCK_SIZE, 1, 2 * BLOCK_SIZE - 1); final String methodName = GenericTestUtils.getMethodName(); Path file1 = new Path("/" + methodName + ".01.dat"); Path file2 = new Path("/" + methodName + ".02.dat"); @@ -208,7 +222,6 @@ public class TestRbwSpaceReservation { os2 = fs.create(file2); // Write one byte to the first file. - LOG.info("arpit: writing first file"); byte[] data = new byte[1]; os1.write(data); os1.hsync(); @@ -227,6 +240,42 @@ public class TestRbwSpaceReservation { } } + /** + * Ensure that reserved space is released when the client goes away + * unexpectedly. + * + * The verification is done for each replica in the write pipeline. + * + * @throws IOException + */ + @Test(timeout=300000) + public void testSpaceReleasedOnUnexpectedEof() + throws IOException, InterruptedException, TimeoutException { + final short replication = 3; + startCluster(BLOCK_SIZE, replication, -1); + + final String methodName = GenericTestUtils.getMethodName(); + final Path file = new Path("/" + methodName + ".01.dat"); + + // Write 1 byte to the file and kill the writer. + FSDataOutputStream os = fs.create(file, replication); + os.write(new byte[1]); + os.hsync(); + DFSTestUtil.abortStream((DFSOutputStream) os.getWrappedStream()); + + // Ensure all space reserved for the replica was released on each + // DataNode. + for (DataNode dn : cluster.getDataNodes()) { + final FsVolumeImpl volume = (FsVolumeImpl) dn.getFSDataset().getVolumes().get(0); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + return (volume.getReservedForRbw() == 0); + } + }, 500, Integer.MAX_VALUE); // Wait until the test times out. + } + } + /** * Stress test to ensure we are not leaking reserved space. * @throws IOException @@ -235,7 +284,7 @@ public class TestRbwSpaceReservation { @Test (timeout=600000) public void stressTest() throws IOException, InterruptedException { final int numWriters = 5; - startCluster(SMALL_BLOCK_SIZE, SMALL_BLOCK_SIZE * numWriters * 10); + startCluster(SMALL_BLOCK_SIZE, 1, SMALL_BLOCK_SIZE * numWriters * 10); Writer[] writers = new Writer[numWriters]; // Start a few writers and let them run for a while.