MAPREDUCE-2696. Fixed NodeManager to cleanup logs in a thread when logs' aggregation is not enabled. Contributed by Siddharth Seth.

svn merge -c r1195383 --ignore-ancestry ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1195387 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-10-31 10:00:55 +00:00
parent 4b4b1c89cc
commit cee984b9b3
25 changed files with 575 additions and 138 deletions

View File

@ -1832,6 +1832,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2766. Fixed NM to set secure permissions for files and directories
in distributed-cache. (Hitesh Shah via vinodkv)
MAPREDUCE-2696. Fixed NodeManager to cleanup logs in a thread when logs'
aggregation is not enabled. (Siddharth Seth via vinodkv)
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES

View File

@ -228,9 +228,7 @@ public class TestHSWebApp {
params);
PrintWriter spyPw = WebAppTests.getPrintWriter(injector);
verify(spyPw).write(
"Logs not available for container_10_0001_01_000001. Aggregation "
+ "may not be complete,"
+ " Check back later or try the nodemanager on "
"Aggregation is not enabled. Try the nodemanager at "
+ MockJobs.NM_HOST + ":" + MockJobs.NM_PORT);
}
}

View File

@ -89,6 +89,12 @@
<Method name="handle" />
<Bug pattern="BC_UNCONFIRMED_CAST" />
</Match>
<Match>
<Class name="~org\.apache\.hadoop\.yarn\.server\.nodemanager\.containermanager\.loghandler\.NonAggregatingLogHandler.*" />
<Method name="handle" />
<Bug pattern="BC_UNCONFIRMED_CAST" />
</Match>
<!-- Ignore intentional switch fallthroughs -->
<Match>

View File

