YARN-3154. Added additional APIs in LogAggregationContext to avoid aggregating running logs of application when rolling is enabled. Contributed by Xuan Gong.

(cherry picked from commit 863079bb87)
This commit is contained in:
Vinod Kumar Vavilapalli 2015-03-12 13:32:29 -07:00
parent 7455412a24
commit 53aa3a4d1f
9 changed files with 227 additions and 48 deletions

View File

@ -712,6 +712,9 @@ Release 2.7.0 - UNRELEASED
YARN-3338. Exclude jline dependency from YARN. (Zhijie Shen via xgong)
YARN-3154. Added additional APIs in LogAggregationContext to avoid aggregating
running logs of application when rolling is enabled. (Xuan Gong via vinodkv)
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES

View File

@ -32,11 +32,20 @@
* <ul>
* <li>includePattern. It uses Java Regex to filter the log files
* which match the defined include pattern and those log files
* will be uploaded. </li>
* will be uploaded when the application finishes. </li>
* <li>excludePattern. It uses Java Regex to filter the log files
* which match the defined exclude pattern and those log files
* will not be uploaded. If the log file name matches both the
* include and the exclude pattern, this file will be excluded eventually</li>
* will not be uploaded when application finishes. If the log file
* name matches both the include and the exclude pattern, this file
* will be excluded eventually</li>
* <li>rolledLogsIncludePattern. It uses Java Regex to filter the log files
* which match the defined include pattern and those log files
* will be aggregated in a rolling fashion.</li>
* <li>rolledLogsExcludePattern. It uses Java Regex to filter the log files
* which match the defined exclude pattern and those log files
* will not be aggregated in a rolling fashion. If the log file
* name matches both the include and the exclude pattern, this file
* will be excluded eventually</li>
* </ul>
* </p>
*
@ -57,8 +66,23 @@ public static LogAggregationContext newInstance(String includePattern,
return context;
}
@Public
@Unstable
public static LogAggregationContext newInstance(String includePattern,
String excludePattern, String rolledLogsIncludePattern,
String rolledLogsExcludePattern) {
LogAggregationContext context =
Records.newRecord(LogAggregationContext.class);
context.setIncludePattern(includePattern);
context.setExcludePattern(excludePattern);
context.setRolledLogsIncludePattern(rolledLogsIncludePattern);
context.setRolledLogsExcludePattern(rolledLogsExcludePattern);
return context;
}
/**
* Get include pattern
* Get include pattern. This includePattern only takes affect
* on logs that exist at the time of application finish.
*
* @return include pattern
*/
@ -67,7 +91,8 @@ public static LogAggregationContext newInstance(String includePattern,
public abstract String getIncludePattern();
/**
* Set include pattern
* Set include pattern. This includePattern only takes affect
* on logs that exist at the time of application finish.
*
* @param includePattern
*/
@ -76,7 +101,8 @@ public static LogAggregationContext newInstance(String includePattern,
public abstract void setIncludePattern(String includePattern);
/**
* Get exclude pattern
* Get exclude pattern. This excludePattern only takes affect
* on logs that exist at the time of application finish.
*
* @return exclude pattern
*/
@ -85,11 +111,50 @@ public static LogAggregationContext newInstance(String includePattern,
public abstract String getExcludePattern();
/**
* Set exclude pattern
* Set exclude pattern. This excludePattern only takes affect
* on logs that exist at the time of application finish.
*
* @param excludePattern
*/
@Public
@Unstable
public abstract void setExcludePattern(String excludePattern);
/**
* Get include pattern in a rolling fashion.
*
* @return include pattern
*/
@Public
@Unstable
public abstract String getRolledLogsIncludePattern();
/**
* Set include pattern in a rolling fashion.
*
* @param rolledLogsIncludePattern
*/
@Public
@Unstable
public abstract void setRolledLogsIncludePattern(
String rolledLogsIncludePattern);
/**
* Get exclude pattern for aggregation in a rolling fashion.
*
* @return exclude pattern
*/
@Public
@Unstable
public abstract String getRolledLogsExcludePattern();
/**
* Set exclude pattern for in a rolling fashion.
*
* @param rolledLogsExcludePattern
*/
@Public
@Unstable
public abstract void setRolledLogsExcludePattern(
String rolledLogsExcludePattern);
}

View File

@ -314,6 +314,8 @@ message ApplicationSubmissionContextProto {
message LogAggregationContextProto {
optional string include_pattern = 1 [default = ".*"];
optional string exclude_pattern = 2 [default = ""];
optional string rolled_logs_include_pattern = 3 [default = ""];
optional string rolled_logs_exclude_pattern = 4 [default = ".*"];
}
enum ApplicationAccessTypeProto {

View File

@ -20,6 +20,7 @@
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProtoOrBuilder;
import com.google.protobuf.TextFormat;
public class LogAggregationContextPBImpl extends LogAggregationContext{
@ -116,4 +117,42 @@ public void setExcludePattern(String excludePattern) {
}
builder.setExcludePattern(excludePattern);
}
@Override
public String getRolledLogsIncludePattern() {
LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
if (! p.hasRolledLogsIncludePattern()) {
return null;
}
return p.getRolledLogsIncludePattern();
}
@Override
public void setRolledLogsIncludePattern(String rolledLogsIncludePattern) {
maybeInitBuilder();
if (rolledLogsIncludePattern == null) {
builder.clearRolledLogsIncludePattern();
return;
}
builder.setRolledLogsIncludePattern(rolledLogsIncludePattern);
}
@Override
public String getRolledLogsExcludePattern() {
LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
if (! p.hasRolledLogsExcludePattern()) {
return null;
}
return p.getRolledLogsExcludePattern();
}
@Override
public void setRolledLogsExcludePattern(String rolledLogsExcludePattern) {
maybeInitBuilder();
if (rolledLogsExcludePattern == null) {
builder.clearRolledLogsExcludePattern();
return;
}
builder.setRolledLogsExcludePattern(rolledLogsExcludePattern);
}
}

View File

@ -167,17 +167,18 @@ public static class LogValue {
private Set<File> uploadedFiles = new HashSet<File>();
private final Set<String> alreadyUploadedLogFiles;
private Set<String> allExistingFileMeta = new HashSet<String>();
private final boolean appFinished;
// TODO Maybe add a version string here. Instead of changing the version of
// the entire k-v format
public LogValue(List<String> rootLogDirs, ContainerId containerId,
String user) {
this(rootLogDirs, containerId, user, null, new HashSet<String>());
this(rootLogDirs, containerId, user, null, new HashSet<String>(), true);
}
public LogValue(List<String> rootLogDirs, ContainerId containerId,
String user, LogAggregationContext logAggregationContext,
Set<String> alreadyUploadedLogFiles) {
Set<String> alreadyUploadedLogFiles, boolean appFinished) {
this.rootLogDirs = new ArrayList<String>(rootLogDirs);
this.containerId = containerId;
this.user = user;
@ -186,6 +187,7 @@ public LogValue(List<String> rootLogDirs, ContainerId containerId,
Collections.sort(this.rootLogDirs);
this.logAggregationContext = logAggregationContext;
this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
this.appFinished = appFinished;
}
private Set<File> getPendingLogFilesToUploadForThisContainer() {
@ -296,17 +298,15 @@ private Set<File> getPendingLogFilesToUpload(File containerLogDir) {
}
if (this.logAggregationContext != null && candidates.size() > 0) {
if (this.logAggregationContext.getIncludePattern() != null
&& !this.logAggregationContext.getIncludePattern().isEmpty()) {
filterFiles(this.logAggregationContext.getIncludePattern(),
filterFiles(
this.appFinished ? this.logAggregationContext.getIncludePattern()
: this.logAggregationContext.getRolledLogsIncludePattern(),
candidates, false);
}
if (this.logAggregationContext.getExcludePattern() != null
&& !this.logAggregationContext.getExcludePattern().isEmpty()) {
filterFiles(this.logAggregationContext.getExcludePattern(),
filterFiles(
this.appFinished ? this.logAggregationContext.getExcludePattern()
: this.logAggregationContext.getRolledLogsExcludePattern(),
candidates, true);
}
Iterable<File> mask =
Iterables.filter(candidates, new Predicate<File>() {
@ -323,8 +323,8 @@ public boolean apply(File next) {
private void filterFiles(String pattern, Set<File> candidates,
boolean exclusion) {
Pattern filterPattern =
Pattern.compile(pattern);
if (pattern != null && !pattern.isEmpty()) {
Pattern filterPattern = Pattern.compile(pattern);
for (Iterator<File> candidatesItr = candidates.iterator(); candidatesItr
.hasNext();) {
File candidate = candidatesItr.next();
@ -334,6 +334,7 @@ private void filterFiles(String pattern, Set<File> candidates,
}
}
}
}
public Set<Path> getCurrentUpLoadedFilesPath() {
Set<Path> path = new HashSet<Path>();

View File

@ -44,7 +44,6 @@
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -116,6 +115,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
private final Context context;
private final int retentionSize;
private final long rollingMonitorInterval;
private final boolean logAggregationInRolling;
private final NodeId nodeId;
// This variable is only for testing
private final AtomicBoolean waiting = new AtomicBoolean(false);
@ -193,9 +193,14 @@ public AppLogAggregatorImpl(Dispatcher dispatcher,
}
this.rollingMonitorInterval = configuredRollingMonitorInterval;
}
this.logAggregationInRolling =
this.rollingMonitorInterval <= 0 || this.logAggregationContext == null
|| this.logAggregationContext.getRolledLogsIncludePattern() == null
|| this.logAggregationContext.getRolledLogsIncludePattern()
.isEmpty() ? false : true;
}
private void uploadLogsForContainers() {
private void uploadLogsForContainers(boolean appFinished) {
if (this.logAggregationDisabled) {
return;
}
@ -262,7 +267,7 @@ private void uploadLogsForContainers() {
containerLogAggregators.put(container, aggregator);
}
Set<Path> uploadedFilePathsInThisCycle =
aggregator.doContainerLogAggregation(writer);
aggregator.doContainerLogAggregation(writer, appFinished);
if (uploadedFilePathsInThisCycle.size() > 0) {
uploadedLogsInThisCycle = true;
}
@ -394,12 +399,12 @@ private void doAppLogAggregation() {
synchronized(this) {
try {
waiting.set(true);
if (this.rollingMonitorInterval > 0) {
if (logAggregationInRolling) {
wait(this.rollingMonitorInterval * 1000);
if (this.appFinishing.get() || this.aborted.get()) {
break;
}
uploadLogsForContainers();
uploadLogsForContainers(false);
} else {
wait(THREAD_SLEEP_TIME);
}
@ -415,7 +420,7 @@ private void doAppLogAggregation() {
}
// App is finished, upload the container logs.
uploadLogsForContainers();
uploadLogsForContainers(true);
// Remove the local app-log-dirs
List<Path> localAppLogDirs = new ArrayList<Path>();
@ -536,7 +541,8 @@ public ContainerLogAggregator(ContainerId containerId) {
this.containerId = containerId;
}
public Set<Path> doContainerLogAggregation(LogWriter writer) {
public Set<Path> doContainerLogAggregation(LogWriter writer,
boolean appFinished) {
LOG.info("Uploading logs for container " + containerId
+ ". Current good log dirs are "
+ StringUtils.join(",", dirsHandler.getLogDirs()));
@ -544,7 +550,7 @@ public Set<Path> doContainerLogAggregation(LogWriter writer) {
final LogValue logValue =
new LogValue(dirsHandler.getLogDirs(), containerId,
userUgi.getShortUserName(), logAggregationContext,
this.uploadedFileMeta);
this.uploadedFileMeta, appFinished);
try {
writer.append(logKey, logValue);
} catch (Exception e) {

View File

@ -130,7 +130,9 @@ public void testApplicationRecovery() throws Exception {
containerTokens, acls);
// create the logAggregationContext
LogAggregationContext logAggregationContext =
LogAggregationContext.newInstance("includePattern", "excludePattern");
LogAggregationContext.newInstance("includePattern", "excludePattern",
"includePatternInRollingAggregation",
"excludePatternInRollingAggregation");
StartContainersResponse startResponse = startContainer(context, cm, cid,
clc, logAggregationContext);
assertTrue(startResponse.getFailedRequests().isEmpty());
@ -171,6 +173,10 @@ public void testApplicationRecovery() throws Exception {
recovered.getIncludePattern());
assertEquals(logAggregationContext.getExcludePattern(),
recovered.getExcludePattern());
assertEquals(logAggregationContext.getRolledLogsIncludePattern(),
recovered.getRolledLogsIncludePattern());
assertEquals(logAggregationContext.getRolledLogsExcludePattern(),
recovered.getRolledLogsExcludePattern());
waitForAppState(app, ApplicationState.INITING);
assertTrue(context.getApplicationACLsManager().checkAccess(

View File

@ -698,7 +698,7 @@ private void writeContainerLogs(File appLogDir, ContainerId containerId,
}
}
private String verifyContainerLogs(LogAggregationService logAggregationService,
private LogFileStatusInLastCycle verifyContainerLogs(LogAggregationService logAggregationService,
ApplicationId appId, ContainerId[] expectedContainerIds,
String[] logFiles, int numOfContainerLogs, boolean multiLogs)
throws IOException {
@ -744,6 +744,8 @@ private String verifyContainerLogs(LogAggregationService logAggregationService,
Assert.assertEquals(this.user, reader.getApplicationOwner());
verifyAcls(reader.getApplicationAcls());
List<String> fileTypes = new ArrayList<String>();
try {
Map<String, Map<String, String>> logMap =
new HashMap<String, Map<String, String>>();
@ -769,6 +771,7 @@ private String verifyContainerLogs(LogAggregationService logAggregationService,
Assert.assertEquals("LogType:", writtenLines[0].substring(0, 8));
String fileType = writtenLines[0].substring(8);
fileTypes.add(fileType);
Assert.assertEquals("LogLength:", writtenLines[1].substring(0, 10));
String fileLengthStr = writtenLines[1].substring(10);
@ -811,7 +814,7 @@ private String verifyContainerLogs(LogAggregationService logAggregationService,
Assert.assertEquals(0, thisContainerMap.size());
}
Assert.assertEquals(0, logMap.size());
return targetNodeFile.getPath().getName();
return new LogFileStatusInLastCycle(targetNodeFile.getPath().getName(), fileTypes);
} finally {
reader.close();
}
@ -1289,6 +1292,12 @@ private void testLogAggregationService(boolean retentionSizeLimitation)
throws Exception {
LogAggregationContext logAggregationContextWithInterval =
Records.newRecord(LogAggregationContext.class);
// set IncludePattern/excludePattern in rolling fashion
// we expect all the logs except std_final will be uploaded
// when app is running. The std_final will be uploaded when
// the app finishes.
logAggregationContextWithInterval.setRolledLogsIncludePattern(".*");
logAggregationContextWithInterval.setRolledLogsExcludePattern("std_final");
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
@ -1338,9 +1347,14 @@ private void testLogAggregationService(boolean retentionSizeLimitation)
this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls,
logAggregationContextWithInterval));
LogFileStatusInLastCycle logFileStatusInLastCycle = null;
// Simulate log-file creation
String[] logFiles1 = new String[] { "stdout", "stderr", "syslog" };
writeContainerLogs(appLogDir, container, logFiles1);
// create std_final in log directory which will not be aggregated
// until the app finishes.
String[] logFiles1WithFinalLog =
new String[] { "stdout", "stderr", "syslog", "std_final" };
String[] logFiles1 = new String[] { "stdout", "stderr", "syslog"};
writeContainerLogs(appLogDir, container, logFiles1WithFinalLog);
// Do log aggregation
AppLogAggregatorImpl aggregator =
@ -1355,10 +1369,16 @@ private void testLogAggregationService(boolean retentionSizeLimitation)
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
50, 1, false, null));
}
String logFileInLastCycle = null;
// Container logs should be uploaded
logFileInLastCycle = verifyContainerLogs(logAggregationService, application,
logFileStatusInLastCycle = verifyContainerLogs(logAggregationService, application,
new ContainerId[] { container }, logFiles1, 3, true);
for(String logFile : logFiles1) {
Assert.assertTrue(logFileStatusInLastCycle.getLogFileTypesInLastCycle()
.contains(logFile));
}
// Make sure the std_final is not uploaded.
Assert.assertFalse(logFileStatusInLastCycle.getLogFileTypesInLastCycle()
.contains("std_final"));
Thread.sleep(2000);
@ -1380,15 +1400,23 @@ private void testLogAggregationService(boolean retentionSizeLimitation)
if (retentionSizeLimitation) {
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
50, 1, true, logFileInLastCycle));
50, 1, true, logFileStatusInLastCycle.getLogFilePathInLastCycle()));
} else {
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
50, 2, false, null));
}
// Container logs should be uploaded
logFileInLastCycle = verifyContainerLogs(logAggregationService, application,
logFileStatusInLastCycle = verifyContainerLogs(logAggregationService, application,
new ContainerId[] { container }, logFiles2, 3, true);
for(String logFile : logFiles2) {
Assert.assertTrue(logFileStatusInLastCycle.getLogFileTypesInLastCycle()
.contains(logFile));
}
// Make sure the std_final is not uploaded.
Assert.assertFalse(logFileStatusInLastCycle.getLogFileTypesInLastCycle()
.contains("std_final"));
Thread.sleep(2000);
// create another logs
@ -1402,13 +1430,17 @@ private void testLogAggregationService(boolean retentionSizeLimitation)
logAggregationService.handle(new LogHandlerAppFinishedEvent(application));
if (retentionSizeLimitation) {
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
50, 1, true, logFileInLastCycle));
50, 1, true, logFileStatusInLastCycle.getLogFilePathInLastCycle()));
} else {
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
50, 3, false, null));
}
// the app is finished. The log "std_final" should be aggregated this time.
String[] logFiles3WithFinalLog =
new String[] { "stdout_2", "stderr_2", "syslog_2", "std_final" };
verifyContainerLogs(logAggregationService, application,
new ContainerId[] { container }, logFiles3, 3, true);
new ContainerId[] { container }, logFiles3WithFinalLog, 4, true);
logAggregationService.stop();
assertEquals(0, logAggregationService.getNumAggregators());
}
@ -1512,4 +1544,23 @@ private boolean waitAndCheckLogNum(
return numOfLogsAvailable(logAggregationService, application, sizeLimited,
lastLogFile) == expectNum;
}
private static class LogFileStatusInLastCycle {
private String logFilePathInLastCycle;
private List<String> logFileTypesInLastCycle;
public LogFileStatusInLastCycle(String logFilePathInLastCycle,
List<String> logFileTypesInLastCycle) {
this.logFilePathInLastCycle = logFilePathInLastCycle;
this.logFileTypesInLastCycle = logFileTypesInLastCycle;
}
public String getLogFilePathInLastCycle() {
return this.logFilePathInLastCycle;
}
public List<String> getLogFileTypesInLastCycle() {
return this.logFileTypesInLastCycle;
}
}
}

View File

@ -230,12 +230,18 @@ public void testLogAggregationContextPassedIntoContainerToken()
// create a not-null LogAggregationContext
LogAggregationContext logAggregationContext =
LogAggregationContext.newInstance(
"includePattern", "excludePattern");
"includePattern", "excludePattern",
"rolledLogsIncludePattern",
"rolledLogsExcludePattern");
LogAggregationContext returned =
getLogAggregationContextFromContainerToken(rm1, nm2,
logAggregationContext);
Assert.assertEquals("includePattern", returned.getIncludePattern());
Assert.assertEquals("excludePattern", returned.getExcludePattern());
Assert.assertEquals("rolledLogsIncludePattern",
returned.getRolledLogsIncludePattern());
Assert.assertEquals("rolledLogsExcludePattern",
returned.getRolledLogsExcludePattern());
rm1.stop();
}