YARN-2724. Skipped uploading a local log file to HDFS if exception is raised when opening it. Contributed by Xuan Gong.

(cherry picked from commit e31f0a6558)
This commit is contained in:
Zhijie Shen 2014-10-24 11:13:44 -07:00
parent 1f6258f535
commit 13c60bac57
3 changed files with 43 additions and 10 deletions

View File

@ -703,6 +703,9 @@ Release 2.6.0 - UNRELEASED
YARN-2732. Fixed syntax error in SecureContainer.apt.vm. (Jian He via zjshen)
YARN-2724. Skipped uploading a local log file to HDFS if exception is raised
when opening it. (Xuan Gong via zjshen)
Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES

View File

@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
@ -211,6 +212,16 @@ public class AggregatedLogFormat {
Collections.sort(fileList);
for (File logFile : fileList) {
FileInputStream in = null;
try {
in = secureOpenFile(logFile);
} catch (IOException e) {
logErrorMessage(logFile, e);
IOUtils.cleanup(LOG, in);
continue;
}
final long fileLength = logFile.length();
// Write the logFile Type
out.writeUTF(logFile.getName());
@ -219,9 +230,7 @@ public class AggregatedLogFormat {
out.writeUTF(String.valueOf(fileLength));
// Write the log itself
FileInputStream in = null;
try {
in = SecureIOUtils.openForRead(logFile, getUser(), null);
byte[] buf = new byte[65535];
int len = 0;
long bytesLeft = fileLength;
@ -244,18 +253,26 @@ public class AggregatedLogFormat {
}
this.uploadedFiles.add(logFile);
} catch (IOException e) {
String message = "Error aggregating log file. Log file : "
+ logFile.getAbsolutePath() + e.getMessage();
LOG.error(message, e);
String message = logErrorMessage(logFile, e);
out.write(message.getBytes());
} finally {
if (in != null) {
in.close();
}
IOUtils.cleanup(LOG, in);
}
}
}
@VisibleForTesting
public FileInputStream secureOpenFile(File logFile) throws IOException {
return SecureIOUtils.openForRead(logFile, getUser(), null);
}
private static String logErrorMessage(File logFile, Exception e) {
String message = "Error aggregating log file. Log file : "
+ logFile.getAbsolutePath() + ". " + e.getMessage();
LOG.error(message, e);
return message;
}
// Added for testing purpose.
public String getUser() {
return user;

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.logaggregation;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.doThrow;
import java.io.BufferedReader;
import java.io.DataInputStream;
@ -37,7 +38,6 @@ import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -194,6 +194,8 @@ public class TestAggregatedLogFormat {
int numChars = 80000;
// create file stderr and stdout in containerLogDir
writeSrcFile(srcFilePath, "stderr", numChars);
writeSrcFile(srcFilePath, "stdout", numChars);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
@ -204,7 +206,14 @@ public class TestAggregatedLogFormat {
new LogValue(Collections.singletonList(srcFileRoot.toString()),
testContainerId, ugi.getShortUserName());
logWriter.append(logKey, logValue);
// When we try to open FileInputStream for stderr, it will throw out an IOException.
// Skip the log aggregation for stderr.
LogValue spyLogValue = spy(logValue);
File errorFile = new File((new Path(srcFilePath, "stderr")).toString());
doThrow(new IOException("Mock can not open FileInputStream")).when(
spyLogValue).secureOpenFile(errorFile);
logWriter.append(logKey, spyLogValue);
logWriter.close();
// make sure permission are correct on the file
@ -218,11 +227,15 @@ public class TestAggregatedLogFormat {
Writer writer = new StringWriter();
LogReader.readAcontainerLogs(dis, writer);
// We should only do the log aggregation for stdout.
// Since we could not open the fileInputStream for stderr, this file is not
// aggregated.
String s = writer.toString();
int expectedLength =
"\n\nLogType:stdout".length() + ("\nLogLength:" + numChars).length()
+ "\nLog Contents:\n".length() + numChars;
Assert.assertTrue("LogType not matched", s.contains("LogType:stdout"));
Assert.assertTrue("log file:stderr should not be aggregated.", !s.contains("LogType:stderr"));
Assert.assertTrue("LogLength not matched", s.contains("LogLength:" + numChars));
Assert.assertTrue("Log Contents not matched", s.contains("Log Contents"));