From 9a66477e4d88748bda911449d3b882438afc1ff5 Mon Sep 17 00:00:00 2001 From: Szilard Nemeth Date: Thu, 9 Apr 2020 12:22:12 +0200 Subject: [PATCH] YARN-10207. CLOSE_WAIT socket connection leaks during rendering of (corrupted) aggregated logs on the JobHistoryServer Web UI. Contributed by Siddharth Ahuja --- .../logaggregation/AggregatedLogFormat.java | 18 +++++++++++------- .../TestAggregatedLogFormat.java | 18 ++++++++++++++++++ 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index ca43fe6ad96..4d0beaa3b92 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -579,13 +579,17 @@ public class AggregatedLogFormat { public LogReader(Configuration conf, Path remoteAppLogFile) throws IOException { - FileContext fileContext = - FileContext.getFileContext(remoteAppLogFile.toUri(), conf); - this.fsDataIStream = fileContext.open(remoteAppLogFile); - reader = - new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus( - remoteAppLogFile).getLen(), conf); - this.scanner = reader.createScanner(); + try { + FileContext fileContext = + FileContext.getFileContext(remoteAppLogFile.toUri(), conf); + this.fsDataIStream = fileContext.open(remoteAppLogFile); + reader = new TFile.Reader(this.fsDataIStream, + fileContext.getFileStatus(remoteAppLogFile).getLen(), conf); + this.scanner = reader.createScanner(); + } catch (IOException ioe) { + close(); + throw new IOException("Error in creating LogReader", ioe); + } } private boolean atBeginning = true; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java index f85445ed68e..33265ddaf45 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java @@ -33,6 +33,10 @@ import java.io.OutputStreamWriter; import java.io.StringWriter; import java.io.UnsupportedEncodingException; import java.io.Writer; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; import java.util.Arrays; import java.util.Collections; import java.util.concurrent.CountDownLatch; @@ -122,6 +126,20 @@ public class TestAggregatedLogFormat { Assert.fail("Aggregated logs are corrupted."); } } + + //Append some corrupted text to the end of the aggregated file. + URI logUri = URI.create("file:///" + remoteAppLogFile.toUri().toString()); + Files.write(Paths.get(logUri), + "corrupt_text".getBytes(), StandardOpenOption.APPEND); + try { + // Trying to read a corrupted log file created above should cause + // log reading to fail below with an IOException. + logReader = new LogReader(conf, remoteAppLogFile); + Assert.fail("Expect IOException from reading corrupt aggregated logs."); + } catch (IOException ioe) { + DataInputStream dIS = logReader.next(rLogKey); + Assert.assertNull("Input stream not available for reading", dIS); + } } private void writeSrcFileAndALog(Path srcFilePath, String fileName, final long length,