diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index c9e5b87c759..4decfd8d889 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -690,6 +690,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-6576. Datanode log is generating at root directory in security mode
     (surendra singh lilhore via vinayakumarb)
 
+    HDFS-3384. DataStreamer thread should be closed immediately when it fails
+    to setup a PipelineForAppendOrRecovery (Uma Maheswara Rao G via vinayakumarb)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 96bf21263ec..697ee1182ec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -460,6 +460,10 @@ class DataStreamer extends Daemon {
             LOG.debug("Append to block " + block);
           }
           setupPipelineForAppendOrRecovery();
+          // If pipeline setup failed and closed this streamer, give up
+          if (streamerClosed) {
+            continue;
+          }
           initDataStreaming();
         }
 
@@ -571,6 +575,8 @@ class DataStreamer extends Daemon {
           }
         }
         lastException.set(e);
+        // An NPE here signals a streamer bug, not a datanode problem
+        assert !(e instanceof NullPointerException);
         hasError = true;
         if (errorIndex == -1 && restartingNodeIndex.get() == -1) {
           // Not a datanode issue
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
index 6a7c3eaa587..402c944fee6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
@@ -602,4 +602,28 @@ public class TestFileAppend{
       cluster.shutdown();
     }
   }
+
+  @Test(timeout = 10000)
+  public void testAppendCorruptedBlock() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    conf.setInt("dfs.min.replication", 1);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
+        .build();
+    try {
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path fileName = new Path("/appendCorruptBlock");
+      DFSTestUtil.createFile(fs, fileName, 512, (short) 1, 0);
+      DFSTestUtil.waitReplication(fs, fileName, (short) 1);
+      Assert.assertTrue("File not created", fs.exists(fileName));
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, fileName);
+      cluster.corruptBlockOnDataNodes(block);
+      // Append after corrupting the only replica; before HDFS-3384 the
+      // DataStreamer could hang here, which the test timeout catches
+      DFSTestUtil.appendFile(fs, fileName, "appendCorruptBlock");
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
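
For context, a minimal client-side sketch of the scenario this patch addresses, using the public FileSystem append API. The path, configuration, and error handling below are illustrative assumptions, not part of the patch; with the fix, a failed pipeline setup closes the DataStreamer so the append fails promptly with an IOException instead of leaving the client blocked.

// Illustrative sketch only; assumes a reachable HDFS and an existing file.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AppendSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path file = new Path("/appendCorruptBlock");  // hypothetical path
    try (FSDataOutputStream out = fs.append(file)) {
      out.writeBytes("more data");
    } catch (IOException e) {
      // With HDFS-3384, a failed pipeline setup surfaces here quickly,
      // rather than leaving the DataStreamer thread blocked.
      System.err.println("Append failed: " + e);
    }
  }
}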