YARN-5237. Fix missing log files issue in rolling log aggregation. Contributed by Xuan Gong.
This commit is contained in:
parent
6f0aa75121
commit
5dfc38ff57
|
@ -168,6 +168,7 @@ public class AggregatedLogFormat {
|
||||||
private final Set<String> alreadyUploadedLogFiles;
|
private final Set<String> alreadyUploadedLogFiles;
|
||||||
private Set<String> allExistingFileMeta = new HashSet<String>();
|
private Set<String> allExistingFileMeta = new HashSet<String>();
|
||||||
private final boolean appFinished;
|
private final boolean appFinished;
|
||||||
|
private final boolean containerFinished;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The retention context to determine if log files are older than
|
* The retention context to determine if log files are older than
|
||||||
|
@ -186,13 +187,14 @@ public class AggregatedLogFormat {
|
||||||
public LogValue(List<String> rootLogDirs, ContainerId containerId,
|
public LogValue(List<String> rootLogDirs, ContainerId containerId,
|
||||||
String user) {
|
String user) {
|
||||||
this(rootLogDirs, containerId, user, null, new HashSet<String>(),
|
this(rootLogDirs, containerId, user, null, new HashSet<String>(),
|
||||||
null, true);
|
null, true, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public LogValue(List<String> rootLogDirs, ContainerId containerId,
|
public LogValue(List<String> rootLogDirs, ContainerId containerId,
|
||||||
String user, LogAggregationContext logAggregationContext,
|
String user, LogAggregationContext logAggregationContext,
|
||||||
Set<String> alreadyUploadedLogFiles,
|
Set<String> alreadyUploadedLogFiles,
|
||||||
LogRetentionContext retentionContext, boolean appFinished) {
|
LogRetentionContext retentionContext, boolean appFinished,
|
||||||
|
boolean containerFinished) {
|
||||||
this.rootLogDirs = new ArrayList<String>(rootLogDirs);
|
this.rootLogDirs = new ArrayList<String>(rootLogDirs);
|
||||||
this.containerId = containerId;
|
this.containerId = containerId;
|
||||||
this.user = user;
|
this.user = user;
|
||||||
|
@ -202,6 +204,7 @@ public class AggregatedLogFormat {
|
||||||
this.logAggregationContext = logAggregationContext;
|
this.logAggregationContext = logAggregationContext;
|
||||||
this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
|
this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
|
||||||
this.appFinished = appFinished;
|
this.appFinished = appFinished;
|
||||||
|
this.containerFinished = containerFinished;
|
||||||
this.logRetentionContext = retentionContext;
|
this.logRetentionContext = retentionContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -318,14 +321,28 @@ public class AggregatedLogFormat {
|
||||||
return candidates;
|
return candidates;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Set<File> fileCandidates = new HashSet<File>(candidates);
|
||||||
if (this.logAggregationContext != null && candidates.size() > 0) {
|
if (this.logAggregationContext != null && candidates.size() > 0) {
|
||||||
|
fileCandidates = getFileCandidates(fileCandidates, this.appFinished);
|
||||||
|
if (!this.appFinished && this.containerFinished) {
|
||||||
|
Set<File> addition = new HashSet<File>(candidates);
|
||||||
|
addition = getFileCandidates(addition, true);
|
||||||
|
fileCandidates.addAll(addition);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return fileCandidates;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Set<File> getFileCandidates(Set<File> candidates,
|
||||||
|
boolean useRegularPattern) {
|
||||||
filterFiles(
|
filterFiles(
|
||||||
this.appFinished ? this.logAggregationContext.getIncludePattern()
|
useRegularPattern ? this.logAggregationContext.getIncludePattern()
|
||||||
: this.logAggregationContext.getRolledLogsIncludePattern(),
|
: this.logAggregationContext.getRolledLogsIncludePattern(),
|
||||||
candidates, false);
|
candidates, false);
|
||||||
|
|
||||||
filterFiles(
|
filterFiles(
|
||||||
this.appFinished ? this.logAggregationContext.getExcludePattern()
|
useRegularPattern ? this.logAggregationContext.getExcludePattern()
|
||||||
: this.logAggregationContext.getRolledLogsExcludePattern(),
|
: this.logAggregationContext.getRolledLogsExcludePattern(),
|
||||||
candidates, true);
|
candidates, true);
|
||||||
|
|
||||||
|
@ -337,10 +354,7 @@ public class AggregatedLogFormat {
|
||||||
.contains(getLogFileMetaData(next));
|
.contains(getLogFileMetaData(next));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
candidates = Sets.newHashSet(mask);
|
return Sets.newHashSet(mask);
|
||||||
}
|
|
||||||
|
|
||||||
return candidates;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void filterFiles(String pattern, Set<File> candidates,
|
private void filterFiles(String pattern, Set<File> candidates,
|
||||||
|
|
|
@ -328,7 +328,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
containerLogAggregators.put(container, aggregator);
|
containerLogAggregators.put(container, aggregator);
|
||||||
}
|
}
|
||||||
Set<Path> uploadedFilePathsInThisCycle =
|
Set<Path> uploadedFilePathsInThisCycle =
|
||||||
aggregator.doContainerLogAggregation(writer, appFinished);
|
aggregator.doContainerLogAggregation(writer, appFinished,
|
||||||
|
finishedContainers.contains(container));
|
||||||
if (uploadedFilePathsInThisCycle.size() > 0) {
|
if (uploadedFilePathsInThisCycle.size() > 0) {
|
||||||
uploadedLogsInThisCycle = true;
|
uploadedLogsInThisCycle = true;
|
||||||
this.delService.delete(this.userUgi.getShortUserName(), null,
|
this.delService.delete(this.userUgi.getShortUserName(), null,
|
||||||
|
@ -643,7 +644,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
}
|
}
|
||||||
|
|
||||||
public Set<Path> doContainerLogAggregation(LogWriter writer,
|
public Set<Path> doContainerLogAggregation(LogWriter writer,
|
||||||
boolean appFinished) {
|
boolean appFinished, boolean containerFinished) {
|
||||||
LOG.info("Uploading logs for container " + containerId
|
LOG.info("Uploading logs for container " + containerId
|
||||||
+ ". Current good log dirs are "
|
+ ". Current good log dirs are "
|
||||||
+ StringUtils.join(",", dirsHandler.getLogDirsForRead()));
|
+ StringUtils.join(",", dirsHandler.getLogDirsForRead()));
|
||||||
|
@ -651,7 +652,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
final LogValue logValue =
|
final LogValue logValue =
|
||||||
new LogValue(dirsHandler.getLogDirsForRead(), containerId,
|
new LogValue(dirsHandler.getLogDirsForRead(), containerId,
|
||||||
userUgi.getShortUserName(), logAggregationContext,
|
userUgi.getShortUserName(), logAggregationContext,
|
||||||
this.uploadedFileMeta, retentionContext, appFinished);
|
this.uploadedFileMeta, retentionContext, appFinished,
|
||||||
|
containerFinished);
|
||||||
try {
|
try {
|
||||||
writer.append(logKey, logValue);
|
writer.append(logKey, logValue);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|
|
@ -1575,6 +1575,102 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
"getApplicationID");
|
"getApplicationID");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("resource")
|
||||||
|
@Test (timeout = 50000)
|
||||||
|
public void testLogAggregationServiceWithPatternsAndIntervals()
|
||||||
|
throws Exception {
|
||||||
|
LogAggregationContext logAggregationContext =
|
||||||
|
Records.newRecord(LogAggregationContext.class);
|
||||||
|
// set IncludePattern and RolledLogsIncludePattern.
|
||||||
|
// When the app is running, we only aggregate the log with
|
||||||
|
// the name stdout. After the app finishes, we only aggregate
|
||||||
|
// the log with the name std_final.
|
||||||
|
logAggregationContext.setRolledLogsIncludePattern("stdout");
|
||||||
|
logAggregationContext.setIncludePattern("std_final");
|
||||||
|
this.conf.set(
|
||||||
|
YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
|
||||||
|
//configure YarnConfiguration.NM_REMOTE_APP_LOG_DIR to
|
||||||
|
//have fully qualified path
|
||||||
|
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||||
|
this.remoteRootLogDir.toURI().toString());
|
||||||
|
this.conf.setLong(
|
||||||
|
YarnConfiguration.NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS,
|
||||||
|
3600);
|
||||||
|
|
||||||
|
this.conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
|
||||||
|
|
||||||
|
ApplicationId application =
|
||||||
|
BuilderUtils.newApplicationId(System.currentTimeMillis(), 1);
|
||||||
|
ApplicationAttemptId appAttemptId =
|
||||||
|
BuilderUtils.newApplicationAttemptId(application, 1);
|
||||||
|
ContainerId container = createContainer(appAttemptId, 1,
|
||||||
|
ContainerType.APPLICATION_MASTER);
|
||||||
|
|
||||||
|
ConcurrentMap<ApplicationId, Application> maps =
|
||||||
|
this.context.getApplications();
|
||||||
|
Application app = mock(Application.class);
|
||||||
|
maps.put(application, app);
|
||||||
|
when(app.getContainers()).thenReturn(this.context.getContainers());
|
||||||
|
|
||||||
|
LogAggregationService logAggregationService =
|
||||||
|
new LogAggregationService(dispatcher, context, this.delSrvc,
|
||||||
|
super.dirsHandler);
|
||||||
|
|
||||||
|
logAggregationService.init(this.conf);
|
||||||
|
logAggregationService.start();
|
||||||
|
|
||||||
|
// AppLogDir should be created
|
||||||
|
File appLogDir =
|
||||||
|
new File(localLogDir, ConverterUtils.toString(application));
|
||||||
|
appLogDir.mkdir();
|
||||||
|
logAggregationService.handle(new LogHandlerAppStartedEvent(application,
|
||||||
|
this.user, null, this.acls, logAggregationContext));
|
||||||
|
|
||||||
|
// Simulate log-file creation
|
||||||
|
// create std_final in log directory which will not be aggregated
|
||||||
|
// until the app finishes.
|
||||||
|
String[] logFilesWithFinalLog =
|
||||||
|
new String[] {"stdout", "std_final"};
|
||||||
|
writeContainerLogs(appLogDir, container, logFilesWithFinalLog);
|
||||||
|
|
||||||
|
// Do log aggregation
|
||||||
|
AppLogAggregatorImpl aggregator =
|
||||||
|
(AppLogAggregatorImpl) logAggregationService.getAppLogAggregators()
|
||||||
|
.get(application);
|
||||||
|
|
||||||
|
aggregator.doLogAggregationOutOfBand();
|
||||||
|
|
||||||
|
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
|
||||||
|
50, 1, false, null));
|
||||||
|
|
||||||
|
String[] logFiles = new String[] { "stdout" };
|
||||||
|
verifyContainerLogs(logAggregationService, application,
|
||||||
|
new ContainerId[] {container}, logFiles, 1, true);
|
||||||
|
|
||||||
|
logAggregationService.handle(
|
||||||
|
new LogHandlerContainerFinishedEvent(container, 0));
|
||||||
|
|
||||||
|
dispatcher.await();
|
||||||
|
|
||||||
|
// Do the log aggregation after ContainerFinishedEvent but before
|
||||||
|
// AppFinishedEvent. The std_final is expected to be aggregated this time
|
||||||
|
// even if the app is running but the container finishes.
|
||||||
|
aggregator.doLogAggregationOutOfBand();
|
||||||
|
|
||||||
|
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
|
||||||
|
50, 2, false, null));
|
||||||
|
|
||||||
|
// This container finishes.
|
||||||
|
// The log "std_final" should be aggregated this time.
|
||||||
|
String[] logFinalLog = new String[] {"std_final"};
|
||||||
|
verifyContainerLogs(logAggregationService, application,
|
||||||
|
new ContainerId[] {container}, logFinalLog, 1, true);
|
||||||
|
|
||||||
|
logAggregationService.handle(new LogHandlerAppFinishedEvent(application));
|
||||||
|
|
||||||
|
logAggregationService.stop();
|
||||||
|
}
|
||||||
|
|
||||||
@Test (timeout = 50000)
|
@Test (timeout = 50000)
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public void testNoneContainerPolicy() throws Exception {
|
public void testNoneContainerPolicy() throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue