diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index dcfe3500b80..221bc6adb63 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -2611,6 +2611,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       LOG.info("initReplicaRecovery: changing replica state for " + block
           + " from " + replica.getState()
           + " to " + rur.getState());
+      if (replica.getState() == ReplicaState.TEMPORARY || replica
+          .getState() == ReplicaState.RBW) {
+        ((ReplicaInPipeline) replica).releaseAllBytesReserved();
+      }
     }
     return rur.createInfo();
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 5cb2195e941..087d8f4ab07 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1441,6 +1441,11 @@ public class DFSTestUtil {
     out.abort();
   }
 
+  public static void setPipeline(DFSOutputStream out, LocatedBlock lastBlock)
+      throws IOException {
+    out.getStreamer().setPipelineInConstruction(lastBlock);
+  }
+
   public static byte[] asArray(ByteBuffer buf) {
     byte arr[] = new byte[buf.remaining()];
     buf.duplicate().get(arr);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestSpaceReservation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestSpaceReservation.java
index fad52164dae..2daca863205 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestSpaceReservation.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestSpaceReservation.java
@@ -31,10 +31,13 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.HdfsBlockLocation;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.*;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -44,6 +47,7 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Daemon;
 import org.apache.log4j.Level;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -82,9 +86,12 @@ public class TestSpaceReservation {
 
   private static Random rand = new Random();
 
-  private void initConfig(int blockSize) {
+  @Before
+  public void before() {
     conf = new HdfsConfiguration();
+  }
 
+  private void initConfig(int blockSize) {
     // Refresh disk usage information frequently.
     conf.setInt(FS_DU_INTERVAL_KEY, DU_REFRESH_INTERVAL_MSEC);
     conf.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
@@ -680,4 +687,62 @@ public class TestSpaceReservation {
       }
     }
   }
+
+  @Test(timeout = 60000)
+  public void testReservedSpaceForLeaseRecovery() throws Exception {
+    final short replication = 3;
+    conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 2);
+    conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY,
+        1000);
+    startCluster(BLOCK_SIZE, replication, -1);
+
+    final String methodName = GenericTestUtils.getMethodName();
+    final Path file = new Path("/" + methodName + ".01.dat");
+    // Write to the file and kill the writer.
+    FSDataOutputStream os = fs.create(file, replication);
+    os.write(new byte[8192]);
+    os.hflush();
+    os.close();
+    /*
+     * Reset the pipeline for the append in such a way that the datanode
+     * which is down is one of the mirrors, not the first datanode.
+     */
+    HdfsBlockLocation blockLocation = (HdfsBlockLocation) fs.getClient()
+        .getBlockLocations(file.toString(), 0, BLOCK_SIZE)[0];
+    LocatedBlock lastBlock = blockLocation.getLocatedBlock();
+    // Stop the 3rd node.
+    cluster.stopDataNode(lastBlock.getLocations()[2].getName());
+    try {
+      os = fs.append(file);
+      DFSTestUtil.setPipeline((DFSOutputStream) os.getWrappedStream(),
+          lastBlock);
+      os.writeBytes("hi");
+      os.hsync();
+    } catch (IOException e) {
+      // Append will fail since datanodes cannot be replaced in a
+      // 3-node cluster.
+      LOG.info("", e);
+    }
+    DFSTestUtil.abortStream((DFSOutputStream) os.getWrappedStream());
+    /*
+     * There is a chance that the stopped DN could be chosen as the primary
+     * for recovery. If so, recovery will not happen in time, so mark the
+     * stopped node as dead to exclude it.
+     */
+    cluster.setDataNodeDead(lastBlock.getLocations()[2]);
+    fs.recoverLease(file);
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          return fs.isFileClosed(file);
+        } catch (IOException e) {
+          return false;
+        }
+      }
+    }, 500, 30000);
+    checkReservedSpace(0);
+  }
 }