YARN-5112. Excessive log warnings for directory permission issue on NM recovery. Contributed by Jian He.

This commit is contained in:
Junping Du 2016-05-22 16:02:57 -07:00
parent a59f30272d
commit 867cd2f5a2
2 changed files with 45 additions and 47 deletions

View File

@ -95,11 +95,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
// NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS. // NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS.
private static final String NM_LOG_AGGREGATION_DEBUG_ENABLED private static final String NM_LOG_AGGREGATION_DEBUG_ENABLED
= YarnConfiguration.NM_PREFIX + "log-aggregation.debug-enabled"; = YarnConfiguration.NM_PREFIX + "log-aggregation.debug-enabled";
private static final boolean
DEFAULT_NM_LOG_AGGREGATION_DEBUG_ENABLED = false;
private static final long
NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS = 3600;
private final LocalDirsHandlerService dirsHandler; private final LocalDirsHandlerService dirsHandler;
private final Dispatcher dispatcher; private final Dispatcher dispatcher;
@ -142,7 +137,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp, LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
Map<ApplicationAccessType, String> appAcls, Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext, Context context, LogAggregationContext logAggregationContext, Context context,
FileContext lfs) { FileContext lfs, long rollingMonitorInterval) {
this.dispatcher = dispatcher; this.dispatcher = dispatcher;
this.conf = conf; this.conf = conf;
this.delService = deletionService; this.delService = deletionService;
@ -167,43 +162,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
} else { } else {
this.retentionSize = configuredRentionSize; this.retentionSize = configuredRentionSize;
} }
long configuredRollingMonitorInterval = conf.getLong( this.rollingMonitorInterval = rollingMonitorInterval;
YarnConfiguration
.NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS,
YarnConfiguration
.DEFAULT_NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS);
boolean debug_mode =
conf.getBoolean(NM_LOG_AGGREGATION_DEBUG_ENABLED,
DEFAULT_NM_LOG_AGGREGATION_DEBUG_ENABLED);
if (configuredRollingMonitorInterval > 0
&& configuredRollingMonitorInterval <
NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS) {
if (debug_mode) {
this.rollingMonitorInterval = configuredRollingMonitorInterval;
} else {
LOG.warn(
"rollingMonitorIntervall should be more than or equal to "
+ NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS
+ " seconds. Using "
+ NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS
+ " seconds instead.");
this.rollingMonitorInterval =
NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS;
}
} else {
if (configuredRollingMonitorInterval <= 0) {
LOG.info("rollingMonitorInterval is set as "
+ configuredRollingMonitorInterval + ". "
+ "The log rolling monitoring interval is disabled. "
+ "The logs will be aggregated after this application is finished.");
} else {
LOG.info("rollingMonitorInterval is set as "
+ configuredRollingMonitorInterval + ". "
+ "The logs will be aggregated every "
+ configuredRollingMonitorInterval + " seconds");
}
this.rollingMonitorInterval = configuredRollingMonitorInterval;
}
this.logAggregationInRolling = this.logAggregationInRolling =
this.rollingMonitorInterval <= 0 || this.logAggregationContext == null this.rollingMonitorInterval <= 0 || this.logAggregationContext == null
|| this.logAggregationContext.getRolledLogsIncludePattern() == null || this.logAggregationContext.getRolledLogsIncludePattern() == null

View File

