YARN-5237. Fix missing log files issue in rolling log aggregation. Contributed by Xuan Gong.

This commit is contained in:
Junping Du 2016-06-16 07:18:36 -07:00
parent ea2e7321d6
commit 810470508b
3 changed files with 134 additions and 22 deletions

View File

@ -168,17 +168,20 @@ public class AggregatedLogFormat {
private final Set<String> alreadyUploadedLogFiles; private final Set<String> alreadyUploadedLogFiles;
private Set<String> allExistingFileMeta = new HashSet<String>(); private Set<String> allExistingFileMeta = new HashSet<String>();
private final boolean appFinished; private final boolean appFinished;
private final boolean containerFinished;
// TODO Maybe add a version string here. Instead of changing the version of // TODO Maybe add a version string here. Instead of changing the version of
// the entire k-v format // the entire k-v format
public LogValue(List<String> rootLogDirs, ContainerId containerId, public LogValue(List<String> rootLogDirs, ContainerId containerId,
String user) { String user) {
this(rootLogDirs, containerId, user, null, new HashSet<String>(), true); this(rootLogDirs, containerId, user, null, new HashSet<String>(), true,
true);
} }
public LogValue(List<String> rootLogDirs, ContainerId containerId, public LogValue(List<String> rootLogDirs, ContainerId containerId,
String user, LogAggregationContext logAggregationContext, String user, LogAggregationContext logAggregationContext,
Set<String> alreadyUploadedLogFiles, boolean appFinished) { Set<String> alreadyUploadedLogFiles, boolean appFinished,
boolean containerFinished) {
this.rootLogDirs = new ArrayList<String>(rootLogDirs); this.rootLogDirs = new ArrayList<String>(rootLogDirs);
this.containerId = containerId; this.containerId = containerId;
this.user = user; this.user = user;
@ -188,6 +191,7 @@ public class AggregatedLogFormat {
this.logAggregationContext = logAggregationContext; this.logAggregationContext = logAggregationContext;
this.alreadyUploadedLogFiles = alreadyUploadedLogFiles; this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
this.appFinished = appFinished; this.appFinished = appFinished;
this.containerFinished = containerFinished;
} }
private Set<File> getPendingLogFilesToUploadForThisContainer() { private Set<File> getPendingLogFilesToUploadForThisContainer() {
@ -294,28 +298,39 @@ public class AggregatedLogFormat {
this.allExistingFileMeta.add(getLogFileMetaData(logFile)); this.allExistingFileMeta.add(getLogFileMetaData(logFile));
} }
Set<File> fileCandidates = new HashSet<File>(candidates);
if (this.logAggregationContext != null && candidates.size() > 0) { if (this.logAggregationContext != null && candidates.size() > 0) {
filterFiles( fileCandidates = getFileCandidates(fileCandidates, this.appFinished);
this.appFinished ? this.logAggregationContext.getIncludePattern() if (!this.appFinished && this.containerFinished) {
Set<File> addition = new HashSet<File>(candidates);
addition = getFileCandidates(addition, true);
fileCandidates.addAll(addition);
}
}
return fileCandidates;
}
private Set<File> getFileCandidates(Set<File> candidates,
boolean useRegularPattern) {
filterFiles(
useRegularPattern ? this.logAggregationContext.getIncludePattern()
: this.logAggregationContext.getRolledLogsIncludePattern(), : this.logAggregationContext.getRolledLogsIncludePattern(),
candidates, false); candidates, false);
filterFiles( filterFiles(
this.appFinished ? this.logAggregationContext.getExcludePattern() useRegularPattern ? this.logAggregationContext.getExcludePattern()
: this.logAggregationContext.getRolledLogsExcludePattern(), : this.logAggregationContext.getRolledLogsExcludePattern(),
candidates, true); candidates, true);
Iterable<File> mask = Iterable<File> mask =
Iterables.filter(candidates, new Predicate<File>() { Iterables.filter(candidates, new Predicate<File>() {
@Override @Override
public boolean apply(File next) { public boolean apply(File next) {
return !alreadyUploadedLogFiles return !alreadyUploadedLogFiles
.contains(getLogFileMetaData(next)); .contains(getLogFileMetaData(next));
} }
}); });
candidates = Sets.newHashSet(mask); return Sets.newHashSet(mask);
}
return candidates;
} }
private void filterFiles(String pattern, Set<File> candidates, private void filterFiles(String pattern, Set<File> candidates,

View File

@ -348,7 +348,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
containerLogAggregators.put(container, aggregator); containerLogAggregators.put(container, aggregator);
} }
Set<Path> uploadedFilePathsInThisCycle = Set<Path> uploadedFilePathsInThisCycle =
aggregator.doContainerLogAggregation(writer, appFinished); aggregator.doContainerLogAggregation(writer, appFinished,
finishedContainers.contains(container));
if (uploadedFilePathsInThisCycle.size() > 0) { if (uploadedFilePathsInThisCycle.size() > 0) {
uploadedLogsInThisCycle = true; uploadedLogsInThisCycle = true;
this.delService.delete(this.userUgi.getShortUserName(), null, this.delService.delete(this.userUgi.getShortUserName(), null,
@ -650,15 +651,15 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
} }
public Set<Path> doContainerLogAggregation(LogWriter writer, public Set<Path> doContainerLogAggregation(LogWriter writer,
boolean appFinished) { boolean appFinished, boolean containerFinished) {
LOG.info("Uploading logs for container " + containerId LOG.info("Uploading logs for container " + containerId
+ ". Current good log dirs are " + ". Current good log dirs are "
+ StringUtils.join(",", dirsHandler.getLogDirsForRead())); + StringUtils.join(",", dirsHandler.getLogDirsForRead()));
final LogKey logKey = new LogKey(containerId); final LogKey logKey = new LogKey(containerId);
final LogValue logValue = final LogValue logValue =
new LogValue(dirsHandler.getLogDirsForRead(), containerId, new LogValue(dirsHandler.getLogDirsForRead(), containerId,
userUgi.getShortUserName(), logAggregationContext, userUgi.getShortUserName(), logAggregationContext,
this.uploadedFileMeta, appFinished); this.uploadedFileMeta, appFinished, containerFinished);
try { try {
writer.append(logKey, logValue); writer.append(logKey, logValue);
} catch (Exception e) { } catch (Exception e) {

View File

@ -1435,6 +1435,102 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
"getApplicationID"); "getApplicationID");
} }
/**
 * Checks rolling log aggregation when the regular include pattern and the
 * rolled-logs include pattern differ.
 *
 * <p>The rolled-logs include pattern is {@code stdout} and the regular
 * include pattern is {@code std_final}. The test triggers one out-of-band
 * aggregation while the container is still running and expects only
 * {@code stdout} to be uploaded. It then delivers a container-finished
 * event, triggers a second aggregation while the application is still
 * running, and expects {@code std_final} to be uploaded as well.
 *
 * @throws Exception if any step of the test fails
 */
@SuppressWarnings("resource")
@Test (timeout = 50000)
public void testLogAggregationServiceWithPatternsAndIntervals()
    throws Exception {
  LogAggregationContext logAggregationContext =
      Records.newRecord(LogAggregationContext.class);
  // Set IncludePattern and RolledLogsIncludePattern.
  // While the container is running, only the log named stdout is
  // aggregated. Once the container finishes, the log named std_final is
  // expected to be aggregated too, even though the app is still running.
  logAggregationContext.setRolledLogsIncludePattern("stdout");
  logAggregationContext.setIncludePattern("std_final");
  this.conf.set(
      YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
  // Configure YarnConfiguration.NM_REMOTE_APP_LOG_DIR to
  // have a fully qualified path.
  this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
      this.remoteRootLogDir.toURI().toString());
  // Aggregation is only ever triggered explicitly below via
  // doLogAggregationOutOfBand().
  // NOTE(review): the 3600s interval presumably keeps the periodic roll
  // from firing within the 50s test timeout - confirm.
  this.conf.setLong(
      YarnConfiguration.NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS,
      3600);
  this.conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);

  ApplicationId application =
      BuilderUtils.newApplicationId(System.currentTimeMillis(), 1);
  ApplicationAttemptId appAttemptId =
      BuilderUtils.newApplicationAttemptId(application, 1);
  ContainerId container = createContainer(appAttemptId, 1,
      ContainerType.APPLICATION_MASTER);

  // Register a mocked Application so the context knows about the app.
  ConcurrentMap<ApplicationId, Application> maps =
      this.context.getApplications();
  Application app = mock(Application.class);
  maps.put(application, app);
  when(app.getContainers()).thenReturn(this.context.getContainers());

  LogAggregationService logAggregationService =
      new LogAggregationService(dispatcher, context, this.delSrvc,
          super.dirsHandler);

  logAggregationService.init(this.conf);
  logAggregationService.start();

  // From here on the service is running; make sure it is stopped even when
  // one of the assertions below fails, so it cannot leak into other tests.
  try {
    // AppLogDir should be created
    File appLogDir =
        new File(localLogDir, ConverterUtils.toString(application));
    appLogDir.mkdir();
    logAggregationService.handle(new LogHandlerAppStartedEvent(application,
        this.user, null, this.acls, logAggregationContext));

    // Simulate log-file creation.
    // std_final is written now, but it must not be aggregated until the
    // container has finished.
    String[] logFilesWithFinalLog =
        new String[] {"stdout", "std_final"};
    writeContainerLogs(appLogDir, container, logFilesWithFinalLog);

    // Do log aggregation while the container is still running.
    AppLogAggregatorImpl aggregator =
        (AppLogAggregatorImpl) logAggregationService.getAppLogAggregators()
            .get(application);

    aggregator.doLogAggregationOutOfBand();

    Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
        50, 1, false, null));

    // Only stdout matches the rolled-logs include pattern.
    String[] logFiles = new String[] { "stdout" };
    verifyContainerLogs(logAggregationService, application,
        new ContainerId[] {container}, logFiles, 1, true);

    logAggregationService.handle(
        new LogHandlerContainerFinishedEvent(container, 0));

    dispatcher.await();

    // Do the log aggregation after ContainerFinishedEvent but before
    // AppFinishedEvent. The std_final is expected to be aggregated this time
    // even if the app is running but the container finishes.
    aggregator.doLogAggregationOutOfBand();

    Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
        50, 2, false, null));

    // This container finishes.
    // The log "std_final" should be aggregated this time.
    String[] logFinalLog = new String[] {"std_final"};
    verifyContainerLogs(logAggregationService, application,
        new ContainerId[] {container}, logFinalLog, 1, true);

    logAggregationService.handle(new LogHandlerAppFinishedEvent(application));
  } finally {
    logAggregationService.stop();
  }
}
@Test (timeout = 50000) @Test (timeout = 50000)
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void testNoneContainerPolicy() throws Exception { public void testNoneContainerPolicy() throws Exception {
@ -1443,14 +1539,14 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
LogAggregationService logAggregationService = createLogAggregationService( LogAggregationService logAggregationService = createLogAggregationService(
appId, NoneContainerLogAggregationPolicy.class, null); appId, NoneContainerLogAggregationPolicy.class, null);
String[] logFiles = new String[] { "stdout" }; String[] logFiles = new String[] {"stdout"};
ContainerId container1 = finishContainer(appId, logAggregationService, ContainerId container1 = finishContainer(appId, logAggregationService,
ContainerType.APPLICATION_MASTER, 1, 0, logFiles); ContainerType.APPLICATION_MASTER, 1, 0, logFiles);
finishApplication(appId, logAggregationService); finishApplication(appId, logAggregationService);
verifyContainerLogs(logAggregationService, appId, verifyContainerLogs(logAggregationService, appId,
new ContainerId[] { container1 }, logFiles, 0, false); new ContainerId[] {container1}, logFiles, 0, false);
verifyLogAggFinishEvent(appId); verifyLogAggFinishEvent(appId);
} }