@ -288,15 +288,31 @@ public class YarnConfiguration extends Configuration {
public static final String NM_LOCALIZER_FETCH_THREAD_COUNT =
NM_PREFIX + "localizer.fetch.thread-count";
public static final int DEFAULT_NM_LOCALIZER_FETCH_THREAD_COUNT = 4;
/** Where to store container logs.*/
public static final String NM_LOG_DIRS = NM_PREFIX + "log-dirs";
public static final String DEFAULT_NM_LOG_DIRS = "/tmp/logs";
/** Whether to enable log aggregation */
public static final String NM_LOG_AGGREGATION_ENABLED = NM_PREFIX
+ "log-aggregation.enable";
+ "log-aggregation-enable";
public static final boolean DEFAULT_NM_LOG_AGGREGATION_ENABLED = false;
/**
* Number of seconds to retain logs on the NodeManager. Only applicable if Log
* aggregation is disabled
*/
public static final String NM_LOG_RETAIN_SECONDS = NM_PREFIX
+ "log.retain-seconds";
/**
* Number of threads used in log cleanup. Only applicable if Log aggregation
* is disabled
*/
public static final String NM_LOG_DELETION_THREADS_COUNT =
NM_PREFIX + "log.deletion-threads-count";
public static final int DEFAULT_NM_LOG_DELETE_THREAD_COUNT = 4;
/** Where to aggregate logs to.*/
public static final String NM_REMOTE_APP_LOG_DIR =
NM_PREFIX + "remote-app-log-dir";
@ -312,11 +328,11 @@ public class YarnConfiguration extends Configuration {
public static final String YARN_LOG_SERVER_URL =
YARN_PREFIX + "log.server.url";
/** Amount of memory in GB that can be allocated for containers.*/
public static final String NM_PMEM_MB = NM_PREFIX + "resource.memory-mb";
public static final int DEFAULT_NM_PMEM_MB = 8 * 1024;
public static final String NM_VMEM_PMEM_RATIO =
NM_PREFIX + "vmem-pmem-ratio";
public static final float DEFAULT_NM_VMEM_PMEM_RATIO = 2.1f;

View File

@ -281,9 +281,18 @@
<property>
<description>Whether to enable log aggregation</description>
<name>yarn.nodemanager.log-aggregation.enable</name>
<name>yarn.nodemanager.log-aggregation-enable</name>
<value>false</value>
</property>
<property>
<description>Time in seconds to retain user logs. Only applicable if
log aggregation is disabled
</description>
<name>yarn.nodemanager.log.retain-seconds</name>
<value>10800</value>
</property>
<property>
<description>Where to aggregate logs to.</description>
<name>yarn.nodemanager.remote-app-log-dir</name>

View File

@ -87,7 +87,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
@ -154,9 +156,6 @@ public class ContainerManagerImpl extends CompositeService implements
new ContainersMonitorImpl(exec, dispatcher, this.context);
addService(this.containersMonitor);
LogAggregationService logAggregationService =
createLogAggregationService(this.context, this.deletionService);
addService(logAggregationService);
dispatcher.register(ContainerEventType.class,
new ContainerEventDispatcher());
@ -166,13 +165,35 @@ public class ContainerManagerImpl extends CompositeService implements
dispatcher.register(AuxServicesEventType.class, auxiliaryServices);
dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
dispatcher.register(ContainersLauncherEventType.class, containersLauncher);
dispatcher.register(LogAggregatorEventType.class, logAggregationService);
addService(dispatcher);
}
protected LogAggregationService createLogAggregationService(Context context,
@Override
public void init(Configuration conf) {
LogHandler logHandler =
createLogHandler(conf, this.context, this.deletionService);
addIfService(logHandler);
dispatcher.register(LogHandlerEventType.class, logHandler);
super.init(conf);
}
private void addIfService(Object object) {
if (object instanceof Service) {
addService((Service) object);
}
}
protected LogHandler createLogHandler(Configuration conf, Context context,
DeletionService deletionService) {
return new LogAggregationService(this.dispatcher, context, deletionService);
if (conf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) {
return new LogAggregationService(this.dispatcher, context,
deletionService);
} else {
return new NonAggregatingLogHandler(this.dispatcher, deletionService);
}
}
public ContainersMonitor getContainersMonitor() {

View File

@ -32,6 +32,6 @@ public enum ApplicationEventType {
// Source: Container
APPLICATION_CONTAINER_FINISHED,
// Source: Log Aggregation
APPLICATION_LOG_AGGREGATION_FINISHED
// Source: Log Handler
APPLICATION_LOG_HANDLING_FINISHED
}

View File

@ -42,8 +42,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.Reso
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
@ -181,7 +181,7 @@ public class ApplicationImpl implements Application {
// Transitions from FINISHED state
.addTransition(ApplicationState.FINISHED,
ApplicationState.FINISHED,
ApplicationEventType.APPLICATION_LOG_AGGREGATION_FINISHED,
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
new AppLogsAggregatedTransition())
// create the topology tables
@ -251,7 +251,7 @@ public class ApplicationImpl implements Application {
// Inform the logAggregator
app.dispatcher.getEventHandler().handle(
new LogAggregatorAppStartedEvent(app.appId, app.user,
new LogHandlerAppStartedEvent(app.appId, app.user,
app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
app.applicationACLs));
@ -351,7 +351,7 @@ public class ApplicationImpl implements Application {
// Inform the logService
app.dispatcher.getEventHandler().handle(
new LogAggregatorAppFinishedEvent(app.appId));
new LogHandlerAppFinishedEvent(app.appId));
}
}

View File

@ -56,7 +56,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
@ -410,7 +410,7 @@ public class ContainerImpl implements Container {
// Remove the container from the resource-monitor
eventHandler.handle(new ContainerStopMonitoringEvent(containerID));
// Tell the logService too
eventHandler.handle(new LogAggregatorContainerFinishedEvent(
eventHandler.handle(new LogHandlerContainerFinishedEvent(
containerID, exitCode));
}

View File

@ -179,7 +179,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
this.dispatcher.getEventHandler().handle(
new ApplicationEvent(this.appId,
ApplicationEventType.APPLICATION_LOG_AGGREGATION_FINISHED));
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
this.appAggregationFinished.set(true);
}

View File

@ -43,19 +43,19 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
import org.apache.hadoop.yarn.service.AbstractService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class LogAggregationService extends AbstractService implements
EventHandler<LogAggregatorEvent> {
LogHandler {
private static final Log LOG = LogFactory
.getLog(LogAggregationService.class);
@ -87,7 +87,6 @@ public class LogAggregationService extends AbstractService implements
Path remoteRootLogDir;
String remoteRootLogDirSuffix;
private NodeId nodeId;
private boolean isLogAggregationEnabled = false;
private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
@ -117,8 +116,6 @@ public class LogAggregationService extends AbstractService implements
this.remoteRootLogDirSuffix =
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
this.isLogAggregationEnabled =
conf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, false);
super.init(conf);
}
@ -411,31 +408,30 @@ public class LogAggregationService extends AbstractService implements
}
@Override
public void handle(LogAggregatorEvent event) {
if (this.isLogAggregationEnabled) {
switch (event.getType()) {
case APPLICATION_STARTED:
LogAggregatorAppStartedEvent appStartEvent =
(LogAggregatorAppStartedEvent) event;
initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(),
appStartEvent.getCredentials(),
appStartEvent.getLogRetentionPolicy(),
appStartEvent.getApplicationAcls());
break;
case CONTAINER_FINISHED:
LogAggregatorContainerFinishedEvent containerFinishEvent =
(LogAggregatorContainerFinishedEvent) event;
stopContainer(containerFinishEvent.getContainerId(),
containerFinishEvent.getExitCode());
break;
case APPLICATION_FINISHED:
LogAggregatorAppFinishedEvent appFinishedEvent =
(LogAggregatorAppFinishedEvent) event;
stopApp(appFinishedEvent.getApplicationId());
break;
default:
; // Ignore
}
public void handle(LogHandlerEvent event) {
switch (event.getType()) {
case APPLICATION_STARTED:
LogHandlerAppStartedEvent appStartEvent =
(LogHandlerAppStartedEvent) event;
initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(),
appStartEvent.getCredentials(),
appStartEvent.getLogRetentionPolicy(),
appStartEvent.getApplicationAcls());
break;
case CONTAINER_FINISHED:
LogHandlerContainerFinishedEvent containerFinishEvent =
(LogHandlerContainerFinishedEvent) event;
stopContainer(containerFinishEvent.getContainerId(),
containerFinishEvent.getExitCode());
break;
case APPLICATION_FINISHED:
LogHandlerAppFinishedEvent appFinishedEvent =
(LogHandlerAppFinishedEvent) event;
stopApp(appFinishedEvent.getApplicationId());
break;
default:
; // Ignore
}
}
}

View File

@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
public interface LogHandler extends EventHandler<LogHandlerEvent> {
public void handle(LogHandlerEvent event);
}

View File

@ -0,0 +1,153 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
import org.apache.hadoop.yarn.service.AbstractService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Log Handler which schedules deletion of log files based on the configured log
* retention time.
*/
public class NonAggregatingLogHandler extends AbstractService implements
LogHandler {
private static final Log LOG = LogFactory
.getLog(NonAggregatingLogHandler.class);
private final Dispatcher dispatcher;
private final DeletionService delService;
private final Map<ApplicationId, String> appOwners;
private String[] rootLogDirs;
private long deleteDelaySeconds;
private ScheduledThreadPoolExecutor sched;
public NonAggregatingLogHandler(Dispatcher dispatcher,
DeletionService delService) {
super(NonAggregatingLogHandler.class.getName());
this.dispatcher = dispatcher;
this.delService = delService;
this.appOwners = new ConcurrentHashMap<ApplicationId, String>();
}
@Override
public void init(Configuration conf) {
// Default 3 hours.
this.deleteDelaySeconds =
conf.getLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 3 * 60 * 60);
this.rootLogDirs =
conf.getStrings(YarnConfiguration.NM_LOG_DIRS,
YarnConfiguration.DEFAULT_NM_LOG_DIRS);
sched = createScheduledThreadPoolExecutor(conf);
super.init(conf);
}
@Override
public void stop() {
sched.shutdown();
boolean isShutdown = false;
try {
isShutdown = sched.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
sched.shutdownNow();
isShutdown = true;
}
if (!isShutdown) {
sched.shutdownNow();
}
super.stop();
}
@Override
public void handle(LogHandlerEvent event) {
switch (event.getType()) {
case APPLICATION_STARTED:
LogHandlerAppStartedEvent appStartedEvent =
(LogHandlerAppStartedEvent) event;
this.appOwners.put(appStartedEvent.getApplicationId(),
appStartedEvent.getUser());
break;
case CONTAINER_FINISHED:
// Ignore
break;
case APPLICATION_FINISHED:
LogHandlerAppFinishedEvent appFinishedEvent =
(LogHandlerAppFinishedEvent) event;
// Schedule - so that logs are available on the UI till they're deleted.
LOG.info("Scheduling Log Deletion for application: "
+ appFinishedEvent.getApplicationId() + ", with delay of "
+ this.deleteDelaySeconds + " seconds");
sched.schedule(
new LogDeleterRunnable(appOwners.remove(appFinishedEvent
.getApplicationId()), appFinishedEvent.getApplicationId()),
this.deleteDelaySeconds, TimeUnit.SECONDS);
break;
default:
; // Ignore
}
}
ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor(
Configuration conf) {
ThreadFactory tf =
new ThreadFactoryBuilder().setNameFormat("LogDeleter #%d").build();
sched =
new ScheduledThreadPoolExecutor(conf.getInt(
YarnConfiguration.NM_LOG_DELETION_THREADS_COUNT,
YarnConfiguration.DEFAULT_NM_LOG_DELETE_THREAD_COUNT), tf);
return sched;
}
class LogDeleterRunnable implements Runnable {
private String user;
private ApplicationId applicationId;
public LogDeleterRunnable(String user, ApplicationId applicationId) {
this.user = user;
this.applicationId = applicationId;
}
@Override
@SuppressWarnings("unchecked")
public void run() {
Path[] localAppLogDirs =
new Path[NonAggregatingLogHandler.this.rootLogDirs.length];
int index = 0;
for (String rootLogDir : NonAggregatingLogHandler.this.rootLogDirs) {
localAppLogDirs[index] = new Path(rootLogDir, applicationId.toString());
index++;
}
// Inform the application before the actual delete itself, so that links
// to logs will no longer be there on NM web-UI.
NonAggregatingLogHandler.this.dispatcher.getEventHandler().handle(
new ApplicationEvent(this.applicationId,
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
NonAggregatingLogHandler.this.delService.delete(user, null,
localAppLogDirs);
}
@Override
public String toString() {
return "LogDeleter for AppId " + this.applicationId.toString()
+ ", owned by " + user;
}
}
}

View File

@ -16,16 +16,16 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event;
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event;
import org.apache.hadoop.yarn.api.records.ApplicationId;
public class LogAggregatorAppFinishedEvent extends LogAggregatorEvent {
public class LogHandlerAppFinishedEvent extends LogHandlerEvent {
private final ApplicationId applicationId;
public LogAggregatorAppFinishedEvent(ApplicationId appId) {
super(LogAggregatorEventType.APPLICATION_FINISHED);
public LogHandlerAppFinishedEvent(ApplicationId appId) {
super(LogHandlerEventType.APPLICATION_FINISHED);
this.applicationId = appId;
}

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event;
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event;
import java.util.Map;
@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.ContainerLogsRetentionPolicy;
public class LogAggregatorAppStartedEvent extends LogAggregatorEvent {
public class LogHandlerAppStartedEvent extends LogHandlerEvent {
private final ApplicationId applicationId;
private final ContainerLogsRetentionPolicy retentionPolicy;
@ -33,10 +33,10 @@ public class LogAggregatorAppStartedEvent extends LogAggregatorEvent {
private final Credentials credentials;
private final Map<ApplicationAccessType, String> appAcls;
public LogAggregatorAppStartedEvent(ApplicationId appId, String user,
public LogHandlerAppStartedEvent(ApplicationId appId, String user,
Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy,
Map<ApplicationAccessType, String> appAcls) {
super(LogAggregatorEventType.APPLICATION_STARTED);
super(LogHandlerEventType.APPLICATION_STARTED);
this.applicationId = appId;
this.user = user;
this.credentials = credentials;

View File

@ -16,18 +16,18 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event;
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event;
import org.apache.hadoop.yarn.api.records.ContainerId;
public class LogAggregatorContainerFinishedEvent extends LogAggregatorEvent {
public class LogHandlerContainerFinishedEvent extends LogHandlerEvent {
private final ContainerId containerId;
private final int exitCode;
public LogAggregatorContainerFinishedEvent(ContainerId containerId,
public LogHandlerContainerFinishedEvent(ContainerId containerId,
int exitCode) {
super(LogAggregatorEventType.CONTAINER_FINISHED);
super(LogHandlerEventType.CONTAINER_FINISHED);
this.containerId = containerId;
this.exitCode = exitCode;
}

View File

@ -16,14 +16,14 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event;
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event;
import org.apache.hadoop.yarn.event.AbstractEvent;
public class LogAggregatorEvent extends AbstractEvent<LogAggregatorEventType>{
public class LogHandlerEvent extends AbstractEvent<LogHandlerEventType>{
public LogAggregatorEvent(LogAggregatorEventType type) {
public LogHandlerEvent(LogHandlerEventType type) {
super(type);
}
}
}

View File

@ -16,8 +16,8 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event;
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event;
public enum LogAggregatorEventType {
public enum LogHandlerEventType {
APPLICATION_STARTED, CONTAINER_FINISHED, APPLICATION_FINISHED
}

View File

@ -53,6 +53,14 @@ public class AggregatedLogsBlock extends HtmlBlock {
logEntity = containerId.toString();
}
if (!conf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) {
html.h1()
._("Aggregation is not enabled. Try the nodemanager at " + nodeId)
._();
return;
}
Path remoteRootLogDir =
new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
@ -69,7 +77,7 @@ public class AggregatedLogsBlock extends HtmlBlock {
._("Logs not available for "
+ logEntity
+ ". Aggregation may not be complete, "
+ "Check back later or try the nodemanager on "
+ "Check back later or try the nodemanager at "
+ nodeId)._();
return;
} catch (IOException e) {

View File

@ -86,7 +86,9 @@ public class NMController extends Controller implements NMWebParams {
ApplicationId appId =
containerId.getApplicationAttemptId().getApplicationId();
Application app = nmContext.getApplications().get(appId);
if (app == null) {
if (app == null
&& nmConf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) {
String logServerUrl = nmConf.get(YarnConfiguration.YARN_LOG_SERVER_URL);
String redirectUrl = null;
if (logServerUrl == null || logServerUrl.isEmpty()) {

View File

@ -24,11 +24,9 @@ import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
@ -49,8 +47,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
public class DummyContainerManager extends ContainerManagerImpl {
@ -68,6 +66,7 @@ public class DummyContainerManager extends ContainerManagerImpl {
}
@Override
@SuppressWarnings("unchecked")
protected ResourceLocalizationService createResourceLocalizationService(ContainerExecutor exec,
DeletionService deletionContext) {
return new ResourceLocalizationService(super.dispatcher, exec, deletionContext) {
@ -123,6 +122,7 @@ public class DummyContainerManager extends ContainerManagerImpl {
}
@Override
@SuppressWarnings("unchecked")
protected ContainersLauncher createContainersLauncher(Context context,
ContainerExecutor exec) {
return new ContainersLauncher(context, super.dispatcher, exec) {
@ -147,23 +147,23 @@ public class DummyContainerManager extends ContainerManagerImpl {
}
@Override
protected LogAggregationService createLogAggregationService(Context context,
DeletionService deletionService) {
return new LogAggregationService(new AsyncDispatcher(), context,
deletionService) {
protected LogHandler createLogHandler(Configuration conf,
Context context, DeletionService deletionService) {
return new LogHandler() {
@Override
public void handle(LogAggregatorEvent event) {
public void handle(LogHandlerEvent event) {
switch (event.getType()) {
case APPLICATION_STARTED:
break;
case CONTAINER_FINISHED:
break;
case APPLICATION_FINISHED:
break;
default:
// Ignore
}
case APPLICATION_STARTED:
break;
case CONTAINER_FINISHED:
break;
case APPLICATION_FINISHED:
break;
default:
// Ignore
}
}
};
}
}
}

View File

@ -33,8 +33,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@ -349,7 +349,7 @@ public class TestApplication {
final EventHandler<ContainersMonitorEvent> monitorBus;
final EventHandler<AuxServicesEvent> auxBus;
final EventHandler<ContainerEvent> containerBus;
final EventHandler<LogAggregatorEvent> logAggregationBus;
final EventHandler<LogHandlerEvent> logAggregationBus;
final String user;
final List<Container> containers;
final Context context;
@ -373,7 +373,7 @@ public class TestApplication {
dispatcher.register(ContainersMonitorEventType.class, monitorBus);
dispatcher.register(AuxServicesEventType.class, auxBus);
dispatcher.register(ContainerEventType.class, containerBus);
dispatcher.register(LogAggregatorEventType.class, logAggregationBus);
dispatcher.register(LogHandlerEventType.class, logAggregationBus);
context = mock(Context.class);

View File

@ -70,9 +70,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogReader;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Test;
@ -114,7 +114,6 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
this.conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, true);
DrainDispatcher dispatcher = createDispatcher();
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
@ -133,7 +132,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
new File(localLogDir, ConverterUtils.toString(application1));
app1LogDir.mkdir();
logAggregationService
.handle(new LogAggregatorAppStartedEvent(
.handle(new LogHandlerAppStartedEvent(
application1, this.user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
@ -143,9 +142,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
// Simulate log-file creation
writeContainerLogs(app1LogDir, container11);
logAggregationService.handle(
new LogAggregatorContainerFinishedEvent(container11, 0));
new LogHandlerContainerFinishedEvent(container11, 0));
logAggregationService.handle(new LogAggregatorAppFinishedEvent(
logAggregationService.handle(new LogHandlerAppFinishedEvent(
application1));
logAggregationService.stop();
@ -169,7 +168,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
ArgumentCaptor<ApplicationEvent> eventCaptor =
ArgumentCaptor.forClass(ApplicationEvent.class);
verify(appEventHandler).handle(eventCaptor.capture());
assertEquals(ApplicationEventType.APPLICATION_LOG_AGGREGATION_FINISHED,
assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
eventCaptor.getValue().getType());
assertEquals(appAttemptId.getApplicationId(), eventCaptor.getValue()
.getApplicationID());
@ -182,7 +181,6 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
this.conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, true);
DrainDispatcher dispatcher = createDispatcher();
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
@ -200,11 +198,11 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
new File(localLogDir, ConverterUtils.toString(application1));
app1LogDir.mkdir();
logAggregationService
.handle(new LogAggregatorAppStartedEvent(
.handle(new LogHandlerAppStartedEvent(
application1, this.user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
logAggregationService.handle(new LogAggregatorAppFinishedEvent(
logAggregationService.handle(new LogHandlerAppFinishedEvent(
application1));
logAggregationService.stop();
@ -217,7 +215,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
ArgumentCaptor<ApplicationEvent> eventCaptor =
ArgumentCaptor.forClass(ApplicationEvent.class);
verify(appEventHandler).handle(eventCaptor.capture());
assertEquals(ApplicationEventType.APPLICATION_LOG_AGGREGATION_FINISHED,
assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
eventCaptor.getValue().getType());
verify(appEventHandler).handle(eventCaptor.capture());
assertEquals(application1, eventCaptor.getValue()
@ -231,7 +229,6 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
this.conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, true);
DrainDispatcher dispatcher = createDispatcher();
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
@ -249,7 +246,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
new File(localLogDir, ConverterUtils.toString(application1));
app1LogDir.mkdir();
logAggregationService
.handle(new LogAggregatorAppStartedEvent(
.handle(new LogHandlerAppStartedEvent(
application1, this.user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
@ -260,7 +257,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
// Simulate log-file creation
writeContainerLogs(app1LogDir, container11);
logAggregationService.handle(
new LogAggregatorContainerFinishedEvent(container11, 0));
new LogHandlerContainerFinishedEvent(container11, 0));
ApplicationId application2 = BuilderUtils.newApplicationId(1234, 2);
ApplicationAttemptId appAttemptId2 =
@ -269,7 +266,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
File app2LogDir =
new File(localLogDir, ConverterUtils.toString(application2));
app2LogDir.mkdir();
logAggregationService.handle(new LogAggregatorAppStartedEvent(
logAggregationService.handle(new LogHandlerAppStartedEvent(
application2, this.user, null,
ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY, this.acls));
@ -278,13 +275,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
writeContainerLogs(app2LogDir, container21);
logAggregationService.handle(
new LogAggregatorContainerFinishedEvent(container21, 0));
new LogHandlerContainerFinishedEvent(container21, 0));
ContainerId container12 = BuilderUtils.newContainerId(appAttemptId1, 2);
writeContainerLogs(app1LogDir, container12);
logAggregationService.handle(
new LogAggregatorContainerFinishedEvent(container12, 0));
new LogHandlerContainerFinishedEvent(container12, 0));
ApplicationId application3 = BuilderUtils.newApplicationId(1234, 3);
ApplicationAttemptId appAttemptId3 =
@ -293,7 +290,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
File app3LogDir =
new File(localLogDir, ConverterUtils.toString(application3));
app3LogDir.mkdir();
logAggregationService.handle(new LogAggregatorAppStartedEvent(application3,
logAggregationService.handle(new LogHandlerAppStartedEvent(application3,
this.user, null,
ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));
@ -301,28 +298,28 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
ContainerId container31 = BuilderUtils.newContainerId(appAttemptId3, 1);
writeContainerLogs(app3LogDir, container31);
logAggregationService.handle(
new LogAggregatorContainerFinishedEvent(container31, 0));
new LogHandlerContainerFinishedEvent(container31, 0));
ContainerId container32 = BuilderUtils.newContainerId(appAttemptId3, 2);
writeContainerLogs(app3LogDir, container32);
logAggregationService.handle(
new LogAggregatorContainerFinishedEvent(container32, 1)); // Failed
new LogHandlerContainerFinishedEvent(container32, 1)); // Failed
ContainerId container22 = BuilderUtils.newContainerId(appAttemptId2, 2);
writeContainerLogs(app2LogDir, container22);
logAggregationService.handle(
new LogAggregatorContainerFinishedEvent(container22, 0));
new LogHandlerContainerFinishedEvent(container22, 0));
ContainerId container33 = BuilderUtils.newContainerId(appAttemptId3, 3);
writeContainerLogs(app3LogDir, container33);
logAggregationService.handle(
new LogAggregatorContainerFinishedEvent(container33, 0));
new LogHandlerContainerFinishedEvent(container33, 0));
logAggregationService.handle(new LogAggregatorAppFinishedEvent(
logAggregationService.handle(new LogHandlerAppFinishedEvent(
application2));
logAggregationService.handle(new LogAggregatorAppFinishedEvent(
logAggregationService.handle(new LogHandlerAppFinishedEvent(
application3));
logAggregationService.handle(new LogAggregatorAppFinishedEvent(
logAggregationService.handle(new LogHandlerAppFinishedEvent(
application1));
logAggregationService.stop();
@ -342,7 +339,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
List<ApplicationEvent> capturedEvents = eventCaptor.getAllValues();
Set<ApplicationId> appIds = new HashSet<ApplicationId>();
for (ApplicationEvent cap : capturedEvents) {
assertEquals(ApplicationEventType.APPLICATION_LOG_AGGREGATION_FINISHED,
assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
eventCaptor.getValue().getType());
appIds.add(cap.getApplicationID());
}
@ -447,7 +444,6 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
public void testLogAggregationForRealContainerLaunch() throws IOException,
InterruptedException {
this.conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, true);
this.containerManager.start();

View File

@ -0,0 +1,187 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import java.io.File;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
import org.mockito.exceptions.verification.WantedButNotInvoked;
public class TestNonAggregatingLogHandler {
@Test
@SuppressWarnings("unchecked")
public void testLogDeletion() {
DeletionService delService = mock(DeletionService.class);
Configuration conf = new YarnConfiguration();
String user = "testuser";
File[] localLogDirs = new File[2];
localLogDirs[0] =
new File("target", this.getClass().getName() + "-localLogDir0")
.getAbsoluteFile();
localLogDirs[1] =
new File("target", this.getClass().getName() + "-localLogDir1")
.getAbsoluteFile();
String localLogDirsString =
localLogDirs[0].getAbsolutePath() + ","
+ localLogDirs[1].getAbsolutePath();
conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString);
conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, false);
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 0l);
DrainDispatcher dispatcher = createDispatcher(conf);
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, appEventHandler);
ApplicationId appId1 = BuilderUtils.newApplicationId(1234, 1);
ApplicationAttemptId appAttemptId1 =
BuilderUtils.newApplicationAttemptId(appId1, 1);
ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1);
NonAggregatingLogHandler logHandler =
new NonAggregatingLogHandler(dispatcher, delService);
logHandler.init(conf);
logHandler.start();
logHandler.handle(new LogHandlerAppStartedEvent(appId1, user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS, null));
logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0));
logHandler.handle(new LogHandlerAppFinishedEvent(appId1));
Path[] localAppLogDirs = new Path[2];
localAppLogDirs[0] =
new Path(localLogDirs[0].getAbsolutePath(), appId1.toString());
localAppLogDirs[1] =
new Path(localLogDirs[1].getAbsolutePath(), appId1.toString());
// 5 seconds for the delete which is a separate thread.
long verifyStartTime = System.currentTimeMillis();
WantedButNotInvoked notInvokedException = null;
boolean matched = false;
while (!matched && System.currentTimeMillis() < verifyStartTime + 5000l) {
try {
verify(delService).delete(eq(user), (Path) eq(null),
eq(localAppLogDirs[0]), eq(localAppLogDirs[1]));
matched = true;
} catch (WantedButNotInvoked e) {
notInvokedException = e;
try {
Thread.sleep(50l);
} catch (InterruptedException i) {
}
}
}
if (!matched) {
throw notInvokedException;
}
}
@Test
@SuppressWarnings("unchecked")
public void testDelayedDelete() {
DeletionService delService = mock(DeletionService.class);
Configuration conf = new YarnConfiguration();
String user = "testuser";
File[] localLogDirs = new File[2];
localLogDirs[0] =
new File("target", this.getClass().getName() + "-localLogDir0")
.getAbsoluteFile();
localLogDirs[1] =
new File("target", this.getClass().getName() + "-localLogDir1")
.getAbsoluteFile();
String localLogDirsString =
localLogDirs[0].getAbsolutePath() + ","
+ localLogDirs[1].getAbsolutePath();
conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString);
conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, false);
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 10800l);
DrainDispatcher dispatcher = createDispatcher(conf);
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, appEventHandler);
ApplicationId appId1 = BuilderUtils.newApplicationId(1234, 1);
ApplicationAttemptId appAttemptId1 =
BuilderUtils.newApplicationAttemptId(appId1, 1);
ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1);
NonAggregatingLogHandler logHandler =
new NonAggregatingLogHandlerWithMockExecutor(dispatcher, delService);
logHandler.init(conf);
logHandler.start();
logHandler.handle(new LogHandlerAppStartedEvent(appId1, user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS, null));
logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0));
logHandler.handle(new LogHandlerAppFinishedEvent(appId1));
Path[] localAppLogDirs = new Path[2];
localAppLogDirs[0] =
new Path(localLogDirs[0].getAbsolutePath(), appId1.toString());
localAppLogDirs[1] =
new Path(localLogDirs[1].getAbsolutePath(), appId1.toString());
ScheduledThreadPoolExecutor mockSched =
((NonAggregatingLogHandlerWithMockExecutor) logHandler).mockSched;
verify(mockSched).schedule(any(Runnable.class), eq(10800l),
eq(TimeUnit.SECONDS));
}
private class NonAggregatingLogHandlerWithMockExecutor extends
NonAggregatingLogHandler {
private ScheduledThreadPoolExecutor mockSched;
public NonAggregatingLogHandlerWithMockExecutor(Dispatcher dispatcher,
DeletionService delService) {
super(dispatcher, delService);
}
@Override
ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor(
Configuration conf) {
mockSched = mock(ScheduledThreadPoolExecutor.class);
return mockSched;
}
}
private DrainDispatcher createDispatcher(Configuration conf) {
DrainDispatcher dispatcher = new DrainDispatcher();
dispatcher.init(conf);
dispatcher.start();
return dispatcher;
}
}

View File

@ -224,18 +224,13 @@ Hadoop MapReduce Next Generation - Cluster Setup
| | <<<ResourceManager>>> Scheduler class. | |
| | | <<<CapacityScheduler>>> (recommended) or <<<FifoScheduler>>> |
*-------------------------+-------------------------+------------------------+
| <<<yarn.nodemanager.remote-app-log-dir>>> | | |
| | </logs> | |
| | | HDFS directory where the application logs are moved on application |
| | | completion. Need to set appropriate permissions. |
*-------------------------+-------------------------+------------------------+
| <<<yarn.resourcemanager.nodes.include-path>>> / | | |
| <<<yarn.resourcemanager.nodes.exclude-path>>> | | |
| | List of permitted/excluded NodeManagers. | |
| | | If necessary, use these files to control the list of allowable |
| | | NodeManagers. |
*-------------------------+-------------------------+------------------------+
|
* Configurations for NodeManager:
*-------------------------+-------------------------+------------------------+
@ -263,6 +258,27 @@ Hadoop MapReduce Next Generation - Cluster Setup
| | are written. | |
| | | Multiple paths help spread disk i/o. |
*-------------------------+-------------------------+------------------------+
| <<<yarn.nodemanager.log-aggregation-enable>>> | | |
| | <false> | |
| | | Configuration to enable or disable log aggregation |
*-------------------------+-------------------------+------------------------+
| <<<yarn.nodemanager.log.retain-seconds>>> | | |
| | <10800> | |
| | | Default time (in seconds) to retain log files on the NodeManager |
| | | Only applicable if log-aggregation is disabled. |
*-------------------------+-------------------------+------------------------+
| <<<yarn.nodemanager.remote-app-log-dir>>> | | |
| | </logs> | |
| | | HDFS directory where the application logs are moved on application |
| | | completion. Need to set appropriate permissions. |
| | | Only applicable if log-aggregation is enabled. |
*-------------------------+-------------------------+------------------------+
| <<<yarn.nodemanager.remote-app-log-dir-suffix>>> | | |
| | <logs> | |
| | | Suffix appended to the remote log dir. Logs will be aggregated to |
| | | $\{yarn.nodemanager.remote-app-log-dir\}/$\{user\}/$\{thisParam\} |
| | | Only applicable if log-aggregation is enabled. |
*-------------------------+-------------------------+------------------------+
| <<<yarn.nodemanager.aux-services>>> | | |
| | mapreduce.shuffle | |
| | | Shuffle service that needs to be set for Map Reduce applications. |