@ -69,8 +69,14 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class LogAggregationService extends AbstractService implements public class LogAggregationService extends AbstractService implements
LogHandler { LogHandler {
private static final Log LOG = LogFactory private static final Log LOG = LogFactory.getLog(LogAggregationService.class);
.getLog(LogAggregationService.class); private static final long MIN_LOG_ROLLING_INTERVAL = 3600;
// This configuration is for debug and test purpose. By setting
// this configuration as true. We can break the lower bound of
// NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS.
private static final String NM_LOG_AGGREGATION_DEBUG_ENABLED
= YarnConfiguration.NM_PREFIX + "log-aggregation.debug-enabled";
private long rollingMonitorInterval;
/* /*
* Expected deployment TLD will be 1777, owner=<NMOwner>, group=<NMGroup - * Expected deployment TLD will be 1777, owner=<NMOwner>, group=<NMGroup -
@ -101,6 +107,7 @@ public class LogAggregationService extends AbstractService implements
private NodeId nodeId; private NodeId nodeId;
private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators; private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
private boolean logPermError = true;
@VisibleForTesting @VisibleForTesting
ExecutorService threadPool; ExecutorService threadPool;
@ -128,6 +135,35 @@ public class LogAggregationService extends AbstractService implements
new ThreadFactoryBuilder() new ThreadFactoryBuilder()
.setNameFormat("LogAggregationService #%d") .setNameFormat("LogAggregationService #%d")
.build()); .build());
rollingMonitorInterval = conf.getLong(
YarnConfiguration.NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS,
YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS);
boolean logAggregationDebugMode =
conf.getBoolean(NM_LOG_AGGREGATION_DEBUG_ENABLED, false);
if (rollingMonitorInterval > 0
&& rollingMonitorInterval < MIN_LOG_ROLLING_INTERVAL) {
if (logAggregationDebugMode) {
LOG.info("Log aggregation debug mode enabled. rollingMonitorInterval = "
+ rollingMonitorInterval);
} else {
LOG.warn("rollingMonitorIntervall should be more than or equal to "
+ MIN_LOG_ROLLING_INTERVAL + " seconds. Using "
+ MIN_LOG_ROLLING_INTERVAL + " seconds instead.");
this.rollingMonitorInterval = MIN_LOG_ROLLING_INTERVAL;
}
} else if (rollingMonitorInterval <= 0) {
LOG.info("rollingMonitorInterval is set as " + rollingMonitorInterval
+ ". The log rolling monitoring interval is disabled. "
+ "The logs will be aggregated after this application is finished.");
} else {
LOG.info("rollingMonitorInterval is set as " + rollingMonitorInterval
+ ". The logs will be aggregated every " + rollingMonitorInterval
+ " seconds");
}
super.serviceInit(conf); super.serviceInit(conf);
} }
@ -197,11 +233,14 @@ public class LogAggregationService extends AbstractService implements
try { try {
FsPermission perms = FsPermission perms =
remoteFS.getFileStatus(this.remoteRootLogDir).getPermission(); remoteFS.getFileStatus(this.remoteRootLogDir).getPermission();
if (!perms.equals(TLDIR_PERMISSIONS)) { if (!perms.equals(TLDIR_PERMISSIONS) && logPermError) {
LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir
+ "] already exist, but with incorrect permissions. " + "] already exist, but with incorrect permissions. "
+ "Expected: [" + TLDIR_PERMISSIONS + "], Found: [" + perms + "Expected: [" + TLDIR_PERMISSIONS + "], Found: [" + perms
+ "]." + " The cluster may have problems with multiple users."); + "]." + " The cluster may have problems with multiple users.");
logPermError = false;
} else {
logPermError = true;
} }
} catch (FileNotFoundException e) { } catch (FileNotFoundException e) {
remoteExists = false; remoteExists = false;
@ -360,7 +399,7 @@ public class LogAggregationService extends AbstractService implements
getConfig(), appId, userUgi, this.nodeId, dirsHandler, getConfig(), appId, userUgi, this.nodeId, dirsHandler,
getRemoteNodeLogFileForApp(appId, user), getRemoteNodeLogFileForApp(appId, user),
appAcls, logAggregationContext, this.context, appAcls, logAggregationContext, this.context,
getLocalFileContext(getConfig())); getLocalFileContext(getConfig()), this.rollingMonitorInterval);
if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) { if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
throw new YarnRuntimeException("Duplicate initApp for " + appId); throw new YarnRuntimeException("Duplicate initApp for " + appId);
} }