diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index ca43fe6ad96..4d0beaa3b92 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -579,13 +579,17 @@ public static class LogReader { public LogReader(Configuration conf, Path remoteAppLogFile) throws IOException { - FileContext fileContext = - FileContext.getFileContext(remoteAppLogFile.toUri(), conf); - this.fsDataIStream = fileContext.open(remoteAppLogFile); - reader = - new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus( - remoteAppLogFile).getLen(), conf); - this.scanner = reader.createScanner(); + try { + FileContext fileContext = + FileContext.getFileContext(remoteAppLogFile.toUri(), conf); + this.fsDataIStream = fileContext.open(remoteAppLogFile); + reader = new TFile.Reader(this.fsDataIStream, + fileContext.getFileStatus(remoteAppLogFile).getLen(), conf); + this.scanner = reader.createScanner(); + } catch (IOException ioe) { + close(); + throw new IOException("Error in creating LogReader", ioe); + } } private boolean atBeginning = true; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java index 9ae2983ca0f..bf20fb74292 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java @@ -33,6 +33,10 @@ import java.io.StringWriter; import java.io.UnsupportedEncodingException; import java.io.Writer; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; import java.util.Arrays; import java.util.Collections; import java.util.concurrent.CountDownLatch; @@ -121,6 +125,20 @@ public void testForCorruptedAggregatedLogs() throws Exception { Assert.fail("Aggregated logs are corrupted."); } } + + //Append some corrupted text to the end of the aggregated file. + URI logUri = URI.create("file:///" + remoteAppLogFile.toUri().toString()); + Files.write(Paths.get(logUri), + "corrupt_text".getBytes(), StandardOpenOption.APPEND); + try { + // Trying to read a corrupted log file created above should cause + // log reading to fail below with an IOException. + logReader = new LogReader(conf, remoteAppLogFile); + Assert.fail("Expect IOException from reading corrupt aggregated logs."); + } catch (IOException ioe) { + DataInputStream dIS = logReader.next(rLogKey); + Assert.assertNull("Input stream not available for reading", dIS); + } } private void writeSrcFileAndALog(Path srcFilePath, String fileName, final long length,