YARN-9629. Support configurable MIN_LOG_ROLLING_INTERVAL. Contributed by Adam Antal.
(cherry picked from commit a2a8be18cb)
This commit is contained in:
parent
d18986e4e8
commit
4638fa00fc
|
@ -1383,6 +1383,14 @@ public class YarnConfiguration extends Configuration {
|
||||||
public static final long
|
public static final long
|
||||||
DEFAULT_NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS = -1;
|
DEFAULT_NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS = -1;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The allowed hard minimum limit for {@link
|
||||||
|
* YarnConfiguration#NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS}.
|
||||||
|
*/
|
||||||
|
public static final String MIN_LOG_ROLLING_INTERVAL_SECONDS = NM_PREFIX
|
||||||
|
+ "log-aggregation.roll-monitoring-interval-seconds.min";
|
||||||
|
public static final long MIN_LOG_ROLLING_INTERVAL_SECONDS_DEFAULT = 3600;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Define how many aggregated log files per application per NM we can have
|
* Define how many aggregated log files per application per NM we can have
|
||||||
* in remote file system.
|
* in remote file system.
|
||||||
|
|
|
@ -3134,14 +3134,25 @@
|
||||||
<property>
|
<property>
|
||||||
<description>Defines how often NMs wake up to upload log files.
|
<description>Defines how often NMs wake up to upload log files.
|
||||||
The default value is -1. By default, the logs will be uploaded when
|
The default value is -1. By default, the logs will be uploaded when
|
||||||
the application is finished. By setting this configure, logs can be uploaded
|
the application is finished. By setting this configuration, logs can
|
||||||
periodically when the application is running. The minimum rolling-interval-seconds
|
be uploaded periodically while the application is running.
|
||||||
can be set is 3600.
|
The minimum positive accepted value can be configured by the setting
|
||||||
|
"yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds.min".
|
||||||
</description>
|
</description>
|
||||||
<name>yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds</name>
|
<name>yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds</name>
|
||||||
<value>-1</value>
|
<value>-1</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>Defines the positive minimum hard limit for
|
||||||
|
"yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds".
|
||||||
|
If this configuration is set to a value lower than its default (3600),
|
||||||
|
the NodeManager may raise a warning.
|
||||||
|
</description>
|
||||||
|
<name>yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds.min</name>
|
||||||
|
<value>3600</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>Define how many aggregated log files per application per NM
|
<description>Define how many aggregated log files per application per NM
|
||||||
we can have in remote file system. By default, the total number of
|
we can have in remote file system. By default, the total number of
|
||||||
|
|
|
@ -71,7 +71,6 @@ public class LogAggregationService extends AbstractService implements
|
||||||
|
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(LogAggregationService.class);
|
LoggerFactory.getLogger(LogAggregationService.class);
|
||||||
private static final long MIN_LOG_ROLLING_INTERVAL = 3600;
|
|
||||||
// This configuration is for debug and test purpose. By setting
|
// This configuration is for debug and test purpose. By setting
|
||||||
// this configuration as true. We can break the lower bound of
|
// this configuration as true. We can break the lower bound of
|
||||||
// NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS.
|
// NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS.
|
||||||
|
@ -106,6 +105,49 @@ public class LogAggregationService extends AbstractService implements
|
||||||
this.invalidTokenApps = ConcurrentHashMap.newKeySet();
|
this.invalidTokenApps = ConcurrentHashMap.newKeySet();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static long calculateRollingMonitorInterval(Configuration conf) {
|
||||||
|
long interval = conf.getLong(
|
||||||
|
YarnConfiguration.NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS,
|
||||||
|
YarnConfiguration.
|
||||||
|
DEFAULT_NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS);
|
||||||
|
|
||||||
|
if (interval <= 0) {
|
||||||
|
LOG.info("rollingMonitorInterval is set as " + interval
|
||||||
|
+ ". The log rolling monitoring interval is disabled. "
|
||||||
|
+ "The logs will be aggregated after this application is finished.");
|
||||||
|
} else {
|
||||||
|
boolean logAggregationDebugMode =
|
||||||
|
conf.getBoolean(NM_LOG_AGGREGATION_DEBUG_ENABLED, false);
|
||||||
|
long minRollingMonitorInterval = conf.getLong(
|
||||||
|
YarnConfiguration.MIN_LOG_ROLLING_INTERVAL_SECONDS,
|
||||||
|
YarnConfiguration.MIN_LOG_ROLLING_INTERVAL_SECONDS_DEFAULT);
|
||||||
|
|
||||||
|
boolean warnHardMinLimitLowerThanDefault = minRollingMonitorInterval <
|
||||||
|
YarnConfiguration.MIN_LOG_ROLLING_INTERVAL_SECONDS_DEFAULT &&
|
||||||
|
!logAggregationDebugMode;
|
||||||
|
if (warnHardMinLimitLowerThanDefault) {
|
||||||
|
LOG.warn("{} has been set to {}, which is less than the default "
|
||||||
|
+ "minimum value {}. This may impact NodeManager's performance.",
|
||||||
|
YarnConfiguration.MIN_LOG_ROLLING_INTERVAL_SECONDS,
|
||||||
|
minRollingMonitorInterval,
|
||||||
|
YarnConfiguration.MIN_LOG_ROLLING_INTERVAL_SECONDS_DEFAULT);
|
||||||
|
}
|
||||||
|
boolean lowerThanHardLimit = interval < minRollingMonitorInterval;
|
||||||
|
if (lowerThanHardLimit) {
|
||||||
|
if (logAggregationDebugMode) {
|
||||||
|
LOG.info("Log aggregation debug mode enabled. " +
|
||||||
|
"Skipped checking minimum limit.");
|
||||||
|
} else {
|
||||||
|
LOG.warn("rollingMonitorInterval should be more than " +
|
||||||
|
"or equal to {} seconds. Using {} seconds instead.",
|
||||||
|
minRollingMonitorInterval, minRollingMonitorInterval);
|
||||||
|
interval = minRollingMonitorInterval;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return interval;
|
||||||
|
}
|
||||||
|
|
||||||
protected void serviceInit(Configuration conf) throws Exception {
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
int threadPoolSize = getAggregatorThreadPoolSize(conf);
|
int threadPoolSize = getAggregatorThreadPoolSize(conf);
|
||||||
this.threadPool = HadoopExecutors.newFixedThreadPool(threadPoolSize,
|
this.threadPool = HadoopExecutors.newFixedThreadPool(threadPoolSize,
|
||||||
|
@ -113,33 +155,10 @@ public class LogAggregationService extends AbstractService implements
|
||||||
.setNameFormat("LogAggregationService #%d")
|
.setNameFormat("LogAggregationService #%d")
|
||||||
.build());
|
.build());
|
||||||
|
|
||||||
rollingMonitorInterval = conf.getLong(
|
rollingMonitorInterval = calculateRollingMonitorInterval(conf);
|
||||||
YarnConfiguration.NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS,
|
LOG.info("rollingMonitorInterval is set as {}. The logs will be " +
|
||||||
YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS);
|
"aggregated every {} seconds", rollingMonitorInterval,
|
||||||
|
rollingMonitorInterval);
|
||||||
boolean logAggregationDebugMode =
|
|
||||||
conf.getBoolean(NM_LOG_AGGREGATION_DEBUG_ENABLED, false);
|
|
||||||
|
|
||||||
if (rollingMonitorInterval > 0
|
|
||||||
&& rollingMonitorInterval < MIN_LOG_ROLLING_INTERVAL) {
|
|
||||||
if (logAggregationDebugMode) {
|
|
||||||
LOG.info("Log aggregation debug mode enabled. rollingMonitorInterval = "
|
|
||||||
+ rollingMonitorInterval);
|
|
||||||
} else {
|
|
||||||
LOG.warn("rollingMonitorInterval should be more than or equal to {} " +
|
|
||||||
"seconds. Using {} seconds instead.",
|
|
||||||
MIN_LOG_ROLLING_INTERVAL, MIN_LOG_ROLLING_INTERVAL);
|
|
||||||
this.rollingMonitorInterval = MIN_LOG_ROLLING_INTERVAL;
|
|
||||||
}
|
|
||||||
} else if (rollingMonitorInterval <= 0) {
|
|
||||||
LOG.info("rollingMonitorInterval is set as " + rollingMonitorInterval
|
|
||||||
+ ". The log rolling monitoring interval is disabled. "
|
|
||||||
+ "The logs will be aggregated after this application is finished.");
|
|
||||||
} else {
|
|
||||||
LOG.info("rollingMonitorInterval is set as " + rollingMonitorInterval
|
|
||||||
+ ". The logs will be aggregated every " + rollingMonitorInterval
|
|
||||||
+ " seconds");
|
|
||||||
}
|
|
||||||
|
|
||||||
super.serviceInit(conf);
|
super.serviceInit(conf);
|
||||||
}
|
}
|
||||||
|
@ -413,6 +432,10 @@ public class LogAggregationService extends AbstractService implements
|
||||||
return this.nodeId;
|
return this.nodeId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getRollingMonitorInterval() {
|
||||||
|
return rollingMonitorInterval;
|
||||||
|
}
|
||||||
|
|
||||||
private int getAggregatorThreadPoolSize(Configuration conf) {
|
private int getAggregatorThreadPoolSize(Configuration conf) {
|
||||||
int threadPoolSize;
|
int threadPoolSize;
|
||||||
|
|
|
@ -2611,4 +2611,43 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
return this.logFileTypesInLastCycle;
|
return this.logFileTypesInLastCycle;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRollingMonitorIntervalDefault() {
|
||||||
|
LogAggregationService logAggregationService =
|
||||||
|
new LogAggregationService(dispatcher, this.context, this.delSrvc,
|
||||||
|
super.dirsHandler);
|
||||||
|
logAggregationService.init(this.conf);
|
||||||
|
|
||||||
|
long interval = logAggregationService.getRollingMonitorInterval();
|
||||||
|
assertEquals(-1L, interval);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRollingMonitorIntervalGreaterThanSet() {
|
||||||
|
this.conf.set(YarnConfiguration.MIN_LOG_ROLLING_INTERVAL_SECONDS, "1800");
|
||||||
|
this.conf.set(YarnConfiguration
|
||||||
|
.NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS, "2700");
|
||||||
|
LogAggregationService logAggregationService =
|
||||||
|
new LogAggregationService(dispatcher, this.context, this.delSrvc,
|
||||||
|
super.dirsHandler);
|
||||||
|
logAggregationService.init(this.conf);
|
||||||
|
|
||||||
|
long interval = logAggregationService.getRollingMonitorInterval();
|
||||||
|
assertEquals(2700L, interval);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRollingMonitorIntervalLessThanSet() {
|
||||||
|
this.conf.set(YarnConfiguration.MIN_LOG_ROLLING_INTERVAL_SECONDS, "1800");
|
||||||
|
this.conf.set(YarnConfiguration
|
||||||
|
.NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS, "600");
|
||||||
|
LogAggregationService logAggregationService =
|
||||||
|
new LogAggregationService(dispatcher, this.context, this.delSrvc,
|
||||||
|
super.dirsHandler);
|
||||||
|
logAggregationService.init(this.conf);
|
||||||
|
|
||||||
|
long interval = logAggregationService.getRollingMonitorInterval();
|
||||||
|
assertEquals(1800L, interval);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue