YARN-10207. CLOSE_WAIT socket connection leaks during rendering of (corrupted) aggregated logs on the JobHistoryServer Web UI. Contributed by Siddharth Ahuja

This commit is contained in:
Szilard Nemeth 2020-04-09 12:22:12 +02:00
parent 2a2c3a4094
commit 9a66477e4d
2 changed files with 29 additions and 7 deletions

View File

@ -579,13 +579,17 @@ public class AggregatedLogFormat {
public LogReader(Configuration conf, Path remoteAppLogFile)
throws IOException {
FileContext fileContext =
FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
this.fsDataIStream = fileContext.open(remoteAppLogFile);
reader =
new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
remoteAppLogFile).getLen(), conf);
this.scanner = reader.createScanner();
try {
FileContext fileContext =
FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
this.fsDataIStream = fileContext.open(remoteAppLogFile);
reader = new TFile.Reader(this.fsDataIStream,
fileContext.getFileStatus(remoteAppLogFile).getLen(), conf);
this.scanner = reader.createScanner();
} catch (IOException ioe) {
close();
throw new IOException("Error in creating LogReader", ioe);
}
}
private boolean atBeginning = true;

View File

@ -33,6 +33,10 @@ import java.io.OutputStreamWriter;
import java.io.StringWriter;
import java.io.UnsupportedEncodingException;
import java.io.Writer;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
@ -122,6 +126,20 @@ public class TestAggregatedLogFormat {
Assert.fail("Aggregated logs are corrupted.");
}
}
//Append some corrupted text to the end of the aggregated file.
URI logUri = URI.create("file:///" + remoteAppLogFile.toUri().toString());
Files.write(Paths.get(logUri),
"corrupt_text".getBytes(), StandardOpenOption.APPEND);
try {
// Trying to read a corrupted log file created above should cause
// log reading to fail below with an IOException.
logReader = new LogReader(conf, remoteAppLogFile);
Assert.fail("Expect IOException from reading corrupt aggregated logs.");
} catch (IOException ioe) {
DataInputStream dIS = logReader.next(rLogKey);
Assert.assertNull("Input stream not available for reading", dIS);
}
}
private void writeSrcFileAndALog(Path srcFilePath, String fileName, final long length,