From 0cd9eb99872147e06b8ee32e7fed0b25d06a1f80 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Thu, 12 Mar 2015 13:32:29 -0700 Subject: [PATCH] YARN-3154. Added additional APIs in LogAggregationContext to avoid aggregating running logs of application when rolling is enabled. Contributed by Xuan Gong. (cherry picked from commit 863079bb874ba77918ca1c0741eae10e245995c8) --- hadoop-yarn-project/CHANGES.txt | 3 + .../api/records/LogAggregationContext.java | 79 +++++++++++++++++-- .../src/main/proto/yarn_protos.proto | 2 + .../impl/pb/LogAggregationContextPBImpl.java | 39 +++++++++ .../logaggregation/AggregatedLogFormat.java | 39 ++++----- .../logaggregation/AppLogAggregatorImpl.java | 22 ++++-- .../TestContainerManagerRecovery.java | 10 ++- .../TestLogAggregationService.java | 73 ++++++++++++++--- .../capacity/TestContainerAllocation.java | 8 +- 9 files changed, 227 insertions(+), 48 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index d12b8f6d11e..f8a110d3162 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -700,6 +700,9 @@ Release 2.7.0 - UNRELEASED YARN-3338. Exclude jline dependency from YARN. (Zhijie Shen via xgong) + YARN-3154. Added additional APIs in LogAggregationContext to avoid aggregating + running logs of application when rolling is enabled. (Xuan Gong via vinodkv) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java index 46c1809b417..e582d2c6e8c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java @@ -32,11 +32,20 @@ import org.apache.hadoop.yarn.util.Records; * *

* @@ -57,8 +66,23 @@ public abstract class LogAggregationContext { return context; } + @Public + @Unstable + public static LogAggregationContext newInstance(String includePattern, + String excludePattern, String rolledLogsIncludePattern, + String rolledLogsExcludePattern) { + LogAggregationContext context = + Records.newRecord(LogAggregationContext.class); + context.setIncludePattern(includePattern); + context.setExcludePattern(excludePattern); + context.setRolledLogsIncludePattern(rolledLogsIncludePattern); + context.setRolledLogsExcludePattern(rolledLogsExcludePattern); + return context; + } + /** - * Get include pattern + * Get include pattern. This includePattern only takes affect + * on logs that exist at the time of application finish. * * @return include pattern */ @@ -67,7 +91,8 @@ public abstract class LogAggregationContext { public abstract String getIncludePattern(); /** - * Set include pattern + * Set include pattern. This includePattern only takes affect + * on logs that exist at the time of application finish. * * @param includePattern */ @@ -76,7 +101,8 @@ public abstract class LogAggregationContext { public abstract void setIncludePattern(String includePattern); /** - * Get exclude pattern + * Get exclude pattern. This excludePattern only takes affect + * on logs that exist at the time of application finish. * * @return exclude pattern */ @@ -85,11 +111,50 @@ public abstract class LogAggregationContext { public abstract String getExcludePattern(); /** - * Set exclude pattern + * Set exclude pattern. This excludePattern only takes affect + * on logs that exist at the time of application finish. * * @param excludePattern */ @Public @Unstable public abstract void setExcludePattern(String excludePattern); + + /** + * Get include pattern in a rolling fashion. + * + * @return include pattern + */ + @Public + @Unstable + public abstract String getRolledLogsIncludePattern(); + + /** + * Set include pattern in a rolling fashion. + * + * @param rolledLogsIncludePattern + */ + @Public + @Unstable + public abstract void setRolledLogsIncludePattern( + String rolledLogsIncludePattern); + + /** + * Get exclude pattern for aggregation in a rolling fashion. + * + * @return exclude pattern + */ + @Public + @Unstable + public abstract String getRolledLogsExcludePattern(); + + /** + * Set exclude pattern for in a rolling fashion. + * + * @param rolledLogsExcludePattern + */ + @Public + @Unstable + public abstract void setRolledLogsExcludePattern( + String rolledLogsExcludePattern); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 90706ed4be2..2edff99b2a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -314,6 +314,8 @@ message ApplicationSubmissionContextProto { message LogAggregationContextProto { optional string include_pattern = 1 [default = ".*"]; optional string exclude_pattern = 2 [default = ""]; + optional string rolled_logs_include_pattern = 3 [default = ""]; + optional string rolled_logs_exclude_pattern = 4 [default = ".*"]; } enum ApplicationAccessTypeProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java index dc7a21d2a5b..f6409bb5825 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.api.records.impl.pb; import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProtoOrBuilder; + import com.google.protobuf.TextFormat; public class LogAggregationContextPBImpl extends LogAggregationContext{ @@ -116,4 +117,42 @@ public class LogAggregationContextPBImpl extends LogAggregationContext{ } builder.setExcludePattern(excludePattern); } + + @Override + public String getRolledLogsIncludePattern() { + LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder; + if (! p.hasRolledLogsIncludePattern()) { + return null; + } + return p.getRolledLogsIncludePattern(); + } + + @Override + public void setRolledLogsIncludePattern(String rolledLogsIncludePattern) { + maybeInitBuilder(); + if (rolledLogsIncludePattern == null) { + builder.clearRolledLogsIncludePattern(); + return; + } + builder.setRolledLogsIncludePattern(rolledLogsIncludePattern); + } + + @Override + public String getRolledLogsExcludePattern() { + LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder; + if (! p.hasRolledLogsExcludePattern()) { + return null; + } + return p.getRolledLogsExcludePattern(); + } + + @Override + public void setRolledLogsExcludePattern(String rolledLogsExcludePattern) { + maybeInitBuilder(); + if (rolledLogsExcludePattern == null) { + builder.clearRolledLogsExcludePattern(); + return; + } + builder.setRolledLogsExcludePattern(rolledLogsExcludePattern); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index b6693326eae..29122dce103 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -167,17 +167,18 @@ public class AggregatedLogFormat { private Set uploadedFiles = new HashSet(); private final Set alreadyUploadedLogFiles; private Set allExistingFileMeta = new HashSet(); + private final boolean appFinished; // TODO Maybe add a version string here. Instead of changing the version of // the entire k-v format public LogValue(List rootLogDirs, ContainerId containerId, String user) { - this(rootLogDirs, containerId, user, null, new HashSet()); + this(rootLogDirs, containerId, user, null, new HashSet(), true); } public LogValue(List rootLogDirs, ContainerId containerId, String user, LogAggregationContext logAggregationContext, - Set alreadyUploadedLogFiles) { + Set alreadyUploadedLogFiles, boolean appFinished) { this.rootLogDirs = new ArrayList(rootLogDirs); this.containerId = containerId; this.user = user; @@ -186,6 +187,7 @@ public class AggregatedLogFormat { Collections.sort(this.rootLogDirs); this.logAggregationContext = logAggregationContext; this.alreadyUploadedLogFiles = alreadyUploadedLogFiles; + this.appFinished = appFinished; } private Set getPendingLogFilesToUploadForThisContainer() { @@ -296,17 +298,15 @@ public class AggregatedLogFormat { } if (this.logAggregationContext != null && candidates.size() > 0) { - if (this.logAggregationContext.getIncludePattern() != null - && !this.logAggregationContext.getIncludePattern().isEmpty()) { - filterFiles(this.logAggregationContext.getIncludePattern(), - candidates, false); - } + filterFiles( + this.appFinished ? this.logAggregationContext.getIncludePattern() + : this.logAggregationContext.getRolledLogsIncludePattern(), + candidates, false); - if (this.logAggregationContext.getExcludePattern() != null - && !this.logAggregationContext.getExcludePattern().isEmpty()) { - filterFiles(this.logAggregationContext.getExcludePattern(), - candidates, true); - } + filterFiles( + this.appFinished ? this.logAggregationContext.getExcludePattern() + : this.logAggregationContext.getRolledLogsExcludePattern(), + candidates, true); Iterable mask = Iterables.filter(candidates, new Predicate() { @@ -323,14 +323,15 @@ public class AggregatedLogFormat { private void filterFiles(String pattern, Set candidates, boolean exclusion) { - Pattern filterPattern = - Pattern.compile(pattern); - for (Iterator candidatesItr = candidates.iterator(); candidatesItr + if (pattern != null && !pattern.isEmpty()) { + Pattern filterPattern = Pattern.compile(pattern); + for (Iterator candidatesItr = candidates.iterator(); candidatesItr .hasNext();) { - File candidate = candidatesItr.next(); - boolean match = filterPattern.matcher(candidate.getName()).find(); - if ((!match && !exclusion) || (match && exclusion)) { - candidatesItr.remove(); + File candidate = candidatesItr.next(); + boolean match = filterPattern.matcher(candidate.getName()).find(); + if ((!match && !exclusion) || (match && exclusion)) { + candidatesItr.remove(); + } } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 787422b8c5d..ff70a681047 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -44,7 +44,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -116,6 +115,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator { private final Context context; private final int retentionSize; private final long rollingMonitorInterval; + private final boolean logAggregationInRolling; private final NodeId nodeId; // This variable is only for testing private final AtomicBoolean waiting = new AtomicBoolean(false); @@ -193,9 +193,14 @@ public class AppLogAggregatorImpl implements AppLogAggregator { } this.rollingMonitorInterval = configuredRollingMonitorInterval; } + this.logAggregationInRolling = + this.rollingMonitorInterval <= 0 || this.logAggregationContext == null + || this.logAggregationContext.getRolledLogsIncludePattern() == null + || this.logAggregationContext.getRolledLogsIncludePattern() + .isEmpty() ? false : true; } - private void uploadLogsForContainers() { + private void uploadLogsForContainers(boolean appFinished) { if (this.logAggregationDisabled) { return; } @@ -262,7 +267,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator { containerLogAggregators.put(container, aggregator); } Set uploadedFilePathsInThisCycle = - aggregator.doContainerLogAggregation(writer); + aggregator.doContainerLogAggregation(writer, appFinished); if (uploadedFilePathsInThisCycle.size() > 0) { uploadedLogsInThisCycle = true; } @@ -394,12 +399,12 @@ public class AppLogAggregatorImpl implements AppLogAggregator { synchronized(this) { try { waiting.set(true); - if (this.rollingMonitorInterval > 0) { + if (logAggregationInRolling) { wait(this.rollingMonitorInterval * 1000); if (this.appFinishing.get() || this.aborted.get()) { break; } - uploadLogsForContainers(); + uploadLogsForContainers(false); } else { wait(THREAD_SLEEP_TIME); } @@ -415,7 +420,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator { } // App is finished, upload the container logs. - uploadLogsForContainers(); + uploadLogsForContainers(true); // Remove the local app-log-dirs List localAppLogDirs = new ArrayList(); @@ -536,7 +541,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator { this.containerId = containerId; } - public Set doContainerLogAggregation(LogWriter writer) { + public Set doContainerLogAggregation(LogWriter writer, + boolean appFinished) { LOG.info("Uploading logs for container " + containerId + ". Current good log dirs are " + StringUtils.join(",", dirsHandler.getLogDirs())); @@ -544,7 +550,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator { final LogValue logValue = new LogValue(dirsHandler.getLogDirs(), containerId, userUgi.getShortUserName(), logAggregationContext, - this.uploadedFileMeta); + this.uploadedFileMeta, appFinished); try { writer.append(logKey, logValue); } catch (Exception e) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index a73d58341bb..c45ffbb93dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -130,8 +130,10 @@ public class TestContainerManagerRecovery { containerTokens, acls); // create the logAggregationContext LogAggregationContext logAggregationContext = - LogAggregationContext.newInstance("includePattern", "excludePattern"); - StartContainersResponse startResponse = startContainer(context, cm, cid, + LogAggregationContext.newInstance("includePattern", "excludePattern", + "includePatternInRollingAggregation", + "excludePatternInRollingAggregation"); + StartContainersResponse startResponse = startContainer(context, cm, cid, clc, logAggregationContext); assertTrue(startResponse.getFailedRequests().isEmpty()); assertEquals(1, context.getApplications().size()); @@ -171,6 +173,10 @@ public class TestContainerManagerRecovery { recovered.getIncludePattern()); assertEquals(logAggregationContext.getExcludePattern(), recovered.getExcludePattern()); + assertEquals(logAggregationContext.getRolledLogsIncludePattern(), + recovered.getRolledLogsIncludePattern()); + assertEquals(logAggregationContext.getRolledLogsExcludePattern(), + recovered.getRolledLogsExcludePattern()); waitForAppState(app, ApplicationState.INITING); assertTrue(context.getApplicationACLsManager().checkAccess( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 901e45a57bb..df51a0dab83 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -698,7 +698,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { } } - private String verifyContainerLogs(LogAggregationService logAggregationService, + private LogFileStatusInLastCycle verifyContainerLogs(LogAggregationService logAggregationService, ApplicationId appId, ContainerId[] expectedContainerIds, String[] logFiles, int numOfContainerLogs, boolean multiLogs) throws IOException { @@ -743,7 +743,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest { new AggregatedLogFormat.LogReader(this.conf, targetNodeFile.getPath()); Assert.assertEquals(this.user, reader.getApplicationOwner()); verifyAcls(reader.getApplicationAcls()); - + + List fileTypes = new ArrayList(); + try { Map> logMap = new HashMap>(); @@ -769,6 +771,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { Assert.assertEquals("LogType:", writtenLines[0].substring(0, 8)); String fileType = writtenLines[0].substring(8); + fileTypes.add(fileType); Assert.assertEquals("LogLength:", writtenLines[1].substring(0, 10)); String fileLengthStr = writtenLines[1].substring(10); @@ -811,7 +814,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { Assert.assertEquals(0, thisContainerMap.size()); } Assert.assertEquals(0, logMap.size()); - return targetNodeFile.getPath().getName(); + return new LogFileStatusInLastCycle(targetNodeFile.getPath().getName(), fileTypes); } finally { reader.close(); } @@ -1289,6 +1292,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest { throws Exception { LogAggregationContext logAggregationContextWithInterval = Records.newRecord(LogAggregationContext.class); + // set IncludePattern/excludePattern in rolling fashion + // we expect all the logs except std_final will be uploaded + // when app is running. The std_final will be uploaded when + // the app finishes. + logAggregationContextWithInterval.setRolledLogsIncludePattern(".*"); + logAggregationContextWithInterval.setRolledLogsExcludePattern("std_final"); this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); @@ -1338,9 +1347,14 @@ public class TestLogAggregationService extends BaseContainerManagerTest { this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls, logAggregationContextWithInterval)); + LogFileStatusInLastCycle logFileStatusInLastCycle = null; // Simulate log-file creation - String[] logFiles1 = new String[] { "stdout", "stderr", "syslog" }; - writeContainerLogs(appLogDir, container, logFiles1); + // create std_final in log directory which will not be aggregated + // until the app finishes. + String[] logFiles1WithFinalLog = + new String[] { "stdout", "stderr", "syslog", "std_final" }; + String[] logFiles1 = new String[] { "stdout", "stderr", "syslog"}; + writeContainerLogs(appLogDir, container, logFiles1WithFinalLog); // Do log aggregation AppLogAggregatorImpl aggregator = @@ -1355,10 +1369,16 @@ public class TestLogAggregationService extends BaseContainerManagerTest { Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application, 50, 1, false, null)); } - String logFileInLastCycle = null; // Container logs should be uploaded - logFileInLastCycle = verifyContainerLogs(logAggregationService, application, + logFileStatusInLastCycle = verifyContainerLogs(logAggregationService, application, new ContainerId[] { container }, logFiles1, 3, true); + for(String logFile : logFiles1) { + Assert.assertTrue(logFileStatusInLastCycle.getLogFileTypesInLastCycle() + .contains(logFile)); + } + // Make sure the std_final is not uploaded. + Assert.assertFalse(logFileStatusInLastCycle.getLogFileTypesInLastCycle() + .contains("std_final")); Thread.sleep(2000); @@ -1380,15 +1400,23 @@ public class TestLogAggregationService extends BaseContainerManagerTest { if (retentionSizeLimitation) { Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application, - 50, 1, true, logFileInLastCycle)); + 50, 1, true, logFileStatusInLastCycle.getLogFilePathInLastCycle())); } else { Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application, 50, 2, false, null)); } // Container logs should be uploaded - logFileInLastCycle = verifyContainerLogs(logAggregationService, application, + logFileStatusInLastCycle = verifyContainerLogs(logAggregationService, application, new ContainerId[] { container }, logFiles2, 3, true); + for(String logFile : logFiles2) { + Assert.assertTrue(logFileStatusInLastCycle.getLogFileTypesInLastCycle() + .contains(logFile)); + } + // Make sure the std_final is not uploaded. + Assert.assertFalse(logFileStatusInLastCycle.getLogFileTypesInLastCycle() + .contains("std_final")); + Thread.sleep(2000); // create another logs @@ -1402,13 +1430,17 @@ public class TestLogAggregationService extends BaseContainerManagerTest { logAggregationService.handle(new LogHandlerAppFinishedEvent(application)); if (retentionSizeLimitation) { Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application, - 50, 1, true, logFileInLastCycle)); + 50, 1, true, logFileStatusInLastCycle.getLogFilePathInLastCycle())); } else { Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application, 50, 3, false, null)); } + + // the app is finished. The log "std_final" should be aggregated this time. + String[] logFiles3WithFinalLog = + new String[] { "stdout_2", "stderr_2", "syslog_2", "std_final" }; verifyContainerLogs(logAggregationService, application, - new ContainerId[] { container }, logFiles3, 3, true); + new ContainerId[] { container }, logFiles3WithFinalLog, 4, true); logAggregationService.stop(); assertEquals(0, logAggregationService.getNumAggregators()); } @@ -1512,4 +1544,23 @@ public class TestLogAggregationService extends BaseContainerManagerTest { return numOfLogsAvailable(logAggregationService, application, sizeLimited, lastLogFile) == expectNum; } + + private static class LogFileStatusInLastCycle { + private String logFilePathInLastCycle; + private List logFileTypesInLastCycle; + + public LogFileStatusInLastCycle(String logFilePathInLastCycle, + List logFileTypesInLastCycle) { + this.logFilePathInLastCycle = logFilePathInLastCycle; + this.logFileTypesInLastCycle = logFileTypesInLastCycle; + } + + public String getLogFilePathInLastCycle() { + return this.logFilePathInLastCycle; + } + + public List getLogFileTypesInLastCycle() { + return this.logFileTypesInLastCycle; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java index 169517dd372..0ad295774b5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java @@ -230,12 +230,18 @@ public class TestContainerAllocation { // create a not-null LogAggregationContext LogAggregationContext logAggregationContext = LogAggregationContext.newInstance( - "includePattern", "excludePattern"); + "includePattern", "excludePattern", + "rolledLogsIncludePattern", + "rolledLogsExcludePattern"); LogAggregationContext returned = getLogAggregationContextFromContainerToken(rm1, nm2, logAggregationContext); Assert.assertEquals("includePattern", returned.getIncludePattern()); Assert.assertEquals("excludePattern", returned.getExcludePattern()); + Assert.assertEquals("rolledLogsIncludePattern", + returned.getRolledLogsIncludePattern()); + Assert.assertEquals("rolledLogsExcludePattern", + returned.getRolledLogsExcludePattern()); rm1.stop(); }