YARN-2651. Spun off LogRollingInterval from LogAggregationContext. Contributed by Xuan Gong.

(cherry picked from commit 4aed2d8e91)
This commit is contained in:
Zhijie Shen 2014-10-13 10:53:37 -07:00
parent c3dfc71193
commit e51ae64761
10 changed files with 31 additions and 54 deletions

View File

@ -600,6 +600,9 @@ Release 2.6.0 - UNRELEASED
YARN-2667. Fix the release audit warning caused by hadoop-yarn-registry YARN-2667. Fix the release audit warning caused by hadoop-yarn-registry
(Yi Liu via jlowe) (Yi Liu via jlowe)
YARN-2651. Spun off LogRollingInterval from LogAggregationContext. (Xuan Gong
via zjshen)
Release 2.5.1 - 2014-09-05 Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -37,13 +37,6 @@ import org.apache.hadoop.yarn.util.Records;
* which match the defined exclude pattern and those log files * which match the defined exclude pattern and those log files
* will not be uploaded. If the log file name matches both the * will not be uploaded. If the log file name matches both the
* include and the exclude pattern, this file will be excluded eventually</li> * include and the exclude pattern, this file will be excluded eventually</li>
* <li>rollingIntervalSeconds. The default value is -1. By default,
* the logAggregationService only uploads container logs when
* the application is finished. This configure defines
* how often the logAggregationSerivce uploads container logs in seconds.
* By setting this configure, the logAggregationSerivce can upload container
* logs periodically when the application is running.
* </li>
* </ul> * </ul>
* </p> * </p>
* *
@ -57,11 +50,10 @@ public abstract class LogAggregationContext {
@Public @Public
@Unstable @Unstable
public static LogAggregationContext newInstance(String includePattern, public static LogAggregationContext newInstance(String includePattern,
String excludePattern, long rollingIntervalSeconds) { String excludePattern) {
LogAggregationContext context = Records.newRecord(LogAggregationContext.class); LogAggregationContext context = Records.newRecord(LogAggregationContext.class);
context.setIncludePattern(includePattern); context.setIncludePattern(includePattern);
context.setExcludePattern(excludePattern); context.setExcludePattern(excludePattern);
context.setRollingIntervalSeconds(rollingIntervalSeconds);
return context; return context;
} }
@ -100,22 +92,4 @@ public abstract class LogAggregationContext {
@Public @Public
@Unstable @Unstable
public abstract void setExcludePattern(String excludePattern); public abstract void setExcludePattern(String excludePattern);
/**
* Get rollingIntervalSeconds
*
* @return the rollingIntervalSeconds
*/
@Public
@Unstable
public abstract long getRollingIntervalSeconds();
/**
* Set rollingIntervalSeconds
*
* @param rollingIntervalSeconds
*/
@Public
@Unstable
public abstract void setRollingIntervalSeconds(long rollingIntervalSeconds);
} }

View File

