YARN-3154. Added additional APIs in LogAggregationContext to avoid aggregating running logs of application when rolling is enabled. Contributed by Xuan Gong.
(cherry picked from commit 863079bb87
)
This commit is contained in:
parent
a1fffc3fcc
commit
0cd9eb9987
|
@ -700,6 +700,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
|
|
||||||
YARN-3338. Exclude jline dependency from YARN. (Zhijie Shen via xgong)
|
YARN-3338. Exclude jline dependency from YARN. (Zhijie Shen via xgong)
|
||||||
|
|
||||||
|
YARN-3154. Added additional APIs in LogAggregationContext to avoid aggregating
|
||||||
|
running logs of application when rolling is enabled. (Xuan Gong via vinodkv)
|
||||||
|
|
||||||
Release 2.6.0 - 2014-11-18
|
Release 2.6.0 - 2014-11-18
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -32,11 +32,20 @@ import org.apache.hadoop.yarn.util.Records;
|
||||||
* <ul>
|
* <ul>
|
||||||
* <li>includePattern. It uses Java Regex to filter the log files
|
* <li>includePattern. It uses Java Regex to filter the log files
|
||||||
* which match the defined include pattern and those log files
|
* which match the defined include pattern and those log files
|
||||||
* will be uploaded. </li>
|
* will be uploaded when the application finishes. </li>
|
||||||
* <li>excludePattern. It uses Java Regex to filter the log files
|
* <li>excludePattern. It uses Java Regex to filter the log files
|
||||||
* which match the defined exclude pattern and those log files
|
* which match the defined exclude pattern and those log files
|
||||||
* will not be uploaded. If the log file name matches both the
|
* will not be uploaded when application finishes. If the log file
|
||||||
* include and the exclude pattern, this file will be excluded eventually</li>
|
* name matches both the include and the exclude pattern, this file
|
||||||
|
* will be excluded eventually</li>
|
||||||
|
* <li>rolledLogsIncludePattern. It uses Java Regex to filter the log files
|
||||||
|
* which match the defined include pattern and those log files
|
||||||
|
* will be aggregated in a rolling fashion.</li>
|
||||||
|
* <li>rolledLogsExcludePattern. It uses Java Regex to filter the log files
|
||||||
|
* which match the defined exclude pattern and those log files
|
||||||
|
* will not be aggregated in a rolling fashion. If the log file
|
||||||
|
* name matches both the include and the exclude pattern, this file
|
||||||
|
* will be excluded eventually</li>
|
||||||
* </ul>
|
* </ul>
|
||||||
* </p>
|
* </p>
|
||||||
*
|
*
|
||||||
|
@ -57,8 +66,23 @@ public abstract class LogAggregationContext {
|
||||||
return context;
|
return context;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public static LogAggregationContext newInstance(String includePattern,
|
||||||
|
String excludePattern, String rolledLogsIncludePattern,
|
||||||
|
String rolledLogsExcludePattern) {
|
||||||
|
LogAggregationContext context =
|
||||||
|
Records.newRecord(LogAggregationContext.class);
|
||||||
|
context.setIncludePattern(includePattern);
|
||||||
|
context.setExcludePattern(excludePattern);
|
||||||
|
context.setRolledLogsIncludePattern(rolledLogsIncludePattern);
|
||||||
|
context.setRolledLogsExcludePattern(rolledLogsExcludePattern);
|
||||||
|
return context;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get include pattern
|
* Get include pattern. This includePattern only takes affect
|
||||||
|
* on logs that exist at the time of application finish.
|
||||||
*
|
*
|
||||||
* @return include pattern
|
* @return include pattern
|
||||||
*/
|
*/
|
||||||
|
@ -67,7 +91,8 @@ public abstract class LogAggregationContext {
|
||||||
public abstract String getIncludePattern();
|
public abstract String getIncludePattern();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set include pattern
|
* Set include pattern. This includePattern only takes affect
|
||||||
|
* on logs that exist at the time of application finish.
|
||||||
*
|
*
|
||||||
* @param includePattern
|
* @param includePattern
|
||||||
*/
|
*/
|
||||||
|
@ -76,7 +101,8 @@ public abstract class LogAggregationContext {
|
||||||
public abstract void setIncludePattern(String includePattern);
|
public abstract void setIncludePattern(String includePattern);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get exclude pattern
|
* Get exclude pattern. This excludePattern only takes affect
|
||||||
|
* on logs that exist at the time of application finish.
|
||||||
*
|
*
|
||||||
* @return exclude pattern
|
* @return exclude pattern
|
||||||
*/
|
*/
|
||||||
|
@ -85,11 +111,50 @@ public abstract class LogAggregationContext {
|
||||||
public abstract String getExcludePattern();
|
public abstract String getExcludePattern();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set exclude pattern
|
* Set exclude pattern. This excludePattern only takes affect
|
||||||
|
* on logs that exist at the time of application finish.
|
||||||
*
|
*
|
||||||
* @param excludePattern
|
* @param excludePattern
|
||||||
*/
|
*/
|
||||||
@Public
|
@Public
|
||||||
@Unstable
|
@Unstable
|
||||||
public abstract void setExcludePattern(String excludePattern);
|
public abstract void setExcludePattern(String excludePattern);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get include pattern in a rolling fashion.
|
||||||
|
*
|
||||||
|
* @return include pattern
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public abstract String getRolledLogsIncludePattern();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set include pattern in a rolling fashion.
|
||||||
|
*
|
||||||
|
* @param rolledLogsIncludePattern
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public abstract void setRolledLogsIncludePattern(
|
||||||
|
String rolledLogsIncludePattern);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get exclude pattern for aggregation in a rolling fashion.
|
||||||
|
*
|
||||||
|
* @return exclude pattern
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public abstract String getRolledLogsExcludePattern();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set exclude pattern for in a rolling fashion.
|
||||||
|
*
|
||||||
|
* @param rolledLogsExcludePattern
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Unstable
|
||||||
|
public abstract void setRolledLogsExcludePattern(
|
||||||
|
String rolledLogsExcludePattern);
|
||||||
}
|
}
|
||||||
|
|
|
@ -314,6 +314,8 @@ message ApplicationSubmissionContextProto {
|
||||||
message LogAggregationContextProto {
|
message LogAggregationContextProto {
|
||||||
optional string include_pattern = 1 [default = ".*"];
|
optional string include_pattern = 1 [default = ".*"];
|
||||||
optional string exclude_pattern = 2 [default = ""];
|
optional string exclude_pattern = 2 [default = ""];
|
||||||
|
optional string rolled_logs_include_pattern = 3 [default = ""];
|
||||||
|
optional string rolled_logs_exclude_pattern = 4 [default = ".*"];
|
||||||
}
|
}
|
||||||
|
|
||||||
enum ApplicationAccessTypeProto {
|
enum ApplicationAccessTypeProto {
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.api.records.impl.pb;
|
||||||
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProtoOrBuilder;
|
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProtoOrBuilder;
|
||||||
|
|
||||||
import com.google.protobuf.TextFormat;
|
import com.google.protobuf.TextFormat;
|
||||||
|
|
||||||
public class LogAggregationContextPBImpl extends LogAggregationContext{
|
public class LogAggregationContextPBImpl extends LogAggregationContext{
|
||||||
|
@ -116,4 +117,42 @@ public class LogAggregationContextPBImpl extends LogAggregationContext{
|
||||||
}
|
}
|
||||||
builder.setExcludePattern(excludePattern);
|
builder.setExcludePattern(excludePattern);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getRolledLogsIncludePattern() {
|
||||||
|
LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
if (! p.hasRolledLogsIncludePattern()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return p.getRolledLogsIncludePattern();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setRolledLogsIncludePattern(String rolledLogsIncludePattern) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
if (rolledLogsIncludePattern == null) {
|
||||||
|
builder.clearRolledLogsIncludePattern();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
builder.setRolledLogsIncludePattern(rolledLogsIncludePattern);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getRolledLogsExcludePattern() {
|
||||||
|
LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
if (! p.hasRolledLogsExcludePattern()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return p.getRolledLogsExcludePattern();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setRolledLogsExcludePattern(String rolledLogsExcludePattern) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
if (rolledLogsExcludePattern == null) {
|
||||||
|
builder.clearRolledLogsExcludePattern();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
builder.setRolledLogsExcludePattern(rolledLogsExcludePattern);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -167,17 +167,18 @@ public class AggregatedLogFormat {
|
||||||
private Set<File> uploadedFiles = new HashSet<File>();
|
private Set<File> uploadedFiles = new HashSet<File>();
|
||||||
private final Set<String> alreadyUploadedLogFiles;
|
private final Set<String> alreadyUploadedLogFiles;
|
||||||
private Set<String> allExistingFileMeta = new HashSet<String>();
|
private Set<String> allExistingFileMeta = new HashSet<String>();
|
||||||
|
private final boolean appFinished;
|
||||||
// TODO Maybe add a version string here. Instead of changing the version of
|
// TODO Maybe add a version string here. Instead of changing the version of
|
||||||
// the entire k-v format
|
// the entire k-v format
|
||||||
|
|
||||||
public LogValue(List<String> rootLogDirs, ContainerId containerId,
|
public LogValue(List<String> rootLogDirs, ContainerId containerId,
|
||||||
String user) {
|
String user) {
|
||||||
this(rootLogDirs, containerId, user, null, new HashSet<String>());
|
this(rootLogDirs, containerId, user, null, new HashSet<String>(), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public LogValue(List<String> rootLogDirs, ContainerId containerId,
|
public LogValue(List<String> rootLogDirs, ContainerId containerId,
|
||||||
String user, LogAggregationContext logAggregationContext,
|
String user, LogAggregationContext logAggregationContext,
|
||||||
Set<String> alreadyUploadedLogFiles) {
|
Set<String> alreadyUploadedLogFiles, boolean appFinished) {
|
||||||
this.rootLogDirs = new ArrayList<String>(rootLogDirs);
|
this.rootLogDirs = new ArrayList<String>(rootLogDirs);
|
||||||
this.containerId = containerId;
|
this.containerId = containerId;
|
||||||
this.user = user;
|
this.user = user;
|
||||||
|
@ -186,6 +187,7 @@ public class AggregatedLogFormat {
|
||||||
Collections.sort(this.rootLogDirs);
|
Collections.sort(this.rootLogDirs);
|
||||||
this.logAggregationContext = logAggregationContext;
|
this.logAggregationContext = logAggregationContext;
|
||||||
this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
|
this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
|
||||||
|
this.appFinished = appFinished;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Set<File> getPendingLogFilesToUploadForThisContainer() {
|
private Set<File> getPendingLogFilesToUploadForThisContainer() {
|
||||||
|
@ -296,17 +298,15 @@ public class AggregatedLogFormat {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.logAggregationContext != null && candidates.size() > 0) {
|
if (this.logAggregationContext != null && candidates.size() > 0) {
|
||||||
if (this.logAggregationContext.getIncludePattern() != null
|
filterFiles(
|
||||||
&& !this.logAggregationContext.getIncludePattern().isEmpty()) {
|
this.appFinished ? this.logAggregationContext.getIncludePattern()
|
||||||
filterFiles(this.logAggregationContext.getIncludePattern(),
|
: this.logAggregationContext.getRolledLogsIncludePattern(),
|
||||||
candidates, false);
|
candidates, false);
|
||||||
}
|
|
||||||
|
|
||||||
if (this.logAggregationContext.getExcludePattern() != null
|
filterFiles(
|
||||||
&& !this.logAggregationContext.getExcludePattern().isEmpty()) {
|
this.appFinished ? this.logAggregationContext.getExcludePattern()
|
||||||
filterFiles(this.logAggregationContext.getExcludePattern(),
|
: this.logAggregationContext.getRolledLogsExcludePattern(),
|
||||||
candidates, true);
|
candidates, true);
|
||||||
}
|
|
||||||
|
|
||||||
Iterable<File> mask =
|
Iterable<File> mask =
|
||||||
Iterables.filter(candidates, new Predicate<File>() {
|
Iterables.filter(candidates, new Predicate<File>() {
|
||||||
|
@ -323,14 +323,15 @@ public class AggregatedLogFormat {
|
||||||
|
|
||||||
private void filterFiles(String pattern, Set<File> candidates,
|
private void filterFiles(String pattern, Set<File> candidates,
|
||||||
boolean exclusion) {
|
boolean exclusion) {
|
||||||
Pattern filterPattern =
|
if (pattern != null && !pattern.isEmpty()) {
|
||||||
Pattern.compile(pattern);
|
Pattern filterPattern = Pattern.compile(pattern);
|
||||||
for (Iterator<File> candidatesItr = candidates.iterator(); candidatesItr
|
for (Iterator<File> candidatesItr = candidates.iterator(); candidatesItr
|
||||||
.hasNext();) {
|
.hasNext();) {
|
||||||
File candidate = candidatesItr.next();
|
File candidate = candidatesItr.next();
|
||||||
boolean match = filterPattern.matcher(candidate.getName()).find();
|
boolean match = filterPattern.matcher(candidate.getName()).find();
|
||||||
if ((!match && !exclusion) || (match && exclusion)) {
|
if ((!match && !exclusion) || (match && exclusion)) {
|
||||||
candidatesItr.remove();
|
candidatesItr.remove();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,7 +44,6 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
@ -116,6 +115,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
private final Context context;
|
private final Context context;
|
||||||
private final int retentionSize;
|
private final int retentionSize;
|
||||||
private final long rollingMonitorInterval;
|
private final long rollingMonitorInterval;
|
||||||
|
private final boolean logAggregationInRolling;
|
||||||
private final NodeId nodeId;
|
private final NodeId nodeId;
|
||||||
// This variable is only for testing
|
// This variable is only for testing
|
||||||
private final AtomicBoolean waiting = new AtomicBoolean(false);
|
private final AtomicBoolean waiting = new AtomicBoolean(false);
|
||||||
|
@ -193,9 +193,14 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
}
|
}
|
||||||
this.rollingMonitorInterval = configuredRollingMonitorInterval;
|
this.rollingMonitorInterval = configuredRollingMonitorInterval;
|
||||||
}
|
}
|
||||||
|
this.logAggregationInRolling =
|
||||||
|
this.rollingMonitorInterval <= 0 || this.logAggregationContext == null
|
||||||
|
|| this.logAggregationContext.getRolledLogsIncludePattern() == null
|
||||||
|
|| this.logAggregationContext.getRolledLogsIncludePattern()
|
||||||
|
.isEmpty() ? false : true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void uploadLogsForContainers() {
|
private void uploadLogsForContainers(boolean appFinished) {
|
||||||
if (this.logAggregationDisabled) {
|
if (this.logAggregationDisabled) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -262,7 +267,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
containerLogAggregators.put(container, aggregator);
|
containerLogAggregators.put(container, aggregator);
|
||||||
}
|
}
|
||||||
Set<Path> uploadedFilePathsInThisCycle =
|
Set<Path> uploadedFilePathsInThisCycle =
|
||||||
aggregator.doContainerLogAggregation(writer);
|
aggregator.doContainerLogAggregation(writer, appFinished);
|
||||||
if (uploadedFilePathsInThisCycle.size() > 0) {
|
if (uploadedFilePathsInThisCycle.size() > 0) {
|
||||||
uploadedLogsInThisCycle = true;
|
uploadedLogsInThisCycle = true;
|
||||||
}
|
}
|
||||||
|
@ -394,12 +399,12 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
synchronized(this) {
|
synchronized(this) {
|
||||||
try {
|
try {
|
||||||
waiting.set(true);
|
waiting.set(true);
|
||||||
if (this.rollingMonitorInterval > 0) {
|
if (logAggregationInRolling) {
|
||||||
wait(this.rollingMonitorInterval * 1000);
|
wait(this.rollingMonitorInterval * 1000);
|
||||||
if (this.appFinishing.get() || this.aborted.get()) {
|
if (this.appFinishing.get() || this.aborted.get()) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
uploadLogsForContainers();
|
uploadLogsForContainers(false);
|
||||||
} else {
|
} else {
|
||||||
wait(THREAD_SLEEP_TIME);
|
wait(THREAD_SLEEP_TIME);
|
||||||
}
|
}
|
||||||
|
@ -415,7 +420,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
}
|
}
|
||||||
|
|
||||||
// App is finished, upload the container logs.
|
// App is finished, upload the container logs.
|
||||||
uploadLogsForContainers();
|
uploadLogsForContainers(true);
|
||||||
|
|
||||||
// Remove the local app-log-dirs
|
// Remove the local app-log-dirs
|
||||||
List<Path> localAppLogDirs = new ArrayList<Path>();
|
List<Path> localAppLogDirs = new ArrayList<Path>();
|
||||||
|
@ -536,7 +541,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
this.containerId = containerId;
|
this.containerId = containerId;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Set<Path> doContainerLogAggregation(LogWriter writer) {
|
public Set<Path> doContainerLogAggregation(LogWriter writer,
|
||||||
|
boolean appFinished) {
|
||||||
LOG.info("Uploading logs for container " + containerId
|
LOG.info("Uploading logs for container " + containerId
|
||||||
+ ". Current good log dirs are "
|
+ ". Current good log dirs are "
|
||||||
+ StringUtils.join(",", dirsHandler.getLogDirs()));
|
+ StringUtils.join(",", dirsHandler.getLogDirs()));
|
||||||
|
@ -544,7 +550,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
final LogValue logValue =
|
final LogValue logValue =
|
||||||
new LogValue(dirsHandler.getLogDirs(), containerId,
|
new LogValue(dirsHandler.getLogDirs(), containerId,
|
||||||
userUgi.getShortUserName(), logAggregationContext,
|
userUgi.getShortUserName(), logAggregationContext,
|
||||||
this.uploadedFileMeta);
|
this.uploadedFileMeta, appFinished);
|
||||||
try {
|
try {
|
||||||
writer.append(logKey, logValue);
|
writer.append(logKey, logValue);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|
|
@ -130,8 +130,10 @@ public class TestContainerManagerRecovery {
|
||||||
containerTokens, acls);
|
containerTokens, acls);
|
||||||
// create the logAggregationContext
|
// create the logAggregationContext
|
||||||
LogAggregationContext logAggregationContext =
|
LogAggregationContext logAggregationContext =
|
||||||
LogAggregationContext.newInstance("includePattern", "excludePattern");
|
LogAggregationContext.newInstance("includePattern", "excludePattern",
|
||||||
StartContainersResponse startResponse = startContainer(context, cm, cid,
|
"includePatternInRollingAggregation",
|
||||||
|
"excludePatternInRollingAggregation");
|
||||||
|
StartContainersResponse startResponse = startContainer(context, cm, cid,
|
||||||
clc, logAggregationContext);
|
clc, logAggregationContext);
|
||||||
assertTrue(startResponse.getFailedRequests().isEmpty());
|
assertTrue(startResponse.getFailedRequests().isEmpty());
|
||||||
assertEquals(1, context.getApplications().size());
|
assertEquals(1, context.getApplications().size());
|
||||||
|
@ -171,6 +173,10 @@ public class TestContainerManagerRecovery {
|
||||||
recovered.getIncludePattern());
|
recovered.getIncludePattern());
|
||||||
assertEquals(logAggregationContext.getExcludePattern(),
|
assertEquals(logAggregationContext.getExcludePattern(),
|
||||||
recovered.getExcludePattern());
|
recovered.getExcludePattern());
|
||||||
|
assertEquals(logAggregationContext.getRolledLogsIncludePattern(),
|
||||||
|
recovered.getRolledLogsIncludePattern());
|
||||||
|
assertEquals(logAggregationContext.getRolledLogsExcludePattern(),
|
||||||
|
recovered.getRolledLogsExcludePattern());
|
||||||
|
|
||||||
waitForAppState(app, ApplicationState.INITING);
|
waitForAppState(app, ApplicationState.INITING);
|
||||||
assertTrue(context.getApplicationACLsManager().checkAccess(
|
assertTrue(context.getApplicationACLsManager().checkAccess(
|
||||||
|
|
|
@ -698,7 +698,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private String verifyContainerLogs(LogAggregationService logAggregationService,
|
private LogFileStatusInLastCycle verifyContainerLogs(LogAggregationService logAggregationService,
|
||||||
ApplicationId appId, ContainerId[] expectedContainerIds,
|
ApplicationId appId, ContainerId[] expectedContainerIds,
|
||||||
String[] logFiles, int numOfContainerLogs, boolean multiLogs)
|
String[] logFiles, int numOfContainerLogs, boolean multiLogs)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
@ -743,7 +743,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
new AggregatedLogFormat.LogReader(this.conf, targetNodeFile.getPath());
|
new AggregatedLogFormat.LogReader(this.conf, targetNodeFile.getPath());
|
||||||
Assert.assertEquals(this.user, reader.getApplicationOwner());
|
Assert.assertEquals(this.user, reader.getApplicationOwner());
|
||||||
verifyAcls(reader.getApplicationAcls());
|
verifyAcls(reader.getApplicationAcls());
|
||||||
|
|
||||||
|
List<String> fileTypes = new ArrayList<String>();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Map<String, Map<String, String>> logMap =
|
Map<String, Map<String, String>> logMap =
|
||||||
new HashMap<String, Map<String, String>>();
|
new HashMap<String, Map<String, String>>();
|
||||||
|
@ -769,6 +771,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
|
|
||||||
Assert.assertEquals("LogType:", writtenLines[0].substring(0, 8));
|
Assert.assertEquals("LogType:", writtenLines[0].substring(0, 8));
|
||||||
String fileType = writtenLines[0].substring(8);
|
String fileType = writtenLines[0].substring(8);
|
||||||
|
fileTypes.add(fileType);
|
||||||
|
|
||||||
Assert.assertEquals("LogLength:", writtenLines[1].substring(0, 10));
|
Assert.assertEquals("LogLength:", writtenLines[1].substring(0, 10));
|
||||||
String fileLengthStr = writtenLines[1].substring(10);
|
String fileLengthStr = writtenLines[1].substring(10);
|
||||||
|
@ -811,7 +814,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
Assert.assertEquals(0, thisContainerMap.size());
|
Assert.assertEquals(0, thisContainerMap.size());
|
||||||
}
|
}
|
||||||
Assert.assertEquals(0, logMap.size());
|
Assert.assertEquals(0, logMap.size());
|
||||||
return targetNodeFile.getPath().getName();
|
return new LogFileStatusInLastCycle(targetNodeFile.getPath().getName(), fileTypes);
|
||||||
} finally {
|
} finally {
|
||||||
reader.close();
|
reader.close();
|
||||||
}
|
}
|
||||||
|
@ -1289,6 +1292,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
throws Exception {
|
throws Exception {
|
||||||
LogAggregationContext logAggregationContextWithInterval =
|
LogAggregationContext logAggregationContextWithInterval =
|
||||||
Records.newRecord(LogAggregationContext.class);
|
Records.newRecord(LogAggregationContext.class);
|
||||||
|
// set IncludePattern/excludePattern in rolling fashion
|
||||||
|
// we expect all the logs except std_final will be uploaded
|
||||||
|
// when app is running. The std_final will be uploaded when
|
||||||
|
// the app finishes.
|
||||||
|
logAggregationContextWithInterval.setRolledLogsIncludePattern(".*");
|
||||||
|
logAggregationContextWithInterval.setRolledLogsExcludePattern("std_final");
|
||||||
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
|
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
|
||||||
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||||
this.remoteRootLogDir.getAbsolutePath());
|
this.remoteRootLogDir.getAbsolutePath());
|
||||||
|
@ -1338,9 +1347,14 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls,
|
this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls,
|
||||||
logAggregationContextWithInterval));
|
logAggregationContextWithInterval));
|
||||||
|
|
||||||
|
LogFileStatusInLastCycle logFileStatusInLastCycle = null;
|
||||||
// Simulate log-file creation
|
// Simulate log-file creation
|
||||||
String[] logFiles1 = new String[] { "stdout", "stderr", "syslog" };
|
// create std_final in log directory which will not be aggregated
|
||||||
writeContainerLogs(appLogDir, container, logFiles1);
|
// until the app finishes.
|
||||||
|
String[] logFiles1WithFinalLog =
|
||||||
|
new String[] { "stdout", "stderr", "syslog", "std_final" };
|
||||||
|
String[] logFiles1 = new String[] { "stdout", "stderr", "syslog"};
|
||||||
|
writeContainerLogs(appLogDir, container, logFiles1WithFinalLog);
|
||||||
|
|
||||||
// Do log aggregation
|
// Do log aggregation
|
||||||
AppLogAggregatorImpl aggregator =
|
AppLogAggregatorImpl aggregator =
|
||||||
|
@ -1355,10 +1369,16 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
|
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
|
||||||
50, 1, false, null));
|
50, 1, false, null));
|
||||||
}
|
}
|
||||||
String logFileInLastCycle = null;
|
|
||||||
// Container logs should be uploaded
|
// Container logs should be uploaded
|
||||||
logFileInLastCycle = verifyContainerLogs(logAggregationService, application,
|
logFileStatusInLastCycle = verifyContainerLogs(logAggregationService, application,
|
||||||
new ContainerId[] { container }, logFiles1, 3, true);
|
new ContainerId[] { container }, logFiles1, 3, true);
|
||||||
|
for(String logFile : logFiles1) {
|
||||||
|
Assert.assertTrue(logFileStatusInLastCycle.getLogFileTypesInLastCycle()
|
||||||
|
.contains(logFile));
|
||||||
|
}
|
||||||
|
// Make sure the std_final is not uploaded.
|
||||||
|
Assert.assertFalse(logFileStatusInLastCycle.getLogFileTypesInLastCycle()
|
||||||
|
.contains("std_final"));
|
||||||
|
|
||||||
Thread.sleep(2000);
|
Thread.sleep(2000);
|
||||||
|
|
||||||
|
@ -1380,15 +1400,23 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
|
|
||||||
if (retentionSizeLimitation) {
|
if (retentionSizeLimitation) {
|
||||||
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
|
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
|
||||||
50, 1, true, logFileInLastCycle));
|
50, 1, true, logFileStatusInLastCycle.getLogFilePathInLastCycle()));
|
||||||
} else {
|
} else {
|
||||||
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
|
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
|
||||||
50, 2, false, null));
|
50, 2, false, null));
|
||||||
}
|
}
|
||||||
// Container logs should be uploaded
|
// Container logs should be uploaded
|
||||||
logFileInLastCycle = verifyContainerLogs(logAggregationService, application,
|
logFileStatusInLastCycle = verifyContainerLogs(logAggregationService, application,
|
||||||
new ContainerId[] { container }, logFiles2, 3, true);
|
new ContainerId[] { container }, logFiles2, 3, true);
|
||||||
|
|
||||||
|
for(String logFile : logFiles2) {
|
||||||
|
Assert.assertTrue(logFileStatusInLastCycle.getLogFileTypesInLastCycle()
|
||||||
|
.contains(logFile));
|
||||||
|
}
|
||||||
|
// Make sure the std_final is not uploaded.
|
||||||
|
Assert.assertFalse(logFileStatusInLastCycle.getLogFileTypesInLastCycle()
|
||||||
|
.contains("std_final"));
|
||||||
|
|
||||||
Thread.sleep(2000);
|
Thread.sleep(2000);
|
||||||
|
|
||||||
// create another logs
|
// create another logs
|
||||||
|
@ -1402,13 +1430,17 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
logAggregationService.handle(new LogHandlerAppFinishedEvent(application));
|
logAggregationService.handle(new LogHandlerAppFinishedEvent(application));
|
||||||
if (retentionSizeLimitation) {
|
if (retentionSizeLimitation) {
|
||||||
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
|
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
|
||||||
50, 1, true, logFileInLastCycle));
|
50, 1, true, logFileStatusInLastCycle.getLogFilePathInLastCycle()));
|
||||||
} else {
|
} else {
|
||||||
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
|
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
|
||||||
50, 3, false, null));
|
50, 3, false, null));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// the app is finished. The log "std_final" should be aggregated this time.
|
||||||
|
String[] logFiles3WithFinalLog =
|
||||||
|
new String[] { "stdout_2", "stderr_2", "syslog_2", "std_final" };
|
||||||
verifyContainerLogs(logAggregationService, application,
|
verifyContainerLogs(logAggregationService, application,
|
||||||
new ContainerId[] { container }, logFiles3, 3, true);
|
new ContainerId[] { container }, logFiles3WithFinalLog, 4, true);
|
||||||
logAggregationService.stop();
|
logAggregationService.stop();
|
||||||
assertEquals(0, logAggregationService.getNumAggregators());
|
assertEquals(0, logAggregationService.getNumAggregators());
|
||||||
}
|
}
|
||||||
|
@ -1512,4 +1544,23 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
return numOfLogsAvailable(logAggregationService, application, sizeLimited,
|
return numOfLogsAvailable(logAggregationService, application, sizeLimited,
|
||||||
lastLogFile) == expectNum;
|
lastLogFile) == expectNum;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class LogFileStatusInLastCycle {
|
||||||
|
private String logFilePathInLastCycle;
|
||||||
|
private List<String> logFileTypesInLastCycle;
|
||||||
|
|
||||||
|
public LogFileStatusInLastCycle(String logFilePathInLastCycle,
|
||||||
|
List<String> logFileTypesInLastCycle) {
|
||||||
|
this.logFilePathInLastCycle = logFilePathInLastCycle;
|
||||||
|
this.logFileTypesInLastCycle = logFileTypesInLastCycle;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getLogFilePathInLastCycle() {
|
||||||
|
return this.logFilePathInLastCycle;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<String> getLogFileTypesInLastCycle() {
|
||||||
|
return this.logFileTypesInLastCycle;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -230,12 +230,18 @@ public class TestContainerAllocation {
|
||||||
// create a not-null LogAggregationContext
|
// create a not-null LogAggregationContext
|
||||||
LogAggregationContext logAggregationContext =
|
LogAggregationContext logAggregationContext =
|
||||||
LogAggregationContext.newInstance(
|
LogAggregationContext.newInstance(
|
||||||
"includePattern", "excludePattern");
|
"includePattern", "excludePattern",
|
||||||
|
"rolledLogsIncludePattern",
|
||||||
|
"rolledLogsExcludePattern");
|
||||||
LogAggregationContext returned =
|
LogAggregationContext returned =
|
||||||
getLogAggregationContextFromContainerToken(rm1, nm2,
|
getLogAggregationContextFromContainerToken(rm1, nm2,
|
||||||
logAggregationContext);
|
logAggregationContext);
|
||||||
Assert.assertEquals("includePattern", returned.getIncludePattern());
|
Assert.assertEquals("includePattern", returned.getIncludePattern());
|
||||||
Assert.assertEquals("excludePattern", returned.getExcludePattern());
|
Assert.assertEquals("excludePattern", returned.getExcludePattern());
|
||||||
|
Assert.assertEquals("rolledLogsIncludePattern",
|
||||||
|
returned.getRolledLogsIncludePattern());
|
||||||
|
Assert.assertEquals("rolledLogsExcludePattern",
|
||||||
|
returned.getRolledLogsExcludePattern());
|
||||||
rm1.stop();
|
rm1.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue