diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index c730218d563..9a0b4646a99 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1832,6 +1832,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-2766. Fixed NM to set secure permissions for files and directories in distributed-cache. (Hitesh Shah via vinodkv) + MAPREDUCE-2696. Fixed NodeManager to cleanup logs in a thread when logs' + aggregation is not enabled. (Siddharth Seth via vinodkv) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHSWebApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHSWebApp.java index aceb5629063..063cb6bdda9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHSWebApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHSWebApp.java @@ -228,9 +228,7 @@ public class TestHSWebApp { params); PrintWriter spyPw = WebAppTests.getPrintWriter(injector); verify(spyPw).write( - "Logs not available for container_10_0001_01_000001. Aggregation " - + "may not be complete," - + " Check back later or try the nodemanager on " + "Aggregation is not enabled. Try the nodemanager at " + MockJobs.NM_HOST + ":" + MockJobs.NM_PORT); } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 0efcb3d2ebf..b545bf72d08 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -89,6 +89,12 @@ + + + + + + diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 52cad1c9dc8..0779a5f7320 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -288,15 +288,31 @@ public class YarnConfiguration extends Configuration { public static final String NM_LOCALIZER_FETCH_THREAD_COUNT = NM_PREFIX + "localizer.fetch.thread-count"; public static final int DEFAULT_NM_LOCALIZER_FETCH_THREAD_COUNT = 4; - + /** Where to store container logs.*/ public static final String NM_LOG_DIRS = NM_PREFIX + "log-dirs"; public static final String DEFAULT_NM_LOG_DIRS = "/tmp/logs"; - + /** Whether to enable log aggregation */ public static final String NM_LOG_AGGREGATION_ENABLED = NM_PREFIX - + "log-aggregation.enable"; + + "log-aggregation-enable"; + public static final boolean DEFAULT_NM_LOG_AGGREGATION_ENABLED = false; + /** + * Number of seconds to retain logs on the NodeManager. Only applicable if Log + * aggregation is disabled + */ + public static final String NM_LOG_RETAIN_SECONDS = NM_PREFIX + + "log.retain-seconds"; + + /** + * Number of threads used in log cleanup. Only applicable if Log aggregation + * is disabled + */ + public static final String NM_LOG_DELETION_THREADS_COUNT = + NM_PREFIX + "log.deletion-threads-count"; + public static final int DEFAULT_NM_LOG_DELETE_THREAD_COUNT = 4; + /** Where to aggregate logs to.*/ public static final String NM_REMOTE_APP_LOG_DIR = NM_PREFIX + "remote-app-log-dir"; @@ -312,11 +328,11 @@ public class YarnConfiguration extends Configuration { public static final String YARN_LOG_SERVER_URL = YARN_PREFIX + "log.server.url"; - + /** Amount of memory in GB that can be allocated for containers.*/ public static final String NM_PMEM_MB = NM_PREFIX + "resource.memory-mb"; public static final int DEFAULT_NM_PMEM_MB = 8 * 1024; - + public static final String NM_VMEM_PMEM_RATIO = NM_PREFIX + "vmem-pmem-ratio"; public static final float DEFAULT_NM_VMEM_PMEM_RATIO = 2.1f; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml index 8ef3487d1ad..cc6f496e124 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml @@ -281,9 +281,18 @@ Whether to enable log aggregation - yarn.nodemanager.log-aggregation.enable + yarn.nodemanager.log-aggregation-enable false + + + Time in seconds to retain user logs. Only applicable if + log aggregation is disabled + + yarn.nodemanager.log.retain-seconds + 10800 + + Where to aggregate logs to. yarn.nodemanager.remote-app-log-dir diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 93b39083139..5e3eb26cb5d 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -87,7 +87,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; @@ -154,9 +156,6 @@ public class ContainerManagerImpl extends CompositeService implements new ContainersMonitorImpl(exec, dispatcher, this.context); addService(this.containersMonitor); - LogAggregationService logAggregationService = - createLogAggregationService(this.context, this.deletionService); - addService(logAggregationService); dispatcher.register(ContainerEventType.class, new ContainerEventDispatcher()); @@ -166,13 +165,35 @@ public class ContainerManagerImpl extends CompositeService implements dispatcher.register(AuxServicesEventType.class, auxiliaryServices); dispatcher.register(ContainersMonitorEventType.class, containersMonitor); dispatcher.register(ContainersLauncherEventType.class, containersLauncher); - dispatcher.register(LogAggregatorEventType.class, logAggregationService); + addService(dispatcher); } - protected LogAggregationService createLogAggregationService(Context context, + @Override + public void init(Configuration conf) { + LogHandler logHandler = + createLogHandler(conf, this.context, this.deletionService); + addIfService(logHandler); + dispatcher.register(LogHandlerEventType.class, logHandler); + + super.init(conf); + } + + private void addIfService(Object object) { + if (object instanceof Service) { + addService((Service) object); + } + } + + protected LogHandler createLogHandler(Configuration conf, Context context, DeletionService deletionService) { - return new LogAggregationService(this.dispatcher, context, deletionService); + if (conf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, + YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) { + return new LogAggregationService(this.dispatcher, context, + deletionService); + } else { + return new NonAggregatingLogHandler(this.dispatcher, deletionService); + } } public ContainersMonitor getContainersMonitor() { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java index 91fda5ab291..f988a3e435b 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java @@ -32,6 +32,6 @@ public enum ApplicationEventType { // Source: Container APPLICATION_CONTAINER_FINISHED, - // Source: Log Aggregation - APPLICATION_LOG_AGGREGATION_FINISHED + // Source: Log Handler + APPLICATION_LOG_HANDLING_FINISHED } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index d1914b5f616..9cc5d6f786e 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -42,8 +42,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.Reso import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.ContainerLogsRetentionPolicy; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppFinishedEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppStartedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; import org.apache.hadoop.yarn.state.MultipleArcTransition; @@ -181,7 +181,7 @@ public class ApplicationImpl implements Application { // Transitions from FINISHED state .addTransition(ApplicationState.FINISHED, ApplicationState.FINISHED, - ApplicationEventType.APPLICATION_LOG_AGGREGATION_FINISHED, + ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED, new AppLogsAggregatedTransition()) // create the topology tables @@ -251,7 +251,7 @@ public class ApplicationImpl implements Application { // Inform the logAggregator app.dispatcher.getEventHandler().handle( - new LogAggregatorAppStartedEvent(app.appId, app.user, + new LogHandlerAppStartedEvent(app.appId, app.user, app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS, app.applicationACLs)); @@ -351,7 +351,7 @@ public class ApplicationImpl implements Application { // Inform the logService app.dispatcher.getEventHandler().handle( - new LogAggregatorAppFinishedEvent(app.appId)); + new LogHandlerAppFinishedEvent(app.appId)); } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 4b9bf165024..81b41a7731d 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -56,7 +56,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorContainerFinishedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; @@ -410,7 +410,7 @@ public class ContainerImpl implements Container { // Remove the container from the resource-monitor eventHandler.handle(new ContainerStopMonitoringEvent(containerID)); // Tell the logService too - eventHandler.handle(new LogAggregatorContainerFinishedEvent( + eventHandler.handle(new LogHandlerContainerFinishedEvent( containerID, exitCode)); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 391c28e01c9..5db1b5de50e 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -179,7 +179,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator { this.dispatcher.getEventHandler().handle( new ApplicationEvent(this.appId, - ApplicationEventType.APPLICATION_LOG_AGGREGATION_FINISHED)); + ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)); this.appAggregationFinished.set(true); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index 528cae63d13..d651cb98531 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -43,19 +43,19 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppFinishedEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppStartedEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorContainerFinishedEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent; import org.apache.hadoop.yarn.service.AbstractService; import com.google.common.util.concurrent.ThreadFactoryBuilder; public class LogAggregationService extends AbstractService implements - EventHandler { + LogHandler { private static final Log LOG = LogFactory .getLog(LogAggregationService.class); @@ -87,7 +87,6 @@ public class LogAggregationService extends AbstractService implements Path remoteRootLogDir; String remoteRootLogDirSuffix; private NodeId nodeId; - private boolean isLogAggregationEnabled = false; private final ConcurrentMap appLogAggregators; @@ -117,8 +116,6 @@ public class LogAggregationService extends AbstractService implements this.remoteRootLogDirSuffix = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX); - this.isLogAggregationEnabled = - conf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, false); super.init(conf); } @@ -411,31 +408,30 @@ public class LogAggregationService extends AbstractService implements } @Override - public void handle(LogAggregatorEvent event) { - if (this.isLogAggregationEnabled) { - switch (event.getType()) { - case APPLICATION_STARTED: - LogAggregatorAppStartedEvent appStartEvent = - (LogAggregatorAppStartedEvent) event; - initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(), - appStartEvent.getCredentials(), - appStartEvent.getLogRetentionPolicy(), - appStartEvent.getApplicationAcls()); - break; - case CONTAINER_FINISHED: - LogAggregatorContainerFinishedEvent containerFinishEvent = - (LogAggregatorContainerFinishedEvent) event; - stopContainer(containerFinishEvent.getContainerId(), - containerFinishEvent.getExitCode()); - break; - case APPLICATION_FINISHED: - LogAggregatorAppFinishedEvent appFinishedEvent = - (LogAggregatorAppFinishedEvent) event; - stopApp(appFinishedEvent.getApplicationId()); - break; - default: - ; // Ignore - } + public void handle(LogHandlerEvent event) { + switch (event.getType()) { + case APPLICATION_STARTED: + LogHandlerAppStartedEvent appStartEvent = + (LogHandlerAppStartedEvent) event; + initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(), + appStartEvent.getCredentials(), + appStartEvent.getLogRetentionPolicy(), + appStartEvent.getApplicationAcls()); + break; + case CONTAINER_FINISHED: + LogHandlerContainerFinishedEvent containerFinishEvent = + (LogHandlerContainerFinishedEvent) event; + stopContainer(containerFinishEvent.getContainerId(), + containerFinishEvent.getExitCode()); + break; + case APPLICATION_FINISHED: + LogHandlerAppFinishedEvent appFinishedEvent = + (LogHandlerAppFinishedEvent) event; + stopApp(appFinishedEvent.getApplicationId()); + break; + default: + ; // Ignore } + } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/LogHandler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/LogHandler.java new file mode 100644 index 00000000000..6eb3fb45abc --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/LogHandler.java @@ -0,0 +1,26 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler; + +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent; + +public interface LogHandler extends EventHandler { + public void handle(LogHandlerEvent event); +} \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java new file mode 100644 index 00000000000..96833f3d485 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java @@ -0,0 +1,153 @@ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent; +import org.apache.hadoop.yarn.service.AbstractService; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Log Handler which schedules deletion of log files based on the configured log + * retention time. + */ +public class NonAggregatingLogHandler extends AbstractService implements + LogHandler { + + private static final Log LOG = LogFactory + .getLog(NonAggregatingLogHandler.class); + private final Dispatcher dispatcher; + private final DeletionService delService; + private final Map appOwners; + + private String[] rootLogDirs; + private long deleteDelaySeconds; + private ScheduledThreadPoolExecutor sched; + + public NonAggregatingLogHandler(Dispatcher dispatcher, + DeletionService delService) { + super(NonAggregatingLogHandler.class.getName()); + this.dispatcher = dispatcher; + this.delService = delService; + this.appOwners = new ConcurrentHashMap(); + } + + @Override + public void init(Configuration conf) { + // Default 3 hours. + this.deleteDelaySeconds = + conf.getLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 3 * 60 * 60); + this.rootLogDirs = + conf.getStrings(YarnConfiguration.NM_LOG_DIRS, + YarnConfiguration.DEFAULT_NM_LOG_DIRS); + sched = createScheduledThreadPoolExecutor(conf); + super.init(conf); + } + + @Override + public void stop() { + sched.shutdown(); + boolean isShutdown = false; + try { + isShutdown = sched.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + sched.shutdownNow(); + isShutdown = true; + } + if (!isShutdown) { + sched.shutdownNow(); + } + super.stop(); + } + + @Override + public void handle(LogHandlerEvent event) { + switch (event.getType()) { + case APPLICATION_STARTED: + LogHandlerAppStartedEvent appStartedEvent = + (LogHandlerAppStartedEvent) event; + this.appOwners.put(appStartedEvent.getApplicationId(), + appStartedEvent.getUser()); + break; + case CONTAINER_FINISHED: + // Ignore + break; + case APPLICATION_FINISHED: + LogHandlerAppFinishedEvent appFinishedEvent = + (LogHandlerAppFinishedEvent) event; + // Schedule - so that logs are available on the UI till they're deleted. + LOG.info("Scheduling Log Deletion for application: " + + appFinishedEvent.getApplicationId() + ", with delay of " + + this.deleteDelaySeconds + " seconds"); + sched.schedule( + new LogDeleterRunnable(appOwners.remove(appFinishedEvent + .getApplicationId()), appFinishedEvent.getApplicationId()), + this.deleteDelaySeconds, TimeUnit.SECONDS); + break; + default: + ; // Ignore + } + } + + ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor( + Configuration conf) { + ThreadFactory tf = + new ThreadFactoryBuilder().setNameFormat("LogDeleter #%d").build(); + sched = + new ScheduledThreadPoolExecutor(conf.getInt( + YarnConfiguration.NM_LOG_DELETION_THREADS_COUNT, + YarnConfiguration.DEFAULT_NM_LOG_DELETE_THREAD_COUNT), tf); + return sched; + } + + class LogDeleterRunnable implements Runnable { + private String user; + private ApplicationId applicationId; + + public LogDeleterRunnable(String user, ApplicationId applicationId) { + this.user = user; + this.applicationId = applicationId; + } + + @Override + @SuppressWarnings("unchecked") + public void run() { + Path[] localAppLogDirs = + new Path[NonAggregatingLogHandler.this.rootLogDirs.length]; + int index = 0; + for (String rootLogDir : NonAggregatingLogHandler.this.rootLogDirs) { + localAppLogDirs[index] = new Path(rootLogDir, applicationId.toString()); + index++; + } + // Inform the application before the actual delete itself, so that links + // to logs will no longer be there on NM web-UI. + NonAggregatingLogHandler.this.dispatcher.getEventHandler().handle( + new ApplicationEvent(this.applicationId, + ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)); + NonAggregatingLogHandler.this.delService.delete(user, null, + localAppLogDirs); + } + + @Override + public String toString() { + return "LogDeleter for AppId " + this.applicationId.toString() + + ", owned by " + user; + } + } +} \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppFinishedEvent.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppFinishedEvent.java similarity index 83% rename from hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppFinishedEvent.java rename to hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppFinishedEvent.java index 5ff2e2194ea..4da8f315691 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppFinishedEvent.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppFinishedEvent.java @@ -16,16 +16,16 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event; +package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event; import org.apache.hadoop.yarn.api.records.ApplicationId; -public class LogAggregatorAppFinishedEvent extends LogAggregatorEvent { +public class LogHandlerAppFinishedEvent extends LogHandlerEvent { private final ApplicationId applicationId; - public LogAggregatorAppFinishedEvent(ApplicationId appId) { - super(LogAggregatorEventType.APPLICATION_FINISHED); + public LogHandlerAppFinishedEvent(ApplicationId appId) { + super(LogHandlerEventType.APPLICATION_FINISHED); this.applicationId = appId; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppStartedEvent.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java similarity index 90% rename from hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppStartedEvent.java rename to hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java index 3d93c51c211..a598bed264f 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppStartedEvent.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event; +package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event; import java.util.Map; @@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.ContainerLogsRetentionPolicy; -public class LogAggregatorAppStartedEvent extends LogAggregatorEvent { +public class LogHandlerAppStartedEvent extends LogHandlerEvent { private final ApplicationId applicationId; private final ContainerLogsRetentionPolicy retentionPolicy; @@ -33,10 +33,10 @@ public class LogAggregatorAppStartedEvent extends LogAggregatorEvent { private final Credentials credentials; private final Map appAcls; - public LogAggregatorAppStartedEvent(ApplicationId appId, String user, + public LogHandlerAppStartedEvent(ApplicationId appId, String user, Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy, Map appAcls) { - super(LogAggregatorEventType.APPLICATION_STARTED); + super(LogHandlerEventType.APPLICATION_STARTED); this.applicationId = appId; this.user = user; this.credentials = credentials; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorContainerFinishedEvent.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerContainerFinishedEvent.java similarity index 84% rename from hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorContainerFinishedEvent.java rename to hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerContainerFinishedEvent.java index 68ec27a73a0..038006edcf6 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorContainerFinishedEvent.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerContainerFinishedEvent.java @@ -16,18 +16,18 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event; +package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event; import org.apache.hadoop.yarn.api.records.ContainerId; -public class LogAggregatorContainerFinishedEvent extends LogAggregatorEvent { +public class LogHandlerContainerFinishedEvent extends LogHandlerEvent { private final ContainerId containerId; private final int exitCode; - public LogAggregatorContainerFinishedEvent(ContainerId containerId, + public LogHandlerContainerFinishedEvent(ContainerId containerId, int exitCode) { - super(LogAggregatorEventType.CONTAINER_FINISHED); + super(LogHandlerEventType.CONTAINER_FINISHED); this.containerId = containerId; this.exitCode = exitCode; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorEvent.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEvent.java similarity index 84% rename from hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorEvent.java rename to hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEvent.java index 052d0808030..3d255467e65 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorEvent.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEvent.java @@ -16,14 +16,14 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event; +package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event; import org.apache.hadoop.yarn.event.AbstractEvent; -public class LogAggregatorEvent extends AbstractEvent{ +public class LogHandlerEvent extends AbstractEvent{ - public LogAggregatorEvent(LogAggregatorEventType type) { + public LogHandlerEvent(LogHandlerEventType type) { super(type); } -} +} \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorEventType.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java similarity index 93% rename from hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorEventType.java rename to hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java index 64adf746d09..684d6b2605f 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorEventType.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java @@ -16,8 +16,8 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event; +package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event; -public enum LogAggregatorEventType { +public enum LogHandlerEventType { APPLICATION_STARTED, CONTAINER_FINISHED, APPLICATION_FINISHED } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsBlock.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsBlock.java index f36dbab5a6e..9a75aa24e44 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsBlock.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/AggregatedLogsBlock.java @@ -53,6 +53,14 @@ public class AggregatedLogsBlock extends HtmlBlock { logEntity = containerId.toString(); } + if (!conf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, + YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) { + html.h1() + ._("Aggregation is not enabled. Try the nodemanager at " + nodeId) + ._(); + return; + } + Path remoteRootLogDir = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); @@ -69,7 +77,7 @@ public class AggregatedLogsBlock extends HtmlBlock { ._("Logs not available for " + logEntity + ". Aggregation may not be complete, " - + "Check back later or try the nodemanager on " + + "Check back later or try the nodemanager at " + nodeId)._(); return; } catch (IOException e) { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java index f4e40ad18fd..cf69b0f4ee2 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java @@ -86,7 +86,9 @@ public class NMController extends Controller implements NMWebParams { ApplicationId appId = containerId.getApplicationAttemptId().getApplicationId(); Application app = nmContext.getApplications().get(appId); - if (app == null) { + if (app == null + && nmConf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, + YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) { String logServerUrl = nmConf.get(YarnConfiguration.YARN_LOG_SERVER_URL); String redirectUrl = null; if (logServerUrl == null || logServerUrl.isEmpty()) { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java index 5f7560fd3c0..74d99796914 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java @@ -24,11 +24,9 @@ import java.util.Collection; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; @@ -49,8 +47,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; public class DummyContainerManager extends ContainerManagerImpl { @@ -68,6 +66,7 @@ public class DummyContainerManager extends ContainerManagerImpl { } @Override + @SuppressWarnings("unchecked") protected ResourceLocalizationService createResourceLocalizationService(ContainerExecutor exec, DeletionService deletionContext) { return new ResourceLocalizationService(super.dispatcher, exec, deletionContext) { @@ -123,6 +122,7 @@ public class DummyContainerManager extends ContainerManagerImpl { } @Override + @SuppressWarnings("unchecked") protected ContainersLauncher createContainersLauncher(Context context, ContainerExecutor exec) { return new ContainersLauncher(context, super.dispatcher, exec) { @@ -147,23 +147,23 @@ public class DummyContainerManager extends ContainerManagerImpl { } @Override - protected LogAggregationService createLogAggregationService(Context context, - DeletionService deletionService) { - return new LogAggregationService(new AsyncDispatcher(), context, - deletionService) { + protected LogHandler createLogHandler(Configuration conf, + Context context, DeletionService deletionService) { + return new LogHandler() { + @Override - public void handle(LogAggregatorEvent event) { + public void handle(LogHandlerEvent event) { switch (event.getType()) { - case APPLICATION_STARTED: - break; - case CONTAINER_FINISHED: - break; - case APPLICATION_FINISHED: - break; - default: - // Ignore - } + case APPLICATION_STARTED: + break; + case CONTAINER_FINISHED: + break; + case APPLICATION_FINISHED: + break; + default: + // Ignore + } } }; } -} +} \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java index 10056ff5512..111d5c4e8f8 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java @@ -33,8 +33,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; @@ -349,7 +349,7 @@ public class TestApplication { final EventHandler monitorBus; final EventHandler auxBus; final EventHandler containerBus; - final EventHandler logAggregationBus; + final EventHandler logAggregationBus; final String user; final List containers; final Context context; @@ -373,7 +373,7 @@ public class TestApplication { dispatcher.register(ContainersMonitorEventType.class, monitorBus); dispatcher.register(AuxServicesEventType.class, auxBus); dispatcher.register(ContainerEventType.class, containerBus); - dispatcher.register(LogAggregatorEventType.class, logAggregationBus); + dispatcher.register(LogHandlerEventType.class, logAggregationBus); context = mock(Context.class); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index b4db6d11086..bc420e4f921 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -70,9 +70,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogReader; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppFinishedEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppStartedEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorContainerFinishedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Test; @@ -114,7 +114,6 @@ public class TestLogAggregationService extends BaseContainerManagerTest { this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); - this.conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, true); DrainDispatcher dispatcher = createDispatcher(); EventHandler appEventHandler = mock(EventHandler.class); @@ -133,7 +132,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { new File(localLogDir, ConverterUtils.toString(application1)); app1LogDir.mkdir(); logAggregationService - .handle(new LogAggregatorAppStartedEvent( + .handle(new LogHandlerAppStartedEvent( application1, this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls)); @@ -143,9 +142,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest { // Simulate log-file creation writeContainerLogs(app1LogDir, container11); logAggregationService.handle( - new LogAggregatorContainerFinishedEvent(container11, 0)); + new LogHandlerContainerFinishedEvent(container11, 0)); - logAggregationService.handle(new LogAggregatorAppFinishedEvent( + logAggregationService.handle(new LogHandlerAppFinishedEvent( application1)); logAggregationService.stop(); @@ -169,7 +168,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(ApplicationEvent.class); verify(appEventHandler).handle(eventCaptor.capture()); - assertEquals(ApplicationEventType.APPLICATION_LOG_AGGREGATION_FINISHED, + assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED, eventCaptor.getValue().getType()); assertEquals(appAttemptId.getApplicationId(), eventCaptor.getValue() .getApplicationID()); @@ -182,7 +181,6 @@ public class TestLogAggregationService extends BaseContainerManagerTest { this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); - this.conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, true); DrainDispatcher dispatcher = createDispatcher(); EventHandler appEventHandler = mock(EventHandler.class); @@ -200,11 +198,11 @@ public class TestLogAggregationService extends BaseContainerManagerTest { new File(localLogDir, ConverterUtils.toString(application1)); app1LogDir.mkdir(); logAggregationService - .handle(new LogAggregatorAppStartedEvent( + .handle(new LogHandlerAppStartedEvent( application1, this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls)); - logAggregationService.handle(new LogAggregatorAppFinishedEvent( + logAggregationService.handle(new LogHandlerAppFinishedEvent( application1)); logAggregationService.stop(); @@ -217,7 +215,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(ApplicationEvent.class); verify(appEventHandler).handle(eventCaptor.capture()); - assertEquals(ApplicationEventType.APPLICATION_LOG_AGGREGATION_FINISHED, + assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED, eventCaptor.getValue().getType()); verify(appEventHandler).handle(eventCaptor.capture()); assertEquals(application1, eventCaptor.getValue() @@ -231,7 +229,6 @@ public class TestLogAggregationService extends BaseContainerManagerTest { this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); - this.conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, true); DrainDispatcher dispatcher = createDispatcher(); EventHandler appEventHandler = mock(EventHandler.class); @@ -249,7 +246,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { new File(localLogDir, ConverterUtils.toString(application1)); app1LogDir.mkdir(); logAggregationService - .handle(new LogAggregatorAppStartedEvent( + .handle(new LogHandlerAppStartedEvent( application1, this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls)); @@ -260,7 +257,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { // Simulate log-file creation writeContainerLogs(app1LogDir, container11); logAggregationService.handle( - new LogAggregatorContainerFinishedEvent(container11, 0)); + new LogHandlerContainerFinishedEvent(container11, 0)); ApplicationId application2 = BuilderUtils.newApplicationId(1234, 2); ApplicationAttemptId appAttemptId2 = @@ -269,7 +266,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { File app2LogDir = new File(localLogDir, ConverterUtils.toString(application2)); app2LogDir.mkdir(); - logAggregationService.handle(new LogAggregatorAppStartedEvent( + logAggregationService.handle(new LogHandlerAppStartedEvent( application2, this.user, null, ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY, this.acls)); @@ -278,13 +275,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest { writeContainerLogs(app2LogDir, container21); logAggregationService.handle( - new LogAggregatorContainerFinishedEvent(container21, 0)); + new LogHandlerContainerFinishedEvent(container21, 0)); ContainerId container12 = BuilderUtils.newContainerId(appAttemptId1, 2); writeContainerLogs(app1LogDir, container12); logAggregationService.handle( - new LogAggregatorContainerFinishedEvent(container12, 0)); + new LogHandlerContainerFinishedEvent(container12, 0)); ApplicationId application3 = BuilderUtils.newApplicationId(1234, 3); ApplicationAttemptId appAttemptId3 = @@ -293,7 +290,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { File app3LogDir = new File(localLogDir, ConverterUtils.toString(application3)); app3LogDir.mkdir(); - logAggregationService.handle(new LogAggregatorAppStartedEvent(application3, + logAggregationService.handle(new LogHandlerAppStartedEvent(application3, this.user, null, ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls)); @@ -301,28 +298,28 @@ public class TestLogAggregationService extends BaseContainerManagerTest { ContainerId container31 = BuilderUtils.newContainerId(appAttemptId3, 1); writeContainerLogs(app3LogDir, container31); logAggregationService.handle( - new LogAggregatorContainerFinishedEvent(container31, 0)); + new LogHandlerContainerFinishedEvent(container31, 0)); ContainerId container32 = BuilderUtils.newContainerId(appAttemptId3, 2); writeContainerLogs(app3LogDir, container32); logAggregationService.handle( - new LogAggregatorContainerFinishedEvent(container32, 1)); // Failed + new LogHandlerContainerFinishedEvent(container32, 1)); // Failed ContainerId container22 = BuilderUtils.newContainerId(appAttemptId2, 2); writeContainerLogs(app2LogDir, container22); logAggregationService.handle( - new LogAggregatorContainerFinishedEvent(container22, 0)); + new LogHandlerContainerFinishedEvent(container22, 0)); ContainerId container33 = BuilderUtils.newContainerId(appAttemptId3, 3); writeContainerLogs(app3LogDir, container33); logAggregationService.handle( - new LogAggregatorContainerFinishedEvent(container33, 0)); + new LogHandlerContainerFinishedEvent(container33, 0)); - logAggregationService.handle(new LogAggregatorAppFinishedEvent( + logAggregationService.handle(new LogHandlerAppFinishedEvent( application2)); - logAggregationService.handle(new LogAggregatorAppFinishedEvent( + logAggregationService.handle(new LogHandlerAppFinishedEvent( application3)); - logAggregationService.handle(new LogAggregatorAppFinishedEvent( + logAggregationService.handle(new LogHandlerAppFinishedEvent( application1)); logAggregationService.stop(); @@ -342,7 +339,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { List capturedEvents = eventCaptor.getAllValues(); Set appIds = new HashSet(); for (ApplicationEvent cap : capturedEvents) { - assertEquals(ApplicationEventType.APPLICATION_LOG_AGGREGATION_FINISHED, + assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED, eventCaptor.getValue().getType()); appIds.add(cap.getApplicationID()); } @@ -447,7 +444,6 @@ public class TestLogAggregationService extends BaseContainerManagerTest { public void testLogAggregationForRealContainerLaunch() throws IOException, InterruptedException { - this.conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, true); this.containerManager.start(); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java new file mode 100644 index 00000000000..6eacb8aa727 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java @@ -0,0 +1,187 @@ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import java.io.File; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.ContainerLogsRetentionPolicy; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; +import org.apache.hadoop.yarn.util.BuilderUtils; +import org.junit.Test; +import org.mockito.exceptions.verification.WantedButNotInvoked; + +public class TestNonAggregatingLogHandler { + + @Test + @SuppressWarnings("unchecked") + public void testLogDeletion() { + DeletionService delService = mock(DeletionService.class); + Configuration conf = new YarnConfiguration(); + String user = "testuser"; + + File[] localLogDirs = new File[2]; + localLogDirs[0] = + new File("target", this.getClass().getName() + "-localLogDir0") + .getAbsoluteFile(); + localLogDirs[1] = + new File("target", this.getClass().getName() + "-localLogDir1") + .getAbsoluteFile(); + String localLogDirsString = + localLogDirs[0].getAbsolutePath() + "," + + localLogDirs[1].getAbsolutePath(); + + conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString); + conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, false); + conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 0l); + + DrainDispatcher dispatcher = createDispatcher(conf); + EventHandler appEventHandler = mock(EventHandler.class); + dispatcher.register(ApplicationEventType.class, appEventHandler); + + ApplicationId appId1 = BuilderUtils.newApplicationId(1234, 1); + ApplicationAttemptId appAttemptId1 = + BuilderUtils.newApplicationAttemptId(appId1, 1); + ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1); + + NonAggregatingLogHandler logHandler = + new NonAggregatingLogHandler(dispatcher, delService); + logHandler.init(conf); + logHandler.start(); + + logHandler.handle(new LogHandlerAppStartedEvent(appId1, user, null, + ContainerLogsRetentionPolicy.ALL_CONTAINERS, null)); + + logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0)); + + logHandler.handle(new LogHandlerAppFinishedEvent(appId1)); + + Path[] localAppLogDirs = new Path[2]; + localAppLogDirs[0] = + new Path(localLogDirs[0].getAbsolutePath(), appId1.toString()); + localAppLogDirs[1] = + new Path(localLogDirs[1].getAbsolutePath(), appId1.toString()); + + // 5 seconds for the delete which is a separate thread. + long verifyStartTime = System.currentTimeMillis(); + WantedButNotInvoked notInvokedException = null; + boolean matched = false; + while (!matched && System.currentTimeMillis() < verifyStartTime + 5000l) { + try { + verify(delService).delete(eq(user), (Path) eq(null), + eq(localAppLogDirs[0]), eq(localAppLogDirs[1])); + matched = true; + } catch (WantedButNotInvoked e) { + notInvokedException = e; + try { + Thread.sleep(50l); + } catch (InterruptedException i) { + } + } + } + if (!matched) { + throw notInvokedException; + } + } + + @Test + @SuppressWarnings("unchecked") + public void testDelayedDelete() { + DeletionService delService = mock(DeletionService.class); + Configuration conf = new YarnConfiguration(); + String user = "testuser"; + + File[] localLogDirs = new File[2]; + localLogDirs[0] = + new File("target", this.getClass().getName() + "-localLogDir0") + .getAbsoluteFile(); + localLogDirs[1] = + new File("target", this.getClass().getName() + "-localLogDir1") + .getAbsoluteFile(); + String localLogDirsString = + localLogDirs[0].getAbsolutePath() + "," + + localLogDirs[1].getAbsolutePath(); + + conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString); + conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, false); + + conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 10800l); + + DrainDispatcher dispatcher = createDispatcher(conf); + EventHandler appEventHandler = mock(EventHandler.class); + dispatcher.register(ApplicationEventType.class, appEventHandler); + + ApplicationId appId1 = BuilderUtils.newApplicationId(1234, 1); + ApplicationAttemptId appAttemptId1 = + BuilderUtils.newApplicationAttemptId(appId1, 1); + ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1); + + NonAggregatingLogHandler logHandler = + new NonAggregatingLogHandlerWithMockExecutor(dispatcher, delService); + logHandler.init(conf); + logHandler.start(); + + logHandler.handle(new LogHandlerAppStartedEvent(appId1, user, null, + ContainerLogsRetentionPolicy.ALL_CONTAINERS, null)); + + logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0)); + + logHandler.handle(new LogHandlerAppFinishedEvent(appId1)); + + Path[] localAppLogDirs = new Path[2]; + localAppLogDirs[0] = + new Path(localLogDirs[0].getAbsolutePath(), appId1.toString()); + localAppLogDirs[1] = + new Path(localLogDirs[1].getAbsolutePath(), appId1.toString()); + + ScheduledThreadPoolExecutor mockSched = + ((NonAggregatingLogHandlerWithMockExecutor) logHandler).mockSched; + + verify(mockSched).schedule(any(Runnable.class), eq(10800l), + eq(TimeUnit.SECONDS)); + } + + private class NonAggregatingLogHandlerWithMockExecutor extends + NonAggregatingLogHandler { + + private ScheduledThreadPoolExecutor mockSched; + + public NonAggregatingLogHandlerWithMockExecutor(Dispatcher dispatcher, + DeletionService delService) { + super(dispatcher, delService); + } + + @Override + ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor( + Configuration conf) { + mockSched = mock(ScheduledThreadPoolExecutor.class); + return mockSched; + } + + } + + private DrainDispatcher createDispatcher(Configuration conf) { + DrainDispatcher dispatcher = new DrainDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + return dispatcher; + } +} diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm index f9371594a1c..5ce75d21d07 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm @@ -224,18 +224,13 @@ Hadoop MapReduce Next Generation - Cluster Setup | | <<>> Scheduler class. | | | | | <<>> (recommended) or <<>> | *-------------------------+-------------------------+------------------------+ -| <<>> | | | -| | | | -| | | HDFS directory where the application logs are moved on application | -| | | completion. Need to set appropriate permissions. | -*-------------------------+-------------------------+------------------------+ | <<>> / | | | | <<>> | | | | | List of permitted/excluded NodeManagers. | | | | | If necessary, use these files to control the list of allowable | | | | NodeManagers. | *-------------------------+-------------------------+------------------------+ -| + * Configurations for NodeManager: *-------------------------+-------------------------+------------------------+ @@ -263,6 +258,27 @@ Hadoop MapReduce Next Generation - Cluster Setup | | are written. | | | | | Multiple paths help spread disk i/o. | *-------------------------+-------------------------+------------------------+ +| <<>> | | | +| | | | +| | | Configuration to enable or disable log aggregation | +*-------------------------+-------------------------+------------------------+ +| <<>> | | | +| | <10800> | | +| | | Default time (in seconds) to retain log files on the NodeManager | +| | | Only applicable if log-aggregation is disabled. | +*-------------------------+-------------------------+------------------------+ +| <<>> | | | +| | | | +| | | HDFS directory where the application logs are moved on application | +| | | completion. Need to set appropriate permissions. | +| | | Only applicable if log-aggregation is enabled. | +*-------------------------+-------------------------+------------------------+ +| <<>> | | | +| | | | +| | | Suffix appended to the remote log dir. Logs will be aggregated to | +| | | $\{yarn.nodemanager.remote-app-log-dir\}/$\{user\}/$\{thisParam\} | +| | | Only applicable if log-aggregation is enabled. | +*-------------------------+-------------------------+------------------------+ | <<>> | | | | | mapreduce.shuffle | | | | | Shuffle service that needs to be set for Map Reduce applications. |