YARN-6288. Exceptions during aggregated log writes are mishandled. Contributed by Akira Ajisaka
(cherry picked from commit 1b081ca27e
)
This commit is contained in:
parent
832c2f52f8
commit
84b7f2e956
|
@ -1345,18 +1345,18 @@ public class TestLogsCLI {
|
||||||
Path path =
|
Path path =
|
||||||
new Path(appDir, LogAggregationUtils.getNodeString(nodeId)
|
new Path(appDir, LogAggregationUtils.getNodeString(nodeId)
|
||||||
+ System.currentTimeMillis());
|
+ System.currentTimeMillis());
|
||||||
AggregatedLogFormat.LogWriter writer =
|
try (AggregatedLogFormat.LogWriter writer =
|
||||||
new AggregatedLogFormat.LogWriter(configuration, path, ugi);
|
new AggregatedLogFormat.LogWriter()) {
|
||||||
|
writer.initialize(configuration, path, ugi);
|
||||||
writer.writeApplicationOwner(ugi.getUserName());
|
writer.writeApplicationOwner(ugi.getUserName());
|
||||||
|
|
||||||
Map<ApplicationAccessType, String> appAcls =
|
Map<ApplicationAccessType, String> appAcls = new HashMap<>();
|
||||||
new HashMap<ApplicationAccessType, String>();
|
|
||||||
appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
|
appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
|
||||||
writer.writeApplicationACLs(appAcls);
|
writer.writeApplicationACLs(appAcls);
|
||||||
writer.append(new AggregatedLogFormat.LogKey(containerId),
|
writer.append(new AggregatedLogFormat.LogKey(containerId),
|
||||||
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
|
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
|
||||||
UserGroupInformation.getCurrentUser().getShortUserName()));
|
UserGroupInformation.getCurrentUser().getShortUserName()));
|
||||||
writer.close();
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void uploadEmptyContainerLogIntoRemoteDir(UserGroupInformation ugi,
|
private static void uploadEmptyContainerLogIntoRemoteDir(UserGroupInformation ugi,
|
||||||
|
@ -1365,12 +1365,12 @@ public class TestLogsCLI {
|
||||||
Path path =
|
Path path =
|
||||||
new Path(appDir, LogAggregationUtils.getNodeString(nodeId)
|
new Path(appDir, LogAggregationUtils.getNodeString(nodeId)
|
||||||
+ System.currentTimeMillis());
|
+ System.currentTimeMillis());
|
||||||
AggregatedLogFormat.LogWriter writer =
|
try (AggregatedLogFormat.LogWriter writer =
|
||||||
new AggregatedLogFormat.LogWriter(configuration, path, ugi);
|
new AggregatedLogFormat.LogWriter()) {
|
||||||
|
writer.initialize(configuration, path, ugi);
|
||||||
writer.writeApplicationOwner(ugi.getUserName());
|
writer.writeApplicationOwner(ugi.getUserName());
|
||||||
|
|
||||||
Map<ApplicationAccessType, String> appAcls =
|
Map<ApplicationAccessType, String> appAcls = new HashMap<>();
|
||||||
new HashMap<ApplicationAccessType, String>();
|
|
||||||
appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
|
appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
|
||||||
writer.writeApplicationACLs(appAcls);
|
writer.writeApplicationACLs(appAcls);
|
||||||
DataOutputStream out = writer.getWriter().prepareAppendKey(-1);
|
DataOutputStream out = writer.getWriter().prepareAppendKey(-1);
|
||||||
|
@ -1379,9 +1379,9 @@ public class TestLogsCLI {
|
||||||
out = writer.getWriter().prepareAppendValue(-1);
|
out = writer.getWriter().prepareAppendValue(-1);
|
||||||
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
|
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
|
||||||
UserGroupInformation.getCurrentUser().getShortUserName()).write(out,
|
UserGroupInformation.getCurrentUser().getShortUserName()).write(out,
|
||||||
new HashSet<File>());
|
new HashSet<>());
|
||||||
out.close();
|
out.close();
|
||||||
writer.close();
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private YarnClient createMockYarnClient(YarnApplicationState appState,
|
private YarnClient createMockYarnClient(YarnApplicationState appState,
|
||||||
|
|
|
@ -444,13 +444,22 @@ public class AggregatedLogFormat {
|
||||||
* The writer that writes out the aggregated logs.
|
* The writer that writes out the aggregated logs.
|
||||||
*/
|
*/
|
||||||
@Private
|
@Private
|
||||||
public static class LogWriter {
|
public static class LogWriter implements AutoCloseable {
|
||||||
|
|
||||||
private final FSDataOutputStream fsDataOStream;
|
private FSDataOutputStream fsDataOStream;
|
||||||
private final TFile.Writer writer;
|
private TFile.Writer writer;
|
||||||
private FileContext fc;
|
private FileContext fc;
|
||||||
|
|
||||||
public LogWriter(final Configuration conf, final Path remoteAppLogFile,
|
/**
|
||||||
|
* Initialize the LogWriter.
|
||||||
|
* Must be called just after the instance is created.
|
||||||
|
* @param conf Configuration
|
||||||
|
* @param remoteAppLogFile remote log file path
|
||||||
|
* @param userUgi Ugi of the user
|
||||||
|
* @throws IOException Failed to initialize
|
||||||
|
*/
|
||||||
|
public void initialize(final Configuration conf,
|
||||||
|
final Path remoteAppLogFile,
|
||||||
UserGroupInformation userUgi) throws IOException {
|
UserGroupInformation userUgi) throws IOException {
|
||||||
try {
|
try {
|
||||||
this.fsDataOStream =
|
this.fsDataOStream =
|
||||||
|
@ -529,12 +538,15 @@ public class AggregatedLogFormat {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
|
if (writer != null) {
|
||||||
try {
|
try {
|
||||||
this.writer.close();
|
writer.close();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Exception closing writer", e);
|
LOG.warn("Exception closing writer", e);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
IOUtils.closeStream(fsDataOStream);
|
IOUtils.closeStream(fsDataOStream);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -140,8 +140,8 @@ public class TestAggregatedLogFormat {
|
||||||
final int ch = filler;
|
final int ch = filler;
|
||||||
|
|
||||||
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||||
LogWriter logWriter = new LogWriter(new Configuration(), remoteAppLogFile,
|
try (LogWriter logWriter = new LogWriter()) {
|
||||||
ugi);
|
logWriter.initialize(new Configuration(), remoteAppLogFile, ugi);
|
||||||
|
|
||||||
LogKey logKey = new LogKey(testContainerId);
|
LogKey logKey = new LogKey(testContainerId);
|
||||||
LogValue logValue =
|
LogValue logValue =
|
||||||
|
@ -177,7 +177,7 @@ public class TestAggregatedLogFormat {
|
||||||
|
|
||||||
//Aggregate The Logs
|
//Aggregate The Logs
|
||||||
logWriter.append(logKey, logValue);
|
logWriter.append(logKey, logValue);
|
||||||
logWriter.close();
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -216,22 +216,23 @@ public class TestAggregatedLogFormat {
|
||||||
writeSrcFile(srcFilePath, "stdout", numChars);
|
writeSrcFile(srcFilePath, "stdout", numChars);
|
||||||
|
|
||||||
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||||
LogWriter logWriter = new LogWriter(conf, remoteAppLogFile, ugi);
|
try (LogWriter logWriter = new LogWriter()) {
|
||||||
|
logWriter.initialize(conf, remoteAppLogFile, ugi);
|
||||||
|
|
||||||
LogKey logKey = new LogKey(testContainerId);
|
LogKey logKey = new LogKey(testContainerId);
|
||||||
LogValue logValue =
|
LogValue logValue =
|
||||||
new LogValue(Collections.singletonList(srcFileRoot.toString()),
|
new LogValue(Collections.singletonList(srcFileRoot.toString()),
|
||||||
testContainerId, ugi.getShortUserName());
|
testContainerId, ugi.getShortUserName());
|
||||||
|
|
||||||
// When we try to open FileInputStream for stderr, it will throw out an IOException.
|
// When we try to open FileInputStream for stderr, it will throw out an
|
||||||
// Skip the log aggregation for stderr.
|
// IOException. Skip the log aggregation for stderr.
|
||||||
LogValue spyLogValue = spy(logValue);
|
LogValue spyLogValue = spy(logValue);
|
||||||
File errorFile = new File((new Path(srcFilePath, "stderr")).toString());
|
File errorFile = new File((new Path(srcFilePath, "stderr")).toString());
|
||||||
doThrow(new IOException("Mock can not open FileInputStream")).when(
|
doThrow(new IOException("Mock can not open FileInputStream")).when(
|
||||||
spyLogValue).secureOpenFile(errorFile);
|
spyLogValue).secureOpenFile(errorFile);
|
||||||
|
|
||||||
logWriter.append(logKey, spyLogValue);
|
logWriter.append(logKey, spyLogValue);
|
||||||
logWriter.close();
|
}
|
||||||
|
|
||||||
// make sure permission are correct on the file
|
// make sure permission are correct on the file
|
||||||
FileStatus fsStatus = fs.getFileStatus(remoteAppLogFile);
|
FileStatus fsStatus = fs.getFileStatus(remoteAppLogFile);
|
||||||
|
@ -311,7 +312,8 @@ public class TestAggregatedLogFormat {
|
||||||
|
|
||||||
UserGroupInformation ugi =
|
UserGroupInformation ugi =
|
||||||
UserGroupInformation.getCurrentUser();
|
UserGroupInformation.getCurrentUser();
|
||||||
LogWriter logWriter = new LogWriter(conf, remoteAppLogFile, ugi);
|
try (LogWriter logWriter = new LogWriter()) {
|
||||||
|
logWriter.initialize(conf, remoteAppLogFile, ugi);
|
||||||
|
|
||||||
LogKey logKey = new LogKey(testContainerId1);
|
LogKey logKey = new LogKey(testContainerId1);
|
||||||
String randomUser = "randomUser";
|
String randomUser = "randomUser";
|
||||||
|
@ -321,13 +323,12 @@ public class TestAggregatedLogFormat {
|
||||||
|
|
||||||
// It is trying simulate a situation where first log file is owned by
|
// It is trying simulate a situation where first log file is owned by
|
||||||
// different user (probably symlink) and second one by the user itself.
|
// different user (probably symlink) and second one by the user itself.
|
||||||
// The first file should not be aggregated. Because this log file has the invalid
|
// The first file should not be aggregated. Because this log file has
|
||||||
// user name.
|
// the invalid user name.
|
||||||
when(logValue.getUser()).thenReturn(randomUser).thenReturn(
|
when(logValue.getUser()).thenReturn(randomUser).thenReturn(
|
||||||
ugi.getShortUserName());
|
ugi.getShortUserName());
|
||||||
logWriter.append(logKey, logValue);
|
logWriter.append(logKey, logValue);
|
||||||
|
}
|
||||||
logWriter.close();
|
|
||||||
|
|
||||||
BufferedReader in =
|
BufferedReader in =
|
||||||
new BufferedReader(new FileReader(new File(remoteAppLogFile
|
new BufferedReader(new FileReader(new File(remoteAppLogFile
|
||||||
|
|
|
@ -295,17 +295,20 @@ public class TestAggregatedLogsBlock {
|
||||||
List<String> rootLogDirs = Arrays.asList("target/logs/logs");
|
List<String> rootLogDirs = Arrays.asList("target/logs/logs");
|
||||||
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||||
|
|
||||||
AggregatedLogFormat.LogWriter writer = new AggregatedLogFormat.LogWriter(
|
try (AggregatedLogFormat.LogWriter writer =
|
||||||
configuration, new Path(path), ugi);
|
new AggregatedLogFormat.LogWriter()) {
|
||||||
|
writer.initialize(configuration, new Path(path), ugi);
|
||||||
writer.writeApplicationOwner(ugi.getUserName());
|
writer.writeApplicationOwner(ugi.getUserName());
|
||||||
|
|
||||||
Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>();
|
Map<ApplicationAccessType, String> appAcls = new HashMap<>();
|
||||||
appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
|
appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
|
||||||
writer.writeApplicationACLs(appAcls);
|
writer.writeApplicationACLs(appAcls);
|
||||||
|
|
||||||
writer.append(new AggregatedLogFormat.LogKey("container_0_0001_01_000001"),
|
writer.append(
|
||||||
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,UserGroupInformation.getCurrentUser().getShortUserName()));
|
new AggregatedLogFormat.LogKey("container_0_0001_01_000001"),
|
||||||
writer.close();
|
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
|
||||||
|
UserGroupInformation.getCurrentUser().getShortUserName()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void writeLogs(String dirName) throws Exception {
|
private void writeLogs(String dirName) throws Exception {
|
||||||
|
|
|
@ -110,13 +110,14 @@ public final class TestContainerLogsUtils {
|
||||||
ContainerId containerId, Path appDir, FileSystem fs) throws IOException {
|
ContainerId containerId, Path appDir, FileSystem fs) throws IOException {
|
||||||
Path path =
|
Path path =
|
||||||
new Path(appDir, LogAggregationUtils.getNodeString(nodeId));
|
new Path(appDir, LogAggregationUtils.getNodeString(nodeId));
|
||||||
AggregatedLogFormat.LogWriter writer =
|
try (AggregatedLogFormat.LogWriter writer =
|
||||||
new AggregatedLogFormat.LogWriter(configuration, path, ugi);
|
new AggregatedLogFormat.LogWriter()) {
|
||||||
|
writer.initialize(configuration, path, ugi);
|
||||||
writer.writeApplicationOwner(ugi.getUserName());
|
writer.writeApplicationOwner(ugi.getUserName());
|
||||||
|
|
||||||
writer.append(new AggregatedLogFormat.LogKey(containerId),
|
writer.append(new AggregatedLogFormat.LogKey(containerId),
|
||||||
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
|
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
|
||||||
ugi.getShortUserName()));
|
ugi.getShortUserName()));
|
||||||
writer.close();
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -295,18 +295,18 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
LogWriter writer = null;
|
|
||||||
String diagnosticMessage = "";
|
|
||||||
boolean logAggregationSucceedInThisCycle = true;
|
|
||||||
try {
|
|
||||||
if (pendingContainerInThisCycle.isEmpty()) {
|
if (pendingContainerInThisCycle.isEmpty()) {
|
||||||
|
sendLogAggregationReport(true, "", appFinished);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
logAggregationTimes++;
|
logAggregationTimes++;
|
||||||
|
String diagnosticMessage = "";
|
||||||
|
boolean logAggregationSucceedInThisCycle = true;
|
||||||
|
try (LogWriter writer = createLogWriter()) {
|
||||||
try {
|
try {
|
||||||
writer = createLogWriter();
|
writer.initialize(this.conf, this.remoteNodeTmpLogFileForApp,
|
||||||
|
this.userUgi);
|
||||||
// Write ACLs once when the writer is created.
|
// Write ACLs once when the writer is created.
|
||||||
writer.writeApplicationACLs(appAcls);
|
writer.writeApplicationACLs(appAcls);
|
||||||
writer.writeApplicationOwner(this.userUgi.getShortUserName());
|
writer.writeApplicationOwner(this.userUgi.getShortUserName());
|
||||||
|
@ -351,11 +351,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
cleanupOldLogTimes++;
|
cleanupOldLogTimes++;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (writer != null) {
|
|
||||||
writer.close();
|
|
||||||
writer = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
long currentTime = System.currentTimeMillis();
|
long currentTime = System.currentTimeMillis();
|
||||||
final Path renamedPath = this.rollingMonitorInterval <= 0
|
final Path renamedPath = this.rollingMonitorInterval <= 0
|
||||||
? remoteNodeLogFileForApp : new Path(
|
? remoteNodeLogFileForApp : new Path(
|
||||||
|
@ -396,11 +391,24 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
logAggregationSucceedInThisCycle = false;
|
logAggregationSucceedInThisCycle = false;
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
sendLogAggregationReport(logAggregationSucceedInThisCycle,
|
||||||
|
diagnosticMessage, appFinished);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected LogWriter createLogWriter() {
|
||||||
|
return new LogWriter();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void sendLogAggregationReport(
|
||||||
|
boolean logAggregationSucceedInThisCycle, String diagnosticMessage,
|
||||||
|
boolean appFinished) {
|
||||||
LogAggregationStatus logAggregationStatus =
|
LogAggregationStatus logAggregationStatus =
|
||||||
logAggregationSucceedInThisCycle
|
logAggregationSucceedInThisCycle
|
||||||
? LogAggregationStatus.RUNNING
|
? LogAggregationStatus.RUNNING
|
||||||
: LogAggregationStatus.RUNNING_WITH_FAILURE;
|
: LogAggregationStatus.RUNNING_WITH_FAILURE;
|
||||||
sendLogAggregationReport(logAggregationStatus, diagnosticMessage);
|
sendLogAggregationReportInternal(logAggregationStatus, diagnosticMessage);
|
||||||
if (appFinished) {
|
if (appFinished) {
|
||||||
// If the app is finished, one extra final report with log aggregation
|
// If the app is finished, one extra final report with log aggregation
|
||||||
// status SUCCEEDED/FAILED will be sent to RM to inform the RM
|
// status SUCCEEDED/FAILED will be sent to RM to inform the RM
|
||||||
|
@ -409,21 +417,11 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
renameTemporaryLogFileFailed || !logAggregationSucceedInThisCycle
|
renameTemporaryLogFileFailed || !logAggregationSucceedInThisCycle
|
||||||
? LogAggregationStatus.FAILED
|
? LogAggregationStatus.FAILED
|
||||||
: LogAggregationStatus.SUCCEEDED;
|
: LogAggregationStatus.SUCCEEDED;
|
||||||
sendLogAggregationReport(finalLogAggregationStatus, "");
|
sendLogAggregationReportInternal(finalLogAggregationStatus, "");
|
||||||
}
|
|
||||||
|
|
||||||
if (writer != null) {
|
|
||||||
writer.close();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected LogWriter createLogWriter() throws IOException {
|
private void sendLogAggregationReportInternal(
|
||||||
return new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp,
|
|
||||||
this.userUgi);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void sendLogAggregationReport(
|
|
||||||
LogAggregationStatus logAggregationStatus, String diagnosticMessage) {
|
LogAggregationStatus logAggregationStatus, String diagnosticMessage) {
|
||||||
LogAggregationReport report =
|
LogAggregationReport report =
|
||||||
Records.newRecord(LogAggregationReport.class);
|
Records.newRecord(LogAggregationReport.class);
|
||||||
|
|
|
@ -416,8 +416,7 @@ public class TestAppLogAggregatorImpl {
|
||||||
logAggregationContext, context, lfs, -1, recoveredLogInitedTime);
|
logAggregationContext, context, lfs, -1, recoveredLogInitedTime);
|
||||||
this.applicationId = appId;
|
this.applicationId = appId;
|
||||||
this.deletionService = deletionService;
|
this.deletionService = deletionService;
|
||||||
|
this.logWriter = spy(new LogWriter());
|
||||||
this.logWriter = getSpiedLogWriter(conf, ugi, remoteNodeLogFileForApp);
|
|
||||||
this.logValue = ArgumentCaptor.forClass(LogValue.class);
|
this.logValue = ArgumentCaptor.forClass(LogValue.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -425,10 +424,5 @@ public class TestAppLogAggregatorImpl {
|
||||||
protected LogWriter createLogWriter() {
|
protected LogWriter createLogWriter() {
|
||||||
return this.logWriter;
|
return this.logWriter;
|
||||||
}
|
}
|
||||||
|
|
||||||
private LogWriter getSpiedLogWriter(Configuration conf,
|
|
||||||
UserGroupInformation ugi, Path remoteAppLogFile) throws IOException {
|
|
||||||
return spy(new LogWriter(conf, remoteAppLogFile, ugi));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue