diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 11d1cc9fe05..80d26979b33 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -757,6 +757,9 @@ Release 2.7.0 - UNRELEASED
YARN-3338. Exclude jline dependency from YARN. (Zhijie Shen via xgong)
+ YARN-3154. Added additional APIs in LogAggregationContext to avoid aggregating
+ running logs of application when rolling is enabled. (Xuan Gong via vinodkv)
+
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java
index 46c1809b417..e582d2c6e8c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java
@@ -32,11 +32,20 @@ import org.apache.hadoop.yarn.util.Records;
*
* - includePattern. It uses Java Regex to filter the log files
* which match the defined include pattern and those log files
- * will be uploaded.
+ * will be uploaded when the application finishes.
* - excludePattern. It uses Java Regex to filter the log files
* which match the defined exclude pattern and those log files
- * will not be uploaded. If the log file name matches both the
- * include and the exclude pattern, this file will be excluded eventually
+ * will not be uploaded when application finishes. If the log file
+ * name matches both the include and the exclude pattern, this file
+ * will be excluded eventually
+ * - rolledLogsIncludePattern. It uses Java Regex to filter the log files
+ * which match the defined include pattern and those log files
+ * will be aggregated in a rolling fashion.
+ * - rolledLogsExcludePattern. It uses Java Regex to filter the log files
+ * which match the defined exclude pattern and those log files
+ * will not be aggregated in a rolling fashion. If the log file
+ * name matches both the include and the exclude pattern, this file
+ * will be excluded eventually
*
*
*
@@ -57,8 +66,23 @@ public abstract class LogAggregationContext {
return context;
}
+ @Public
+ @Unstable
+ public static LogAggregationContext newInstance(String includePattern,
+ String excludePattern, String rolledLogsIncludePattern,
+ String rolledLogsExcludePattern) {
+ LogAggregationContext context =
+ Records.newRecord(LogAggregationContext.class);
+ context.setIncludePattern(includePattern);
+ context.setExcludePattern(excludePattern);
+ context.setRolledLogsIncludePattern(rolledLogsIncludePattern);
+ context.setRolledLogsExcludePattern(rolledLogsExcludePattern);
+ return context;
+ }
+
/**
- * Get include pattern
+ * Get include pattern. This includePattern only takes affect
+ * on logs that exist at the time of application finish.
*
* @return include pattern
*/
@@ -67,7 +91,8 @@ public abstract class LogAggregationContext {
public abstract String getIncludePattern();
/**
- * Set include pattern
+ * Set include pattern. This includePattern only takes affect
+ * on logs that exist at the time of application finish.
*
* @param includePattern
*/
@@ -76,7 +101,8 @@ public abstract class LogAggregationContext {
public abstract void setIncludePattern(String includePattern);
/**
- * Get exclude pattern
+ * Get exclude pattern. This excludePattern only takes affect
+ * on logs that exist at the time of application finish.
*
* @return exclude pattern
*/
@@ -85,11 +111,50 @@ public abstract class LogAggregationContext {
public abstract String getExcludePattern();
/**
- * Set exclude pattern
+ * Set exclude pattern. This excludePattern only takes affect
+ * on logs that exist at the time of application finish.
*
* @param excludePattern
*/
@Public
@Unstable
public abstract void setExcludePattern(String excludePattern);
+
+ /**
+ * Get include pattern in a rolling fashion.
+ *
+ * @return include pattern
+ */
+ @Public
+ @Unstable
+ public abstract String getRolledLogsIncludePattern();
+
+ /**
+ * Set include pattern in a rolling fashion.
+ *
+ * @param rolledLogsIncludePattern
+ */
+ @Public
+ @Unstable
+ public abstract void setRolledLogsIncludePattern(
+ String rolledLogsIncludePattern);
+
+ /**
+ * Get exclude pattern for aggregation in a rolling fashion.
+ *
+ * @return exclude pattern
+ */
+ @Public
+ @Unstable
+ public abstract String getRolledLogsExcludePattern();
+
+ /**
+ * Set exclude pattern for in a rolling fashion.
+ *
+ * @param rolledLogsExcludePattern
+ */
+ @Public
+ @Unstable
+ public abstract void setRolledLogsExcludePattern(
+ String rolledLogsExcludePattern);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 90706ed4be2..2edff99b2a9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -314,6 +314,8 @@ message ApplicationSubmissionContextProto {
message LogAggregationContextProto {
optional string include_pattern = 1 [default = ".*"];
optional string exclude_pattern = 2 [default = ""];
+ optional string rolled_logs_include_pattern = 3 [default = ""];
+ optional string rolled_logs_exclude_pattern = 4 [default = ".*"];
}
enum ApplicationAccessTypeProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java
index dc7a21d2a5b..f6409bb5825 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.api.records.impl.pb;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProtoOrBuilder;
+
import com.google.protobuf.TextFormat;
public class LogAggregationContextPBImpl extends LogAggregationContext{
@@ -116,4 +117,42 @@ public class LogAggregationContextPBImpl extends LogAggregationContext{
}
builder.setExcludePattern(excludePattern);
}
+
+ @Override
+ public String getRolledLogsIncludePattern() {
+ LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
+ if (! p.hasRolledLogsIncludePattern()) {
+ return null;
+ }
+ return p.getRolledLogsIncludePattern();
+ }
+
+ @Override
+ public void setRolledLogsIncludePattern(String rolledLogsIncludePattern) {
+ maybeInitBuilder();
+ if (rolledLogsIncludePattern == null) {
+ builder.clearRolledLogsIncludePattern();
+ return;
+ }
+ builder.setRolledLogsIncludePattern(rolledLogsIncludePattern);
+ }
+
+ @Override
+ public String getRolledLogsExcludePattern() {
+ LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
+ if (! p.hasRolledLogsExcludePattern()) {
+ return null;
+ }
+ return p.getRolledLogsExcludePattern();
+ }
+
+ @Override
+ public void setRolledLogsExcludePattern(String rolledLogsExcludePattern) {
+ maybeInitBuilder();
+ if (rolledLogsExcludePattern == null) {
+ builder.clearRolledLogsExcludePattern();
+ return;
+ }
+ builder.setRolledLogsExcludePattern(rolledLogsExcludePattern);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index b6693326eae..29122dce103 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -167,17 +167,18 @@ public class AggregatedLogFormat {
private Set uploadedFiles = new HashSet();
private final Set alreadyUploadedLogFiles;
private Set allExistingFileMeta = new HashSet();
+ private final boolean appFinished;
// TODO Maybe add a version string here. Instead of changing the version of
// the entire k-v format
public LogValue(List rootLogDirs, ContainerId containerId,
String user) {
- this(rootLogDirs, containerId, user, null, new HashSet());
+ this(rootLogDirs, containerId, user, null, new HashSet(), true);
}
public LogValue(List rootLogDirs, ContainerId containerId,
String user, LogAggregationContext logAggregationContext,
- Set alreadyUploadedLogFiles) {
+ Set alreadyUploadedLogFiles, boolean appFinished) {
this.rootLogDirs = new ArrayList(rootLogDirs);
this.containerId = containerId;
this.user = user;
@@ -186,6 +187,7 @@ public class AggregatedLogFormat {
Collections.sort(this.rootLogDirs);
this.logAggregationContext = logAggregationContext;
this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
+ this.appFinished = appFinished;
}
private Set getPendingLogFilesToUploadForThisContainer() {
@@ -296,17 +298,15 @@ public class AggregatedLogFormat {
}
if (this.logAggregationContext != null && candidates.size() > 0) {
- if (this.logAggregationContext.getIncludePattern() != null
- && !this.logAggregationContext.getIncludePattern().isEmpty()) {
- filterFiles(this.logAggregationContext.getIncludePattern(),
- candidates, false);
- }
+ filterFiles(
+ this.appFinished ? this.logAggregationContext.getIncludePattern()
+ : this.logAggregationContext.getRolledLogsIncludePattern(),
+ candidates, false);
- if (this.logAggregationContext.getExcludePattern() != null
- && !this.logAggregationContext.getExcludePattern().isEmpty()) {
- filterFiles(this.logAggregationContext.getExcludePattern(),
- candidates, true);
- }
+ filterFiles(
+ this.appFinished ? this.logAggregationContext.getExcludePattern()
+ : this.logAggregationContext.getRolledLogsExcludePattern(),
+ candidates, true);
Iterable mask =
Iterables.filter(candidates, new Predicate() {
@@ -323,14 +323,15 @@ public class AggregatedLogFormat {
private void filterFiles(String pattern, Set candidates,
boolean exclusion) {
- Pattern filterPattern =
- Pattern.compile(pattern);
- for (Iterator candidatesItr = candidates.iterator(); candidatesItr
+ if (pattern != null && !pattern.isEmpty()) {
+ Pattern filterPattern = Pattern.compile(pattern);
+ for (Iterator candidatesItr = candidates.iterator(); candidatesItr
.hasNext();) {
- File candidate = candidatesItr.next();
- boolean match = filterPattern.matcher(candidate.getName()).find();
- if ((!match && !exclusion) || (match && exclusion)) {
- candidatesItr.remove();
+ File candidate = candidatesItr.next();
+ boolean match = filterPattern.matcher(candidate.getName()).find();
+ if ((!match && !exclusion) || (match && exclusion)) {
+ candidatesItr.remove();
+ }
}
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 787422b8c5d..ff70a681047 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -116,6 +115,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
private final Context context;
private final int retentionSize;
private final long rollingMonitorInterval;
+ private final boolean logAggregationInRolling;
private final NodeId nodeId;
// This variable is only for testing
private final AtomicBoolean waiting = new AtomicBoolean(false);
@@ -193,9 +193,14 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
}
this.rollingMonitorInterval = configuredRollingMonitorInterval;
}
+ this.logAggregationInRolling =
+ this.rollingMonitorInterval <= 0 || this.logAggregationContext == null
+ || this.logAggregationContext.getRolledLogsIncludePattern() == null
+ || this.logAggregationContext.getRolledLogsIncludePattern()
+ .isEmpty() ? false : true;
}
- private void uploadLogsForContainers() {
+ private void uploadLogsForContainers(boolean appFinished) {
if (this.logAggregationDisabled) {
return;
}
@@ -262,7 +267,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
containerLogAggregators.put(container, aggregator);
}
Set uploadedFilePathsInThisCycle =
- aggregator.doContainerLogAggregation(writer);
+ aggregator.doContainerLogAggregation(writer, appFinished);
if (uploadedFilePathsInThisCycle.size() > 0) {
uploadedLogsInThisCycle = true;
}
@@ -394,12 +399,12 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
synchronized(this) {
try {
waiting.set(true);
- if (this.rollingMonitorInterval > 0) {
+ if (logAggregationInRolling) {
wait(this.rollingMonitorInterval * 1000);
if (this.appFinishing.get() || this.aborted.get()) {
break;
}
- uploadLogsForContainers();
+ uploadLogsForContainers(false);
} else {
wait(THREAD_SLEEP_TIME);
}
@@ -415,7 +420,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
}
// App is finished, upload the container logs.
- uploadLogsForContainers();
+ uploadLogsForContainers(true);
// Remove the local app-log-dirs
List localAppLogDirs = new ArrayList();
@@ -536,7 +541,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
this.containerId = containerId;
}
- public Set doContainerLogAggregation(LogWriter writer) {
+ public Set doContainerLogAggregation(LogWriter writer,
+ boolean appFinished) {
LOG.info("Uploading logs for container " + containerId
+ ". Current good log dirs are "
+ StringUtils.join(",", dirsHandler.getLogDirs()));
@@ -544,7 +550,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
final LogValue logValue =
new LogValue(dirsHandler.getLogDirs(), containerId,
userUgi.getShortUserName(), logAggregationContext,
- this.uploadedFileMeta);
+ this.uploadedFileMeta, appFinished);
try {
writer.append(logKey, logValue);
} catch (Exception e) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index a73d58341bb..c45ffbb93dd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -130,8 +130,10 @@ public class TestContainerManagerRecovery {
containerTokens, acls);
// create the logAggregationContext
LogAggregationContext logAggregationContext =
- LogAggregationContext.newInstance("includePattern", "excludePattern");
- StartContainersResponse startResponse = startContainer(context, cm, cid,
+ LogAggregationContext.newInstance("includePattern", "excludePattern",
+ "includePatternInRollingAggregation",
+ "excludePatternInRollingAggregation");
+ StartContainersResponse startResponse = startContainer(context, cm, cid,
clc, logAggregationContext);
assertTrue(startResponse.getFailedRequests().isEmpty());
assertEquals(1, context.getApplications().size());
@@ -171,6 +173,10 @@ public class TestContainerManagerRecovery {
recovered.getIncludePattern());
assertEquals(logAggregationContext.getExcludePattern(),
recovered.getExcludePattern());
+ assertEquals(logAggregationContext.getRolledLogsIncludePattern(),
+ recovered.getRolledLogsIncludePattern());
+ assertEquals(logAggregationContext.getRolledLogsExcludePattern(),
+ recovered.getRolledLogsExcludePattern());
waitForAppState(app, ApplicationState.INITING);
assertTrue(context.getApplicationACLsManager().checkAccess(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 901e45a57bb..df51a0dab83 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -698,7 +698,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
}
}
- private String verifyContainerLogs(LogAggregationService logAggregationService,
+ private LogFileStatusInLastCycle verifyContainerLogs(LogAggregationService logAggregationService,
ApplicationId appId, ContainerId[] expectedContainerIds,
String[] logFiles, int numOfContainerLogs, boolean multiLogs)
throws IOException {
@@ -743,7 +743,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
new AggregatedLogFormat.LogReader(this.conf, targetNodeFile.getPath());
Assert.assertEquals(this.user, reader.getApplicationOwner());
verifyAcls(reader.getApplicationAcls());
-
+
+ List fileTypes = new ArrayList();
+
try {
Map> logMap =
new HashMap>();
@@ -769,6 +771,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
Assert.assertEquals("LogType:", writtenLines[0].substring(0, 8));
String fileType = writtenLines[0].substring(8);
+ fileTypes.add(fileType);
Assert.assertEquals("LogLength:", writtenLines[1].substring(0, 10));
String fileLengthStr = writtenLines[1].substring(10);
@@ -811,7 +814,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
Assert.assertEquals(0, thisContainerMap.size());
}
Assert.assertEquals(0, logMap.size());
- return targetNodeFile.getPath().getName();
+ return new LogFileStatusInLastCycle(targetNodeFile.getPath().getName(), fileTypes);
} finally {
reader.close();
}
@@ -1289,6 +1292,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
throws Exception {
LogAggregationContext logAggregationContextWithInterval =
Records.newRecord(LogAggregationContext.class);
+ // set IncludePattern/excludePattern in rolling fashion
+ // we expect all the logs except std_final will be uploaded
+ // when app is running. The std_final will be uploaded when
+ // the app finishes.
+ logAggregationContextWithInterval.setRolledLogsIncludePattern(".*");
+ logAggregationContextWithInterval.setRolledLogsExcludePattern("std_final");
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
@@ -1338,9 +1347,14 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls,
logAggregationContextWithInterval));
+ LogFileStatusInLastCycle logFileStatusInLastCycle = null;
// Simulate log-file creation
- String[] logFiles1 = new String[] { "stdout", "stderr", "syslog" };
- writeContainerLogs(appLogDir, container, logFiles1);
+ // create std_final in log directory which will not be aggregated
+ // until the app finishes.
+ String[] logFiles1WithFinalLog =
+ new String[] { "stdout", "stderr", "syslog", "std_final" };
+ String[] logFiles1 = new String[] { "stdout", "stderr", "syslog"};
+ writeContainerLogs(appLogDir, container, logFiles1WithFinalLog);
// Do log aggregation
AppLogAggregatorImpl aggregator =
@@ -1355,10 +1369,16 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
50, 1, false, null));
}
- String logFileInLastCycle = null;
// Container logs should be uploaded
- logFileInLastCycle = verifyContainerLogs(logAggregationService, application,
+ logFileStatusInLastCycle = verifyContainerLogs(logAggregationService, application,
new ContainerId[] { container }, logFiles1, 3, true);
+ for(String logFile : logFiles1) {
+ Assert.assertTrue(logFileStatusInLastCycle.getLogFileTypesInLastCycle()
+ .contains(logFile));
+ }
+ // Make sure the std_final is not uploaded.
+ Assert.assertFalse(logFileStatusInLastCycle.getLogFileTypesInLastCycle()
+ .contains("std_final"));
Thread.sleep(2000);
@@ -1380,15 +1400,23 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
if (retentionSizeLimitation) {
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
- 50, 1, true, logFileInLastCycle));
+ 50, 1, true, logFileStatusInLastCycle.getLogFilePathInLastCycle()));
} else {
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
50, 2, false, null));
}
// Container logs should be uploaded
- logFileInLastCycle = verifyContainerLogs(logAggregationService, application,
+ logFileStatusInLastCycle = verifyContainerLogs(logAggregationService, application,
new ContainerId[] { container }, logFiles2, 3, true);
+ for(String logFile : logFiles2) {
+ Assert.assertTrue(logFileStatusInLastCycle.getLogFileTypesInLastCycle()
+ .contains(logFile));
+ }
+ // Make sure the std_final is not uploaded.
+ Assert.assertFalse(logFileStatusInLastCycle.getLogFileTypesInLastCycle()
+ .contains("std_final"));
+
Thread.sleep(2000);
// create another logs
@@ -1402,13 +1430,17 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
logAggregationService.handle(new LogHandlerAppFinishedEvent(application));
if (retentionSizeLimitation) {
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
- 50, 1, true, logFileInLastCycle));
+ 50, 1, true, logFileStatusInLastCycle.getLogFilePathInLastCycle()));
} else {
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
50, 3, false, null));
}
+
+ // the app is finished. The log "std_final" should be aggregated this time.
+ String[] logFiles3WithFinalLog =
+ new String[] { "stdout_2", "stderr_2", "syslog_2", "std_final" };
verifyContainerLogs(logAggregationService, application,
- new ContainerId[] { container }, logFiles3, 3, true);
+ new ContainerId[] { container }, logFiles3WithFinalLog, 4, true);
logAggregationService.stop();
assertEquals(0, logAggregationService.getNumAggregators());
}
@@ -1512,4 +1544,23 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
return numOfLogsAvailable(logAggregationService, application, sizeLimited,
lastLogFile) == expectNum;
}
+
+ private static class LogFileStatusInLastCycle {
+ private String logFilePathInLastCycle;
+ private List logFileTypesInLastCycle;
+
+ public LogFileStatusInLastCycle(String logFilePathInLastCycle,
+ List logFileTypesInLastCycle) {
+ this.logFilePathInLastCycle = logFilePathInLastCycle;
+ this.logFileTypesInLastCycle = logFileTypesInLastCycle;
+ }
+
+ public String getLogFilePathInLastCycle() {
+ return this.logFilePathInLastCycle;
+ }
+
+ public List getLogFileTypesInLastCycle() {
+ return this.logFileTypesInLastCycle;
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index 169517dd372..0ad295774b5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -230,12 +230,18 @@ public class TestContainerAllocation {
// create a not-null LogAggregationContext
LogAggregationContext logAggregationContext =
LogAggregationContext.newInstance(
- "includePattern", "excludePattern");
+ "includePattern", "excludePattern",
+ "rolledLogsIncludePattern",
+ "rolledLogsExcludePattern");
LogAggregationContext returned =
getLogAggregationContextFromContainerToken(rm1, nm2,
logAggregationContext);
Assert.assertEquals("includePattern", returned.getIncludePattern());
Assert.assertEquals("excludePattern", returned.getExcludePattern());
+ Assert.assertEquals("rolledLogsIncludePattern",
+ returned.getRolledLogsIncludePattern());
+ Assert.assertEquals("rolledLogsExcludePattern",
+ returned.getRolledLogsExcludePattern());
rm1.stop();
}