YARN-3034. Implement RM starting its timeline collector. Contributed by Naganarasimha G R

This commit is contained in:
Junping Du 2015-03-24 13:42:14 -07:00 committed by Sangjin Lee
parent 19c1132146
commit 5e3d9a477b
9 changed files with 236 additions and 21 deletions

View File

@ -485,12 +485,20 @@ public class YarnConfiguration extends Configuration {
/**
* The setting that controls whether yarn system metrics is published on the
* timeline server or not by RM.
* timeline server or not by RM. This configuration setting is for ATS V1
*/
public static final String RM_SYSTEM_METRICS_PUBLISHER_ENABLED =
RM_PREFIX + "system-metrics-publisher.enabled";
public static final String RM_SYSTEM_METRICS_PUBLISHER_ENABLED = RM_PREFIX
+ "system-metrics-publisher.enabled";
public static final boolean DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED = false;
/**
* The setting that controls whether yarn system metrics is published on the
* timeline server or not by RM and NM. This configuration setting is for ATS V2
*/
public static final String SYSTEM_METRICS_PUBLISHER_ENABLED = YARN_PREFIX
+ "system-metrics-publisher.enabled";
public static final boolean DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED = false;
public static final String RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE =
RM_PREFIX + "system-metrics-publisher.dispatcher.pool-size";
public static final int DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE =

View File

@ -784,11 +784,20 @@
<property>
<description>The setting that controls whether yarn system metrics is
published on the timeline server or not by RM.</description>
published to the Timeline server (version one) or not, by RM.
This configuration is deprecated.</description>
<name>yarn.resourcemanager.system-metrics-publisher.enabled</name>
<value>false</value>
</property>
<property>
<description>The setting that controls whether yarn system metrics is
published on the Timeline server (version two) or not by RM And NM.</description>
<name>yarn.system-metrics-publisher.enabled</name>
<value>false</value>
</property>
<property>
<description>Number of worker threads that send the yarn system metrics
data.</description>

View File

@ -170,6 +170,10 @@
<artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timelineservice</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-web-proxy</artifactId>

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
@ -94,6 +95,10 @@ public class RMActiveServiceContext {
private NodesListManager nodesListManager;
private ResourceTrackerService resourceTrackerService;
private ApplicationMasterService applicationMasterService;
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
private SystemMetricsPublisher systemMetricsPublisher;
private RMTimelineCollector timelineCollector;
private RMNodeLabelsManager nodeLabelManager;
private RMDelegatedNodeLabelsUpdater rmDelegatedNodeLabelsUpdater;
private long epoch;
@ -368,6 +373,44 @@ public class RMActiveServiceContext {
return this.isWorkPreservingRecoveryEnabled;
}
@Private
@Unstable
public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
return rmApplicationHistoryWriter;
}
@Private
@Unstable
public RMTimelineCollector getRMTimelineCollector() {
return timelineCollector;
}
@Private
@Unstable
public void setRMTimelineCollector(RMTimelineCollector timelineCollector) {
this.timelineCollector = timelineCollector;
}
@Private
@Unstable
public void setSystemMetricsPublisher(
SystemMetricsPublisher systemMetricsPublisher) {
this.systemMetricsPublisher = systemMetricsPublisher;
}
@Private
@Unstable
public SystemMetricsPublisher getSystemMetricsPublisher() {
return systemMetricsPublisher;
}
@Private
@Unstable
public void setRMApplicationHistoryWriter(
RMApplicationHistoryWriter rmApplicationHistoryWriter) {
this.rmApplicationHistoryWriter = rmApplicationHistoryWriter;
}
@Private
@Unstable
public long getEpoch() {

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector;
/**
* Context of the ResourceManager.
@ -113,6 +114,10 @@ public interface RMContext {
SystemMetricsPublisher getSystemMetricsPublisher();
void setRMTimelineCollector(RMTimelineCollector timelineCollector);
RMTimelineCollector getRMTimelineCollector();
ConfigurationProvider getConfigurationProvider();
boolean isWorkPreservingRecoveryEnabled();

View File

@ -22,8 +22,8 @@ import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector;
import org.apache.hadoop.yarn.util.Clock;
import com.google.common.annotations.VisibleForTesting;
@ -369,6 +370,17 @@ public class RMContextImpl implements RMContext {
return this.rmApplicationHistoryWriter;
}
@Override
public void setRMTimelineCollector(
RMTimelineCollector timelineCollector) {
activeServiceContext.setRMTimelineCollector(timelineCollector);
}
@Override
public RMTimelineCollector getRMTimelineCollector() {
return activeServiceContext.getRMTimelineCollector();
}
@Override
public void setSystemMetricsPublisher(
SystemMetricsPublisher systemMetricsPublisher) {

View File

@ -18,7 +18,16 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.AuthInfo;
@ -95,6 +104,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilter;
@ -119,6 +129,7 @@ import java.security.PrivilegedExceptionAction;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import com.google.common.annotations.VisibleForTesting;
/**
* The ResourceManager is the main class that is a set of components.
@ -449,6 +460,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
return new RMApplicationHistoryWriter();
}
private RMTimelineCollector createRMTimelineCollector() {
return new RMTimelineCollector();
}
protected SystemMetricsPublisher createSystemMetricsPublisher() {
return new SystemMetricsPublisher();
}
@ -565,6 +580,20 @@ public class ResourceManager extends CompositeService implements Recoverable {
rmContext.setDelegationTokenRenewer(delegationTokenRenewer);
}
RMApplicationHistoryWriter rmApplicationHistoryWriter =
createRMApplicationHistoryWriter();
addService(rmApplicationHistoryWriter);
rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
SystemMetricsPublisher systemMetricsPublisher = createSystemMetricsPublisher();
addService(systemMetricsPublisher);
rmContext.setSystemMetricsPublisher(systemMetricsPublisher);
RMTimelineCollector timelineCollector =
createRMTimelineCollector();
addService(timelineCollector);
rmContext.setRMTimelineCollector(timelineCollector);
// Register event handler for NodesListManager
nodesListManager = new NodesListManager(rmContext);
rmDispatcher.register(NodesListManagerEventType.class, nodesListManager);

View File

@ -58,7 +58,7 @@ import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import com.google.common.annotations.VisibleForTesting;
/**
* The class that helps RM publish metrics to the timeline server. RM will
* The class that helps RM publish metrics to the timeline server V1. RM will
* always invoke the methods of this class regardless the service is enabled or
* not. If it is disabled, publishing requests will be ignored silently.
*/
@ -71,7 +71,7 @@ public class SystemMetricsPublisher extends CompositeService {
private Dispatcher dispatcher;
private TimelineClient client;
private boolean publishSystemMetrics;
private boolean publishSystemMetricsToATSv1;
public SystemMetricsPublisher() {
super(SystemMetricsPublisher.class.getName());
@ -79,13 +79,14 @@ public class SystemMetricsPublisher extends CompositeService {
@Override
protected void serviceInit(Configuration conf) throws Exception {
publishSystemMetrics =
publishSystemMetricsToATSv1 =
conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED) &&
conf.getBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED);
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
&& conf.getBoolean(
YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED);
if (publishSystemMetrics) {
if (publishSystemMetricsToATSv1) {
client = TimelineClient.createTimelineClient();
addIfService(client);
@ -102,7 +103,7 @@ public class SystemMetricsPublisher extends CompositeService {
@SuppressWarnings("unchecked")
public void appCreated(RMApp app, long createdTime) {
if (publishSystemMetrics) {
if (publishSystemMetricsToATSv1) {
ApplicationSubmissionContext appSubmissionContext =
app.getApplicationSubmissionContext();
dispatcher.getEventHandler().handle(
@ -124,7 +125,7 @@ public class SystemMetricsPublisher extends CompositeService {
@SuppressWarnings("unchecked")
public void appUpdated(RMApp app, long updatedTime) {
if (publishSystemMetrics) {
if (publishSystemMetricsToATSv1) {
dispatcher.getEventHandler()
.handle(new ApplicationUpdatedEvent(app.getApplicationId(),
app.getQueue(), updatedTime,
@ -134,7 +135,7 @@ public class SystemMetricsPublisher extends CompositeService {
@SuppressWarnings("unchecked")
public void appFinished(RMApp app, RMAppState state, long finishedTime) {
if (publishSystemMetrics) {
if (publishSystemMetricsToATSv1) {
dispatcher.getEventHandler().handle(
new ApplicationFinishedEvent(
app.getApplicationId(),
@ -151,7 +152,7 @@ public class SystemMetricsPublisher extends CompositeService {
@SuppressWarnings("unchecked")
public void appACLsUpdated(RMApp app, String appViewACLs,
long updatedTime) {
if (publishSystemMetrics) {
if (publishSystemMetricsToATSv1) {
dispatcher.getEventHandler().handle(
new ApplicationACLsUpdatedEvent(
app.getApplicationId(),
@ -175,7 +176,7 @@ public class SystemMetricsPublisher extends CompositeService {
@SuppressWarnings("unchecked")
public void appAttemptRegistered(RMAppAttempt appAttempt,
long registeredTime) {
if (publishSystemMetrics) {
if (publishSystemMetricsToATSv1) {
ContainerId container = (appAttempt.getMasterContainer() == null) ? null
: appAttempt.getMasterContainer().getId();
dispatcher.getEventHandler().handle(
@ -193,7 +194,7 @@ public class SystemMetricsPublisher extends CompositeService {
@SuppressWarnings("unchecked")
public void appAttemptFinished(RMAppAttempt appAttempt,
RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
if (publishSystemMetrics) {
if (publishSystemMetricsToATSv1) {
ContainerId container = (appAttempt.getMasterContainer() == null) ? null
: appAttempt.getMasterContainer().getId();
dispatcher.getEventHandler().handle(
@ -213,7 +214,7 @@ public class SystemMetricsPublisher extends CompositeService {
@SuppressWarnings("unchecked")
public void containerCreated(RMContainer container, long createdTime) {
if (publishSystemMetrics) {
if (publishSystemMetricsToATSv1) {
dispatcher.getEventHandler().handle(
new ContainerCreatedEvent(
container.getContainerId(),
@ -226,7 +227,7 @@ public class SystemMetricsPublisher extends CompositeService {
@SuppressWarnings("unchecked")
public void containerFinished(RMContainer container, long finishedTime) {
if (publishSystemMetrics) {
if (publishSystemMetricsToATSv1) {
dispatcher.getEventHandler().handle(
new ContainerFinishedEvent(
container.getContainerId(),

View File

@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.timelineservice;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEvent;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEventType;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
/**
* This class is responsible for posting application and appattempt lifecycle
* related events to timeline service V2
*/
@Private
@Unstable
public class RMTimelineCollector extends TimelineCollector {
private static final Log LOG = LogFactory.getLog(RMTimelineCollector.class);
public RMTimelineCollector() {
super("Resource Manager TimelineCollector");
}
private Dispatcher dispatcher;
private boolean publishSystemMetricsForV2;
@Override
protected void serviceInit(Configuration conf) throws Exception {
publishSystemMetricsForV2 =
conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
&& conf.getBoolean(
YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED);
if (publishSystemMetricsForV2) {
// having separate dispatcher to avoid load on RMDispatcher
LOG.info("RMTimelineCollector has been configured to publish"
+ " System Metrics in ATS V2");
dispatcher = new AsyncDispatcher();
dispatcher.register(SystemMetricsEventType.class,
new ForwardingEventHandler());
} else {
LOG.warn("RMTimelineCollector has not been configured to publish"
+ " System Metrics in ATS V2");
}
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
super.serviceStop();
}
protected void handleSystemMetricsEvent(SystemMetricsEvent event) {
switch (event.getType()) {
default:
LOG.error("Unknown SystemMetricsEvent type: " + event.getType());
}
}
/**
* EventHandler implementation which forward events to SystemMetricsPublisher.
* Making use of it, SystemMetricsPublisher can avoid to have a public handle
* method.
*/
private final class ForwardingEventHandler implements
EventHandler<SystemMetricsEvent> {
@Override
public void handle(SystemMetricsEvent event) {
handleSystemMetricsEvent(event);
}
}
}