YARN-10207. CLOSE_WAIT socket connection leaks during rendering of (corrupted) aggregated logs on the JobHistoryServer Web UI. Contributed by Siddharth Ahuja

(cherry picked from commit bffb43b00e)
This commit is contained in:
Szilard Nemeth 2020-04-07 17:03:17 +02:00
parent 7abc6221a3
commit d293e120eb
2 changed files with 29 additions and 7 deletions

View File

@ -579,13 +579,17 @@ public static class LogReader {
public LogReader(Configuration conf, Path remoteAppLogFile)
throws IOException {
FileContext fileContext =
FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
this.fsDataIStream = fileContext.open(remoteAppLogFile);
reader =
new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
remoteAppLogFile).getLen(), conf);
this.scanner = reader.createScanner();
try {
FileContext fileContext =
FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
this.fsDataIStream = fileContext.open(remoteAppLogFile);
reader = new TFile.Reader(this.fsDataIStream,
fileContext.getFileStatus(remoteAppLogFile).getLen(), conf);
this.scanner = reader.createScanner();
} catch (IOException ioe) {
close();
throw new IOException("Error in creating LogReader", ioe);
}
}
private boolean atBeginning = true;

View File

@ -33,6 +33,10 @@
import java.io.StringWriter;
import java.io.UnsupportedEncodingException;
import java.io.Writer;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
@ -121,6 +125,20 @@ public void testForCorruptedAggregatedLogs() throws Exception {
Assert.fail("Aggregated logs are corrupted.");
}
}
//Append some corrupted text to the end of the aggregated file.
URI logUri = URI.create("file:///" + remoteAppLogFile.toUri().toString());
Files.write(Paths.get(logUri),
"corrupt_text".getBytes(), StandardOpenOption.APPEND);
try {
// Trying to read a corrupted log file created above should cause
// log reading to fail below with an IOException.
logReader = new LogReader(conf, remoteAppLogFile);
Assert.fail("Expect IOException from reading corrupt aggregated logs.");
} catch (IOException ioe) {
DataInputStream dIS = logReader.next(rLogKey);
Assert.assertNull("Input stream not available for reading", dIS);
}
}
private void writeSrcFileAndALog(Path srcFilePath, String fileName, final long length,