YARN-5237. Fix missing log files issue in rolling log aggregation. Contributed by Xuan Gong.

(cherry picked from commit 5dfc38ff57)
This commit is contained in:
Junping Du 2016-06-15 16:17:54 -07:00
parent 14b849489a
commit 6dd34baf3c
3 changed files with 134 additions and 22 deletions

View File

@ -168,6 +168,7 @@ public static class LogValue {
private final Set<String> alreadyUploadedLogFiles;
private Set<String> allExistingFileMeta = new HashSet<String>();
private final boolean appFinished;
private final boolean containerFinished;
/**
* The retention context to determine if log files are older than
@ -186,13 +187,14 @@ public static class LogValue {
public LogValue(List<String> rootLogDirs, ContainerId containerId,
String user) {
this(rootLogDirs, containerId, user, null, new HashSet<String>(),
null, true);
null, true, true);
}
public LogValue(List<String> rootLogDirs, ContainerId containerId,
String user, LogAggregationContext logAggregationContext,
Set<String> alreadyUploadedLogFiles,
LogRetentionContext retentionContext, boolean appFinished) {
LogRetentionContext retentionContext, boolean appFinished,
boolean containerFinished) {
this.rootLogDirs = new ArrayList<String>(rootLogDirs);
this.containerId = containerId;
this.user = user;
@ -202,6 +204,7 @@ public LogValue(List<String> rootLogDirs, ContainerId containerId,
this.logAggregationContext = logAggregationContext;
this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
this.appFinished = appFinished;
this.containerFinished = containerFinished;
this.logRetentionContext = retentionContext;
}
@ -318,29 +321,40 @@ private Set<File> getPendingLogFilesToUpload(File containerLogDir) {
return candidates;
}
Set<File> fileCandidates = new HashSet<File>(candidates);
if (this.logAggregationContext != null && candidates.size() > 0) {
filterFiles(
this.appFinished ? this.logAggregationContext.getIncludePattern()
fileCandidates = getFileCandidates(fileCandidates, this.appFinished);
if (!this.appFinished && this.containerFinished) {
Set<File> addition = new HashSet<File>(candidates);
addition = getFileCandidates(addition, true);
fileCandidates.addAll(addition);
}
}
return fileCandidates;
}
private Set<File> getFileCandidates(Set<File> candidates,
boolean useRegularPattern) {
filterFiles(
useRegularPattern ? this.logAggregationContext.getIncludePattern()
: this.logAggregationContext.getRolledLogsIncludePattern(),
candidates, false);
filterFiles(
this.appFinished ? this.logAggregationContext.getExcludePattern()
filterFiles(
useRegularPattern ? this.logAggregationContext.getExcludePattern()
: this.logAggregationContext.getRolledLogsExcludePattern(),
candidates, true);
Iterable<File> mask =
Iterables.filter(candidates, new Predicate<File>() {
@Override
public boolean apply(File next) {
return !alreadyUploadedLogFiles
Iterable<File> mask =
Iterables.filter(candidates, new Predicate<File>() {
@Override
public boolean apply(File next) {
return !alreadyUploadedLogFiles
.contains(getLogFileMetaData(next));
}
});
candidates = Sets.newHashSet(mask);
}
return candidates;
}
});
return Sets.newHashSet(mask);
}
private void filterFiles(String pattern, Set<File> candidates,

View File

@ -328,7 +328,8 @@ private void uploadLogsForContainers(boolean appFinished) {
containerLogAggregators.put(container, aggregator);
}
Set<Path> uploadedFilePathsInThisCycle =
aggregator.doContainerLogAggregation(writer, appFinished);
aggregator.doContainerLogAggregation(writer, appFinished,
finishedContainers.contains(container));
if (uploadedFilePathsInThisCycle.size() > 0) {
uploadedLogsInThisCycle = true;
this.delService.delete(this.userUgi.getShortUserName(), null,
@ -643,7 +644,7 @@ private AggregatedLogFormat.LogRetentionContext getRetentionContext() {
}
public Set<Path> doContainerLogAggregation(LogWriter writer,
boolean appFinished) {
boolean appFinished, boolean containerFinished) {
LOG.info("Uploading logs for container " + containerId
+ ". Current good log dirs are "
+ StringUtils.join(",", dirsHandler.getLogDirsForRead()));
@ -651,7 +652,8 @@ public Set<Path> doContainerLogAggregation(LogWriter writer,
final LogValue logValue =
new LogValue(dirsHandler.getLogDirsForRead(), containerId,
userUgi.getShortUserName(), logAggregationContext,
this.uploadedFileMeta, retentionContext, appFinished);
this.uploadedFileMeta, retentionContext, appFinished,
containerFinished);
try {
writer.append(logKey, logValue);
} catch (Exception e) {

View File

@ -1575,6 +1575,102 @@ public void testLogAggregationServiceWithPatterns() throws Exception {
"getApplicationID");
}
@SuppressWarnings("resource")
@Test (timeout = 50000)
public void testLogAggregationServiceWithPatternsAndIntervals()
throws Exception {
LogAggregationContext logAggregationContext =
Records.newRecord(LogAggregationContext.class);
// set IncludePattern and RolledLogsIncludePattern.
// When the app is running, we only aggregate the log with
// the name stdout. After the app finishes, we only aggregate
// the log with the name std_final.
logAggregationContext.setRolledLogsIncludePattern("stdout");
logAggregationContext.setIncludePattern("std_final");
this.conf.set(
YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
//configure YarnConfiguration.NM_REMOTE_APP_LOG_DIR to
//have fully qualified path
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.toURI().toString());
this.conf.setLong(
YarnConfiguration.NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS,
3600);
this.conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
ApplicationId application =
BuilderUtils.newApplicationId(System.currentTimeMillis(), 1);
ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(application, 1);
ContainerId container = createContainer(appAttemptId, 1,
ContainerType.APPLICATION_MASTER);
ConcurrentMap<ApplicationId, Application> maps =
this.context.getApplications();
Application app = mock(Application.class);
maps.put(application, app);
when(app.getContainers()).thenReturn(this.context.getContainers());
LogAggregationService logAggregationService =
new LogAggregationService(dispatcher, context, this.delSrvc,
super.dirsHandler);
logAggregationService.init(this.conf);
logAggregationService.start();
// AppLogDir should be created
File appLogDir =
new File(localLogDir, ConverterUtils.toString(application));
appLogDir.mkdir();
logAggregationService.handle(new LogHandlerAppStartedEvent(application,
this.user, null, this.acls, logAggregationContext));
// Simulate log-file creation
// create std_final in log directory which will not be aggregated
// until the app finishes.
String[] logFilesWithFinalLog =
new String[] {"stdout", "std_final"};
writeContainerLogs(appLogDir, container, logFilesWithFinalLog);
// Do log aggregation
AppLogAggregatorImpl aggregator =
(AppLogAggregatorImpl) logAggregationService.getAppLogAggregators()
.get(application);
aggregator.doLogAggregationOutOfBand();
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
50, 1, false, null));
String[] logFiles = new String[] { "stdout" };
verifyContainerLogs(logAggregationService, application,
new ContainerId[] {container}, logFiles, 1, true);
logAggregationService.handle(
new LogHandlerContainerFinishedEvent(container, 0));
dispatcher.await();
// Do the log aggregation after ContainerFinishedEvent but before
// AppFinishedEvent. The std_final is expected to be aggregated this time
// even if the app is running but the container finishes.
aggregator.doLogAggregationOutOfBand();
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
50, 2, false, null));
// This container finishes.
// The log "std_final" should be aggregated this time.
String[] logFinalLog = new String[] {"std_final"};
verifyContainerLogs(logAggregationService, application,
new ContainerId[] {container}, logFinalLog, 1, true);
logAggregationService.handle(new LogHandlerAppFinishedEvent(application));
logAggregationService.stop();
}
@Test (timeout = 50000)
@SuppressWarnings("unchecked")
public void testNoneContainerPolicy() throws Exception {
@ -1583,14 +1679,14 @@ public void testNoneContainerPolicy() throws Exception {
LogAggregationService logAggregationService = createLogAggregationService(
appId, NoneContainerLogAggregationPolicy.class, null);
String[] logFiles = new String[] { "stdout" };
String[] logFiles = new String[] {"stdout"};
ContainerId container1 = finishContainer(appId, logAggregationService,
ContainerType.APPLICATION_MASTER, 1, 0, logFiles);
finishApplication(appId, logAggregationService);
verifyContainerLogs(logAggregationService, appId,
new ContainerId[] { container1 }, logFiles, 0, false);
new ContainerId[] {container1}, logFiles, 0, false);
verifyLogAggFinishEvent(appId);
}