HDFS-3384. DataStreamer thread should be closed immediatly when failed to setup a PipelineForAppendOrRecovery (Contributed by Uma Maheswara Rao G)

This commit is contained in:
Vinayakumar B 2015-05-08 17:18:14 +05:30
parent 1ffb7fa42e
commit c648317a68
3 changed files with 29 additions and 0 deletions

View File

@ -690,6 +690,9 @@ Release 2.8.0 - UNRELEASED
HDFS-6576. Datanode log is generating at root directory in security mode HDFS-6576. Datanode log is generating at root directory in security mode
(surendra singh lilhore via vinayakumarb) (surendra singh lilhore via vinayakumarb)
HDFS-3384. DataStreamer thread should be closed immediatly when failed to
setup a PipelineForAppendOrRecovery (Uma Maheswara Rao G via vinayakumarb)
Release 2.7.1 - UNRELEASED Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -460,6 +460,9 @@ class DataStreamer extends Daemon {
LOG.debug("Append to block " + block); LOG.debug("Append to block " + block);
} }
setupPipelineForAppendOrRecovery(); setupPipelineForAppendOrRecovery();
if (true == streamerClosed) {
continue;
}
initDataStreaming(); initDataStreaming();
} }
@ -571,6 +574,7 @@ class DataStreamer extends Daemon {
} }
} }
lastException.set(e); lastException.set(e);
assert !(e instanceof NullPointerException);
hasError = true; hasError = true;
if (errorIndex == -1 && restartingNodeIndex.get() == -1) { if (errorIndex == -1 && restartingNodeIndex.get() == -1) {
// Not a datanode issue // Not a datanode issue

View File

@ -602,4 +602,26 @@ public class TestFileAppend{
cluster.shutdown(); cluster.shutdown();
} }
} }
@Test(timeout = 10000)
public void testAppendCorruptedBlock() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
conf.setInt("dfs.min.replication", 1);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.build();
try {
DistributedFileSystem fs = cluster.getFileSystem();
Path fileName = new Path("/appendCorruptBlock");
DFSTestUtil.createFile(fs, fileName, 512, (short) 1, 0);
DFSTestUtil.waitReplication(fs, fileName, (short) 1);
Assert.assertTrue("File not created", fs.exists(fileName));
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, fileName);
cluster.corruptBlockOnDataNodes(block);
DFSTestUtil.appendFile(fs, fileName, "appendCorruptBlock");
} finally {
cluster.shutdown();
}
}
} }