From d293e120eb52a2f2f471ae0e81fc066e0c117bdd Mon Sep 17 00:00:00 2001 From: Szilard Nemeth Date: Tue, 7 Apr 2020 17:03:17 +0200 Subject: [PATCH] YARN-10207. CLOSE_WAIT socket connection leaks during rendering of (corrupted) aggregated logs on the JobHistoryServer Web UI. Contributed by Siddharth Ahuja (cherry picked from commit bffb43b00e14a23d96f08b5a5df01e7f760b11ed) --- .../logaggregation/AggregatedLogFormat.java | 18 +++++++++++------- .../TestAggregatedLogFormat.java | 18 ++++++++++++++++++ 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index ca43fe6ad96..4d0beaa3b92 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -579,13 +579,17 @@ public static class LogReader { public LogReader(Configuration conf, Path remoteAppLogFile) throws IOException { - FileContext fileContext = - FileContext.getFileContext(remoteAppLogFile.toUri(), conf); - this.fsDataIStream = fileContext.open(remoteAppLogFile); - reader = - new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus( - remoteAppLogFile).getLen(), conf); - this.scanner = reader.createScanner(); + try { + FileContext fileContext = + FileContext.getFileContext(remoteAppLogFile.toUri(), conf); + this.fsDataIStream = fileContext.open(remoteAppLogFile); + reader = new TFile.Reader(this.fsDataIStream, + fileContext.getFileStatus(remoteAppLogFile).getLen(), conf); + this.scanner = reader.createScanner(); + } catch (IOException ioe) { + close(); + throw new IOException("Error in creating LogReader", ioe); + } } private boolean atBeginning = true; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java index 9ae2983ca0f..bf20fb74292 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java @@ -33,6 +33,10 @@ import java.io.StringWriter; import java.io.UnsupportedEncodingException; import java.io.Writer; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; import java.util.Arrays; import java.util.Collections; import java.util.concurrent.CountDownLatch; @@ -121,6 +125,20 @@ public void testForCorruptedAggregatedLogs() throws Exception { Assert.fail("Aggregated logs are corrupted."); } } + + //Append some corrupted text to the end of the aggregated file. + URI logUri = URI.create("file:///" + remoteAppLogFile.toUri().toString()); + Files.write(Paths.get(logUri), + "corrupt_text".getBytes(), StandardOpenOption.APPEND); + try { + // Trying to read a corrupted log file created above should cause + // log reading to fail below with an IOException. + logReader = new LogReader(conf, remoteAppLogFile); + Assert.fail("Expect IOException from reading corrupt aggregated logs."); + } catch (IOException ioe) { + DataInputStream dIS = logReader.next(rLogKey); + Assert.assertNull("Input stream not available for reading", dIS); + } } private void writeSrcFileAndALog(Path srcFilePath, String fileName, final long length,