From 4638fa00fc715469d8a07657ccfffff61261ae13 Mon Sep 17 00:00:00 2001 From: Szilard Nemeth Date: Wed, 3 Jul 2019 13:44:27 +0200 Subject: [PATCH] YARN-9629. Support configurable MIN_LOG_ROLLING_INTERVAL. Contributed by Adam Antal. (cherry picked from commit a2a8be18cb5e912c8de0ea6beec1de4a99de656b) --- .../hadoop/yarn/conf/YarnConfiguration.java | 8 ++ .../src/main/resources/yarn-default.xml | 17 +++- .../logaggregation/LogAggregationService.java | 79 ++++++++++++------- .../TestLogAggregationService.java | 39 +++++++++ 4 files changed, 112 insertions(+), 31 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 9774bde614d..d8dc7c0c8c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1383,6 +1383,14 @@ public class YarnConfiguration extends Configuration { public static final long DEFAULT_NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS = -1; + /** + * The allowed hard minimum limit for {@link + * YarnConfiguration#NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS}. + */ + public static final String MIN_LOG_ROLLING_INTERVAL_SECONDS = NM_PREFIX + + "log-aggregation.roll-monitoring-interval-seconds.min"; + public static final long MIN_LOG_ROLLING_INTERVAL_SECONDS_DEFAULT = 3600; + /** * Define how many aggregated log files per application per NM we can have * in remote file system. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 72436f85b13..76ae2be47e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3134,14 +3134,25 @@ Defines how often NMs wake up to upload log files. The default value is -1. By default, the logs will be uploaded when - the application is finished. By setting this configure, logs can be uploaded - periodically when the application is running. The minimum rolling-interval-seconds - can be set is 3600. + the application is finished. By setting this configuration logs can + be uploaded periodically while the application is running. + The minimum positive accepted value can be configured by the setting + "yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds.min". yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds -1 + + Defines the positive minimum hard limit for + "yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds". + If this configuration has been set less than its default value (3600) + the NodeManager may raise a warning. + + yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds.min + 3600 + + Define how many aggregated log files per application per NM we can have in remote file system. By default, the total number of diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index d8db96780ff..2280e750f88 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -71,7 +71,6 @@ public class LogAggregationService extends AbstractService implements private static final Logger LOG = LoggerFactory.getLogger(LogAggregationService.class); - private static final long MIN_LOG_ROLLING_INTERVAL = 3600; // This configuration is for debug and test purpose. By setting // this configuration as true. We can break the lower bound of // NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS. @@ -106,6 +105,49 @@ public class LogAggregationService extends AbstractService implements this.invalidTokenApps = ConcurrentHashMap.newKeySet(); } + private static long calculateRollingMonitorInterval(Configuration conf) { + long interval = conf.getLong( + YarnConfiguration.NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS, + YarnConfiguration. + DEFAULT_NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS); + + if (interval <= 0) { + LOG.info("rollingMonitorInterval is set as " + interval + + ". The log rolling monitoring interval is disabled. " + + "The logs will be aggregated after this application is finished."); + } else { + boolean logAggregationDebugMode = + conf.getBoolean(NM_LOG_AGGREGATION_DEBUG_ENABLED, false); + long minRollingMonitorInterval = conf.getLong( + YarnConfiguration.MIN_LOG_ROLLING_INTERVAL_SECONDS, + YarnConfiguration.MIN_LOG_ROLLING_INTERVAL_SECONDS_DEFAULT); + + boolean warnHardMinLimitLowerThanDefault = minRollingMonitorInterval < + YarnConfiguration.MIN_LOG_ROLLING_INTERVAL_SECONDS_DEFAULT && + !logAggregationDebugMode; + if (warnHardMinLimitLowerThanDefault) { + LOG.warn("{} has been set to {}, which is less than the default " + + "minimum value {}. This may impact NodeManager's performance.", + YarnConfiguration.MIN_LOG_ROLLING_INTERVAL_SECONDS, + minRollingMonitorInterval, + YarnConfiguration.MIN_LOG_ROLLING_INTERVAL_SECONDS_DEFAULT); + } + boolean lowerThanHardLimit = interval < minRollingMonitorInterval; + if (lowerThanHardLimit) { + if (logAggregationDebugMode) { + LOG.info("Log aggregation debug mode enabled. " + + "Skipped checking minimum limit."); + } else { + LOG.warn("rollingMonitorInterval should be more than " + + "or equal to {} seconds. Using {} seconds instead.", + minRollingMonitorInterval, minRollingMonitorInterval); + interval = minRollingMonitorInterval; + } + } + } + return interval; + } + protected void serviceInit(Configuration conf) throws Exception { int threadPoolSize = getAggregatorThreadPoolSize(conf); this.threadPool = HadoopExecutors.newFixedThreadPool(threadPoolSize, @@ -113,33 +155,10 @@ public class LogAggregationService extends AbstractService implements .setNameFormat("LogAggregationService #%d") .build()); - rollingMonitorInterval = conf.getLong( - YarnConfiguration.NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS, - YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS); - - boolean logAggregationDebugMode = - conf.getBoolean(NM_LOG_AGGREGATION_DEBUG_ENABLED, false); - - if (rollingMonitorInterval > 0 - && rollingMonitorInterval < MIN_LOG_ROLLING_INTERVAL) { - if (logAggregationDebugMode) { - LOG.info("Log aggregation debug mode enabled. rollingMonitorInterval = " - + rollingMonitorInterval); - } else { - LOG.warn("rollingMonitorInterval should be more than or equal to {} " + - "seconds. Using {} seconds instead.", - MIN_LOG_ROLLING_INTERVAL, MIN_LOG_ROLLING_INTERVAL); - this.rollingMonitorInterval = MIN_LOG_ROLLING_INTERVAL; - } - } else if (rollingMonitorInterval <= 0) { - LOG.info("rollingMonitorInterval is set as " + rollingMonitorInterval - + ". The log rolling monitoring interval is disabled. " - + "The logs will be aggregated after this application is finished."); - } else { - LOG.info("rollingMonitorInterval is set as " + rollingMonitorInterval - + ". The logs will be aggregated every " + rollingMonitorInterval - + " seconds"); - } + rollingMonitorInterval = calculateRollingMonitorInterval(conf); + LOG.info("rollingMonitorInterval is set as {}. The logs will be " + + "aggregated every {} seconds", rollingMonitorInterval, + rollingMonitorInterval); super.serviceInit(conf); } @@ -413,6 +432,10 @@ public class LogAggregationService extends AbstractService implements return this.nodeId; } + @VisibleForTesting + public long getRollingMonitorInterval() { + return rollingMonitorInterval; + } private int getAggregatorThreadPoolSize(Configuration conf) { int threadPoolSize; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 8c089091dd3..285ec715899 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -2611,4 +2611,43 @@ public class TestLogAggregationService extends BaseContainerManagerTest { return this.logFileTypesInLastCycle; } } + + @Test + public void testRollingMonitorIntervalDefault() { + LogAggregationService logAggregationService = + new LogAggregationService(dispatcher, this.context, this.delSrvc, + super.dirsHandler); + logAggregationService.init(this.conf); + + long interval = logAggregationService.getRollingMonitorInterval(); + assertEquals(-1L, interval); + } + + @Test + public void testRollingMonitorIntervalGreaterThanSet() { + this.conf.set(YarnConfiguration.MIN_LOG_ROLLING_INTERVAL_SECONDS, "1800"); + this.conf.set(YarnConfiguration + .NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS, "2700"); + LogAggregationService logAggregationService = + new LogAggregationService(dispatcher, this.context, this.delSrvc, + super.dirsHandler); + logAggregationService.init(this.conf); + + long interval = logAggregationService.getRollingMonitorInterval(); + assertEquals(2700L, interval); + } + + @Test + public void testRollingMonitorIntervalLessThanSet() { + this.conf.set(YarnConfiguration.MIN_LOG_ROLLING_INTERVAL_SECONDS, "1800"); + this.conf.set(YarnConfiguration + .NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS, "600"); + LogAggregationService logAggregationService = + new LogAggregationService(dispatcher, this.context, this.delSrvc, + super.dirsHandler); + logAggregationService.init(this.conf); + + long interval = logAggregationService.getRollingMonitorInterval(); + assertEquals(1800L, interval); + } }