@ -711,6 +711,13 @@ public class YarnConfiguration extends Configuration {
+ "log.retain-seconds"; + "log.retain-seconds";
public static final long DEFAULT_NM_LOG_RETAIN_SECONDS = 3 * 60 * 60; public static final long DEFAULT_NM_LOG_RETAIN_SECONDS = 3 * 60 * 60;
/**
* Define how often NMs wake up and upload log files
*/
public static final String NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS =
NM_PREFIX + "log-aggregation.roll-monitoring-interval-seconds";
public static final long
DEFAULT_NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS = -1;
/** /**
* Number of threads used in log cleanup. Only applicable if Log aggregation * Number of threads used in log cleanup. Only applicable if Log aggregation
* is disabled * is disabled

View File

@ -302,7 +302,6 @@ message ApplicationSubmissionContextProto {
message LogAggregationContextProto { message LogAggregationContextProto {
optional string include_pattern = 1 [default = ".*"]; optional string include_pattern = 1 [default = ".*"];
optional string exclude_pattern = 2 [default = ""]; optional string exclude_pattern = 2 [default = ""];
optional int64 rolling_interval_seconds = 3 [default = -1];
} }
enum ApplicationAccessTypeProto { enum ApplicationAccessTypeProto {

View File

@ -116,19 +116,4 @@ public class LogAggregationContextPBImpl extends LogAggregationContext{
} }
builder.setExcludePattern(excludePattern); builder.setExcludePattern(excludePattern);
} }
@Override
public long getRollingIntervalSeconds() {
LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
if (! p.hasRollingIntervalSeconds()) {
return -1;
}
return p.getRollingIntervalSeconds();
}
@Override
public void setRollingIntervalSeconds(long rollingIntervalSeconds) {
maybeInitBuilder();
builder.setRollingIntervalSeconds(rollingIntervalSeconds);
}
} }

View File

@ -1522,4 +1522,14 @@
<value>Client</value> <value>Client</value>
</property> </property>
<property>
<description>Defines how often NMs wake up to upload log files.
The default value is -1. By default, the logs will be uploaded when
the application is finished. By setting this configure, logs can be uploaded
periodically when the application is running. The minimum rolling-interval-seconds
can be set is 3600.
</description>
<name>yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds</name>
<value>-1</value>
</property>
</configuration> </configuration>

View File

@ -148,9 +148,11 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
} else { } else {
this.retentionSize = configuredRentionSize; this.retentionSize = configuredRentionSize;
} }
long configuredRollingMonitorInterval = long configuredRollingMonitorInterval = conf.getLong(
this.logAggregationContext == null ? -1 : this.logAggregationContext YarnConfiguration
.getRollingIntervalSeconds(); .NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS,
YarnConfiguration
.DEFAULT_NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS);
boolean debug_mode = boolean debug_mode =
conf.getBoolean(NM_LOG_AGGREGATION_DEBUG_ENABLED, conf.getBoolean(NM_LOG_AGGREGATION_DEBUG_ENABLED,
DEFAULT_NM_LOG_AGGREGATION_DEBUG_ENABLED); DEFAULT_NM_LOG_AGGREGATION_DEBUG_ENABLED);

View File

@ -130,8 +130,7 @@ public class TestContainerManagerRecovery {
containerTokens, acls); containerTokens, acls);
// create the logAggregationContext // create the logAggregationContext
LogAggregationContext logAggregationContext = LogAggregationContext logAggregationContext =
LogAggregationContext.newInstance("includePattern", "excludePattern", LogAggregationContext.newInstance("includePattern", "excludePattern");
1000);
StartContainersResponse startResponse = startContainer(context, cm, cid, StartContainersResponse startResponse = startContainer(context, cm, cid,
clc, logAggregationContext); clc, logAggregationContext);
assertTrue(startResponse.getFailedRequests().isEmpty()); assertTrue(startResponse.getFailedRequests().isEmpty());
@ -168,8 +167,6 @@ public class TestContainerManagerRecovery {
LogAggregationContext recovered = LogAggregationContext recovered =
((ApplicationImpl) app).getLogAggregationContext(); ((ApplicationImpl) app).getLogAggregationContext();
assertNotNull(recovered); assertNotNull(recovered);
assertEquals(logAggregationContext.getRollingIntervalSeconds(),
recovered.getRollingIntervalSeconds());
assertEquals(logAggregationContext.getIncludePattern(), assertEquals(logAggregationContext.getIncludePattern(),
recovered.getIncludePattern()); recovered.getIncludePattern());
assertEquals(logAggregationContext.getExcludePattern(), assertEquals(logAggregationContext.getExcludePattern(),

View File

@ -1235,10 +1235,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
throws Exception { throws Exception {
LogAggregationContext logAggregationContextWithInterval = LogAggregationContext logAggregationContextWithInterval =
Records.newRecord(LogAggregationContext.class); Records.newRecord(LogAggregationContext.class);
logAggregationContextWithInterval.setRollingIntervalSeconds(5000);
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath()); this.remoteRootLogDir.getAbsolutePath());
this.conf.setLong(
YarnConfiguration.NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS,
3600);
if (retentionSizeLimitation) { if (retentionSizeLimitation) {
// set the retention size as 1. The number of logs for one application // set the retention size as 1. The number of logs for one application
// in one NM should be 1. // in one NM should be 1.

View File

@ -212,16 +212,14 @@ public class TestContainerAllocation {
.assertNull(getLogAggregationContextFromContainerToken(rm1, nm1, null)); .assertNull(getLogAggregationContextFromContainerToken(rm1, nm1, null));
// create a not-null LogAggregationContext // create a not-null LogAggregationContext
final int interval = 2000;
LogAggregationContext logAggregationContext = LogAggregationContext logAggregationContext =
LogAggregationContext.newInstance( LogAggregationContext.newInstance(
"includePattern", "excludePattern", interval); "includePattern", "excludePattern");
LogAggregationContext returned = LogAggregationContext returned =
getLogAggregationContextFromContainerToken(rm1, nm2, getLogAggregationContextFromContainerToken(rm1, nm2,
logAggregationContext); logAggregationContext);
Assert.assertEquals("includePattern", returned.getIncludePattern()); Assert.assertEquals("includePattern", returned.getIncludePattern());
Assert.assertEquals("excludePattern", returned.getExcludePattern()); Assert.assertEquals("excludePattern", returned.getExcludePattern());
Assert.assertEquals(interval, returned.getRollingIntervalSeconds());
rm1.stop(); rm1.stop();
} }