diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 58357519162..da64e6db174 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -278,6 +278,9 @@ Release 2.7.4 - UNRELEASED
     HDFS-11795. Fix ASF License warnings in branch-2.7. (Yiqun Lin via
     aajisaka)
 
+    HDFS-11674. reserveSpaceForReplicas is not released if append request failed
+    due to mirror down and replica recovered (vinayakumarb)
+
 Release 2.7.3 - 2016-08-25
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index a44de61c034..1272e2928aa 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -426,6 +426,21 @@ public class DFSOutputStream extends FSOutputSummer
       }
     }
 
+    /**
+     * Set pipeline in construction
+     *
+     * @param lastBlock the last block of a file
+     * @throws IOException
+     */
+    void setPipelineInConstruction(LocatedBlock lastBlock) throws IOException {
+      // setup pipeline to append to the last block XXX retries??
+      setPipeline(lastBlock);
+      if (nodes.length < 1) {
+        throw new IOException("Unable to retrieve blocks locations " +
+            " for last block " + block + " of file " + src);
+      }
+    }
+
     private void setPipeline(LocatedBlock lb) {
       setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
     }
@@ -2497,6 +2512,13 @@ public class DFSOutputStream extends FSOutputSummer
     return fileId;
   }
 
+  /**
+   * Returns the data streamer object.
+   */
+  protected DataStreamer getStreamer() {
+    return streamer;
+  }
+
   private static <T> void arraycopy(T[] srcs, T[] dsts, int skipIndex) {
     System.arraycopy(srcs, 0, dsts, 0, skipIndex);
     System.arraycopy(srcs, skipIndex+1, dsts, skipIndex, dsts.length-skipIndex);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index ec86337b1f1..c13b6f51286 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -2359,6 +2359,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       LOG.info("initReplicaRecovery: changing replica state for "
           + block + " from " + replica.getState()
           + " to " + rur.getState());
+      if (replica.getState() == ReplicaState.TEMPORARY || replica
+          .getState() == ReplicaState.RBW) {
+        ((ReplicaInPipeline) replica).releaseAllBytesReserved();
+      }
     }
     return rur.createInfo();
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index d0dba9f6b20..974903b45a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1355,6 +1355,11 @@ public class DFSTestUtil {
     out.abort();
   }
 
+  public static void setPipeline(DFSOutputStream out, LocatedBlock lastBlock)
+      throws IOException {
+    out.getStreamer().setPipelineInConstruction(lastBlock);
+  }
+
   public static byte[] asArray(ByteBuffer buf) {
     byte arr[] = new byte[buf.remaining()];
     buf.duplicate().get(arr);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java
index 026cbd8adc2..24c2e0643dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java
@@ -30,10 +30,13 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.HdfsBlockLocation;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.*;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -43,6 +46,7 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Daemon;
 import org.apache.log4j.Level;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -76,9 +80,12 @@ public class TestRbwSpaceReservation {
   private DataNodeFaultInjector old = null;
   private static Random rand = new Random();
 
-  private void initConfig(int blockSize) {
+  @Before
+  public void before() {
     conf = new HdfsConfiguration();
+  }
 
+  private void initConfig(int blockSize) {
     // Refresh disk usage information frequently.
     conf.setInt(FS_DU_INTERVAL_KEY, DU_REFRESH_INTERVAL_MSEC);
     conf.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
@@ -523,4 +530,62 @@ public class TestRbwSpaceReservation {
       }
     }
   }
+
+  @Test(timeout = 60000)
+  public void testReservedSpaceForLeaseRecovery() throws Exception {
+    final short replication = 3;
+    conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 2);
+    conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY,
+        1000);
+    startCluster(BLOCK_SIZE, replication, -1);
+
+    final String methodName = GenericTestUtils.getMethodName();
+    final Path file = new Path("/" + methodName + ".01.dat");
+    // Write to the file and kill the writer.
+    FSDataOutputStream os = fs.create(file, replication);
+    os.write(new byte[8192]);
+    os.hflush();
+    os.close();
+    /*
+     * Reset the pipeline for the append in such a way that, datanode which is
+     * down is one of the mirror, not the first datanode.
+     */
+    HdfsBlockLocation blockLocation = (HdfsBlockLocation) fs.getClient()
+        .getBlockLocations(file.toString(), 0, BLOCK_SIZE)[0];
+    LocatedBlock lastBlock = blockLocation.getLocatedBlock();
+    // stop 3rd node.
+    cluster.stopDataNode(lastBlock.getLocations()[2].getName());
+    try {
+      os = fs.append(file);
+      DFSTestUtil.setPipeline((DFSOutputStream) os.getWrappedStream(),
+          lastBlock);
+      os.writeBytes("hi");
+      os.hsync();
+    } catch (IOException e) {
+      // Append will fail due to not able to replace datanodes in 3 nodes
+      // cluster.
+      LOG.info("", e);
+    }
+    DFSTestUtil.abortStream((DFSOutputStream) os.getWrappedStream());
+    /*
+     * There is a chance that stopped DN could be chosen as primary for
+     * recovery. If so, then recovery will not happen in time. So mark stopped
+     * node as dead to exclude that node.
+     */
+    cluster.setDataNodeDead(lastBlock.getLocations()[2]);
+    fs.recoverLease(file);
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          return fs.isFileClosed(file);
+        } catch (IOException e) {
+          return false;
+        }
+      }
+    }, 500, 30000);
+    checkReservedSpace(0);
+  }
 }