YARN-1670. Fixed a bug in log-aggregation that can cause the writer to write more log-data than the log-length that it records. Contributed by Mit Desai.
svn merge --ignore-ancestry -c 1580005 ../../trunk/

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1580006 13f79535-47bb-0310-9956-ffa450edef68
parent 2c3feeb727
commit d1f2d89a43
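The failure mode is worth spelling out. LogValue.write() emits each log as a record of [file name as UTF][file length as UTF][raw bytes], and the reader trusts the length field to find the start of the next record. If the application is still appending while aggregation runs, the writer can copy more bytes than the length it already recorded, so the reader comes up short of the next header and parses garbage. A minimal stand-in sketch of that misalignment, using plain DataOutputStream/DataInputStream rather than the real TFile-backed AggregatedLogFormat (class and variable names here are hypothetical):

    import java.io.*;

    // Hypothetical stand-in for the record layout, not the real
    // TFile-backed AggregatedLogFormat: [name UTF][length UTF][bytes].
    public class LengthMismatchDemo {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);

        byte[] measured = new byte[100];          // length the writer records
        out.writeUTF("stdout");
        out.writeUTF(String.valueOf(measured.length));
        out.write(measured);
        out.write(new byte[37]);                  // bytes appended mid-copy

        out.writeUTF("stderr");                   // next record's header
        out.writeUTF("0");

        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
        in.readUTF();                             // "stdout"
        int len = Integer.parseInt(in.readUTF()); // 100
        in.skipBytes(len);                        // lands 37 bytes short
        String name = in.readUTF();               // "" -- not "stderr"
        Integer.parseInt(in.readUTF());           // NumberFormatException
      }
    }

This is exactly the symptom the new test below checks for: a NumberFormatException out of the reader means the record boundaries no longer line up.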
@@ -521,6 +521,9 @@ Release 2.4.0 - UNRELEASED
     YARN-1811. Fixed AMFilters in YARN to correctly accept requests from either
     web-app proxy or the RMs when HA is enabled. (Robert Kanter via vinodkv)
 
+    YARN-1670. Fixed a bug in log-aggregation that can cause the writer to write
+    more log-data than the log-length that it records. (Mit Desai via vinodkv)
+
 Release 2.3.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -182,20 +182,29 @@ public class AggregatedLogFormat {
       Arrays.sort(logFiles);
       for (File logFile : logFiles) {
+
+        long fileLength = 0;
+
         // Write the logFile Type
         out.writeUTF(logFile.getName());
 
         // Write the log length as UTF so that it is printable
-        out.writeUTF(String.valueOf(logFile.length()));
+        out.writeUTF(String.valueOf(fileLength = logFile.length()));
 
         // Write the log itself
         FileInputStream in = null;
         try {
           in = SecureIOUtils.openForRead(logFile, getUser(), null);
           byte[] buf = new byte[65535];
+          long curRead = 0;
           int len = 0;
-          while ((len = in.read(buf)) != -1) {
+          while (((len = in.read(buf)) != -1) && (curRead < fileLength)) {
             out.write(buf, 0, len);
+            curRead += len;
           }
+          long newLength = logFile.length();
+          if (fileLength < newLength) {
+            LOG.warn("Aggregated Logs Truncated by "
+                + (newLength - fileLength) + " bytes.");
+          }
         } catch (IOException e) {
           String message = "Error aggregating log file. Log file : "
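The shape of the fix: snapshot logFile.length() once, advertise that snapshot as the record length, and stop copying once that many bytes have been read, even if the file keeps growing underneath. Note that the committed loop guards at buffer granularity, so it can still write one whole trailing buffer after curRead passes the snapshot; a tighter variant of the same idea, sketched here as a hypothetical standalone helper, also trims the final chunk:

    import java.io.*;

    final class BoundedCopy {
      // Copy at most 'limit' bytes from in to out; the copy never exceeds
      // the snapshot even when the source file grows during the copy.
      static long copyUpTo(InputStream in, OutputStream out, long limit)
          throws IOException {
        byte[] buf = new byte[65535];
        long copied = 0;
        int len;
        while (copied < limit && (len = in.read(buf)) != -1) {
          int toWrite = (int) Math.min(len, limit - copied);
          out.write(buf, 0, toWrite);
          copied += toWrite;
        }
        return copied;
      }
    }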
@@ -553,7 +562,7 @@ public class AggregatedLogFormat {
       out.println(fileLengthStr);
       out.println("Log Contents:");
 
-      int curRead = 0;
+      long curRead = 0;
       long pendingRead = fileLength - curRead;
       int toRead =
           pendingRead > buf.length ? buf.length : (int) pendingRead;
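The second hunk is a matching type fix on the read side: curRead accumulates the bytes already copied out of the aggregated file, and an int accumulator wraps negative once a single log passes Integer.MAX_VALUE (about 2 GiB), which would make pendingRead nonsensical. A small illustration of the wrap, plain Java arithmetic only (hypothetical demo class):

    public class CounterWrapDemo {
      public static void main(String[] args) {
        long fileLength = 3_000_000_000L;   // a log just under 3 GB
        int curRead = Integer.MAX_VALUE;    // ~2 GiB already copied
        curRead += 65535;                   // wraps to a negative value
        // "pending" now exceeds fileLength instead of shrinking toward 0
        System.out.println(fileLength - curRead);
      }
    }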
@@ -34,6 +34,7 @@ import java.io.UnsupportedEncodingException;
 import java.io.Writer;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
 
 import org.junit.Assert;
 
@@ -87,6 +88,96 @@ public class TestAggregatedLogFormat {
     fs.delete(workDirPath, true);
   }
 
+  //Test for Corrupted AggregatedLogs. The logs should not write more data
+  //if LogValue.write() is called and the application is still
+  //appending to logs
+
+  @Test
+  public void testForCorruptedAggregatedLogs() throws Exception {
+    Configuration conf = new Configuration();
+    File workDir = new File(testWorkDir, "testReadAcontainerLogs1");
+    Path remoteAppLogFile =
+        new Path(workDir.getAbsolutePath(), "aggregatedLogFile");
+    Path srcFileRoot = new Path(workDir.getAbsolutePath(), "srcFiles");
+    ContainerId testContainerId = TestContainerId.newContainerId(1, 1, 1, 1);
+    Path t =
+        new Path(srcFileRoot, testContainerId.getApplicationAttemptId()
+            .getApplicationId().toString());
+    Path srcFilePath = new Path(t, testContainerId.toString());
+
+    long numChars = 950000;
+
+    writeSrcFileAndALog(srcFilePath, "stdout", numChars, remoteAppLogFile,
+        srcFileRoot, testContainerId);
+
+    LogReader logReader = new LogReader(conf, remoteAppLogFile);
+    LogKey rLogKey = new LogKey();
+    DataInputStream dis = logReader.next(rLogKey);
+    Writer writer = new StringWriter();
+    try {
+      LogReader.readAcontainerLogs(dis, writer);
+    } catch (Exception e) {
+      if (e.toString().contains("NumberFormatException")) {
+        Assert.fail("Aggregated logs are corrupted.");
+      }
+    }
+  }
+
+  private void writeSrcFileAndALog(Path srcFilePath, String fileName, final long length,
+      Path remoteAppLogFile, Path srcFileRoot, ContainerId testContainerId)
+      throws Exception {
+    File dir = new File(srcFilePath.toString());
+    if (!dir.exists()) {
+      if (!dir.mkdirs()) {
+        throw new IOException("Unable to create directory : " + dir);
+      }
+    }
+
+    File outputFile = new File(new File(srcFilePath.toString()), fileName);
+    FileOutputStream os = new FileOutputStream(outputFile);
+    final OutputStreamWriter osw = new OutputStreamWriter(os, "UTF8");
+    final int ch = filler;
+
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    LogWriter logWriter = new LogWriter(conf, remoteAppLogFile, ugi);
+
+    LogKey logKey = new LogKey(testContainerId);
+    LogValue logValue =
+        spy(new LogValue(Collections.singletonList(srcFileRoot.toString()),
+            testContainerId, ugi.getShortUserName()));
+
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    Thread t = new Thread() {
+      public void run() {
+        try {
+          for (int i = 0; i < length / 3; i++) {
+            osw.write(ch);
+          }
+
+          latch.countDown();
+
+          for (int i = 0; i < (2 * length) / 3; i++) {
+            osw.write(ch);
+          }
+          osw.close();
+        } catch (IOException e) {
+          // TODO Auto-generated catch block
+          e.printStackTrace();
+        }
+      }
+    };
+    t.start();
+
+    //Wait till the osw is partially written
+    //aggregation starts once the osw has completed 1/3rd of its work
+    latch.await();
+
+    //Aggregate The Logs
+    logWriter.append(logKey, logValue);
+    logWriter.close();
+  }
+
   //Verify the output generated by readAContainerLogs(DataInputStream, Writer)
   @Test
   public void testReadAcontainerLogs1() throws Exception {
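The test's trick for reproducing the race deterministically is the CountDownLatch handshake: a writer thread emits the first third of the log, trips the latch, and keeps appending while aggregation runs on the main thread. The same skeleton, reduced to its essentials with hypothetical stand-in methods:

    import java.util.concurrent.CountDownLatch;

    public class RaceHarness {
      public static void main(String[] args) throws InterruptedException {
        final CountDownLatch oneThirdWritten = new CountDownLatch(1);

        Thread appender = new Thread(() -> {
          appendBytes();               // first 1/3 of the log
          oneThirdWritten.countDown(); // unblock the aggregator
          appendBytes();               // keep growing the file
        });
        appender.start();

        oneThirdWritten.await();       // start aggregating mid-append
        aggregate();                   // must not write more than it records
        appender.join();
      }

      private static void appendBytes() { /* stand-in for the osw.write(ch) loop */ }
      private static void aggregate() { /* stand-in for logWriter.append(...) */ }
    }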