YARN-3044. Made RM write app, attempt and optional container lifecycle events to timeline service v2. Contributed by Naganarasimha G R.

Zhijie Shen 2015-06-13 11:32:41 -07:00 committed by Sangjin Lee
parent 42d7864b84
commit f3c661e8dd
17 changed files with 1470 additions and 474 deletions


@ -470,4 +470,7 @@ public class TimelineEntity {
return real == null ? this : real;
}
public String toString() {
return identifier.toString();
}
}


@ -500,6 +500,16 @@ public class YarnConfiguration extends Configuration {
+ "system-metrics-publisher.enabled"; + "system-metrics-publisher.enabled";
public static final boolean DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED = false; public static final boolean DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED = false;
/**
* The setting that controls whether yarn container metrics is published to
* the timeline server or not by RM. This configuration setting is for ATS
* V2
*/
public static final String RM_PUBLISH_CONTAINER_METRICS_ENABLED = YARN_PREFIX
+ "rm.system-metrics-publisher.emit-container-events";
public static final boolean DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED =
false;
public static final String RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE = public static final String RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE =
RM_PREFIX + "system-metrics-publisher.dispatcher.pool-size"; RM_PREFIX + "system-metrics-publisher.dispatcher.pool-size";
public static final int DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE = public static final int DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE =
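For context, a minimal sketch (not part of this patch) of how these switches interact, using only keys visible in this commit: TIMELINE_SERVICE_ENABLED gates SystemMetricsPublisher as a whole, RM_SYSTEM_METRICS_PUBLISHER_ENABLED selects the ATS v1 publisher, SYSTEM_METRICS_PUBLISHER_ENABLED selects the ATS v2 publisher (see getTimelineServicePublisher() further down), and, per the Javadoc above, RM_PUBLISH_CONTAINER_METRICS_ENABLED additionally asks the RM to emit container lifecycle events. The snippet and its class name are illustrative only.

  // Illustrative only: enable ATS v2 publishing plus RM container events.
  import org.apache.hadoop.yarn.conf.YarnConfiguration;

  public class EnableRmTimelineV2Sketch {
    public static void main(String[] args) {
      YarnConfiguration conf = new YarnConfiguration();
      // Overall switch checked in SystemMetricsPublisher.serviceInit().
      conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
      // Selects TimelineServiceV2Publisher in getTimelineServicePublisher().
      conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
      // Keep the v1 path off so the v2 branch is taken.
      conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, false);
      // New in this patch: also publish RM-side container lifecycle events.
      conf.setBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED, true);
      System.out.println("emit-container-events = "
          + conf.getBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED,
              YarnConfiguration.DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED));
    }
  }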


@ -74,6 +74,8 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils;
import org.apache.hadoop.yarn.server.timeline.NameValuePair;
@ -86,7 +88,6 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineW
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -140,6 +141,7 @@ public class TestDistributedShell {
// disable aux-service based timeline aggregators
conf.set(YarnConfiguration.NM_AUX_SERVICES, "");
conf.set(YarnConfiguration.NM_VMEM_PMEM_RATIO, "8");
conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName());
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
conf.set("mapreduce.jobhistory.address",
@ -494,50 +496,98 @@ public class TestDistributedShell {
"/1/1/" : "/test_flow_name/test_flow_version/12345678/") + "/1/1/" : "/test_flow_name/test_flow_version/12345678/") +
appId.toString(); appId.toString();
// for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs
String outputDirApp = basePath + "/DS_APP_ATTEMPT/";
File entityFolder = new File(outputDirApp);
Assert.assertTrue(entityFolder.isDirectory());
// Verify DS_APP_ATTEMPT entities posted by the client
// there will be at least one attempt, look for that file // there will be at least one attempt, look for that file
String appTimestampFileName = "appattempt_" + appId.getClusterTimestamp() String appTimestampFileName =
+ "_000" + appId.getId() + "_000001" "appattempt_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + "_000001"
String appAttemptFileName = outputDirApp + appTimestampFileName; + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
File appAttemptFile = new File(appAttemptFileName); verifyEntityTypeFileExists(basePath, "DS_APP_ATTEMPT",
Assert.assertTrue(appAttemptFile.exists()); appTimestampFileName);
String outputDirContainer = basePath + "/DS_CONTAINER/"; // Verify DS_CONTAINER entities posted by the client
File containerFolder = new File(outputDirContainer); String containerTimestampFileName =
Assert.assertTrue(containerFolder.isDirectory()); "container_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+ "_01_000002.thist";
String containerTimestampFileName = "container_" verifyEntityTypeFileExists(basePath, "DS_CONTAINER",
+ appId.getClusterTimestamp() + "_000" + appId.getId() containerTimestampFileName);
+ "_01_000002.thist";
String containerFileName = outputDirContainer + containerTimestampFileName;
File containerFile = new File(containerFileName);
Assert.assertTrue(containerFile.exists());
// Verify NM posting container metrics info. // Verify NM posting container metrics info.
String outputDirContainerMetrics = basePath + "/" + String containerMetricsTimestampFileName =
TimelineEntityType.YARN_CONTAINER + "/"; "container_" + appId.getClusterTimestamp() + "_000" + appId.getId()
File containerMetricsFolder = new File(outputDirContainerMetrics); + "_01_000001"
Assert.assertTrue(containerMetricsFolder.isDirectory()); + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
verifyEntityTypeFileExists(basePath,
TimelineEntityType.YARN_CONTAINER.toString(),
containerMetricsTimestampFileName);
String containerMetricsTimestampFileName = "container_" // Verify RM posting Application life cycle Events are getting published
+ appId.getClusterTimestamp() + "_000" + appId.getId() String appMetricsTimestampFileName =
+ "_01_000001.thist"; "application_" + appId.getClusterTimestamp() + "_000" + appId.getId()
String containerMetricsFileName = outputDirContainerMetrics + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
containerMetricsTimestampFileName; File appEntityFile =
verifyEntityTypeFileExists(basePath,
File containerMetricsFile = new File(containerMetricsFileName); TimelineEntityType.YARN_APPLICATION.toString(),
Assert.assertTrue(containerMetricsFile.exists()); appMetricsTimestampFileName);
verifyStringExistsSpecifiedTimes(appEntityFile,
ApplicationMetricsConstants.CREATED_EVENT_TYPE, 1,
"Application created event should be published atleast once");
verifyStringExistsSpecifiedTimes(appEntityFile,
ApplicationMetricsConstants.FINISHED_EVENT_TYPE, 1,
"Application finished event should be published atleast once");
// Verify RM posting AppAttempt life cycle Events are getting published
String appAttemptMetricsTimestampFileName =
"appattempt_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+ "_000001"
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
File appAttemptEntityFile =
verifyEntityTypeFileExists(basePath,
TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(),
appAttemptMetricsTimestampFileName);
verifyStringExistsSpecifiedTimes(appAttemptEntityFile,
AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE, 1,
"AppAttempt register event should be published atleast once");
verifyStringExistsSpecifiedTimes(appAttemptEntityFile,
AppAttemptMetricsConstants.FINISHED_EVENT_TYPE, 1,
"AppAttempt finished event should be published atleast once");
} finally { } finally {
FileUtils.deleteDirectory(tmpRootFolder.getParentFile()); FileUtils.deleteDirectory(tmpRootFolder.getParentFile());
} }
} }
private File verifyEntityTypeFileExists(String basePath, String entityType,
String entityfileName) {
String outputDirPathForEntity = basePath + "/" + entityType + "/";
File outputDirForEntity = new File(outputDirPathForEntity);
Assert.assertTrue(outputDirForEntity.isDirectory());
String entityFilePath = outputDirPathForEntity + entityfileName;
File entityFile = new File(entityFilePath);
Assert.assertTrue(entityFile.exists());
return entityFile;
}
private void verifyStringExistsSpecifiedTimes(File entityFile,
String searchString, long expectedNumOfTimes, String errorMsg)
throws IOException {
BufferedReader reader = null;
String strLine;
long actualCount = 0;
try {
reader = new BufferedReader(new FileReader(entityFile));
while ((strLine = reader.readLine()) != null) {
if (strLine.trim().contains(searchString))
actualCount++;
}
} finally {
reader.close();
}
Assert.assertEquals(errorMsg, expectedNumOfTimes, actualCount);
}
/**
* Utility function to merge two String arrays to form a new String array for
* our argumemts.


@ -27,10 +27,20 @@ public class ContainerMetricsConstants {
public static final String ENTITY_TYPE = "YARN_CONTAINER";
// Event of this type will be emitted by NM.
public static final String CREATED_EVENT_TYPE = "YARN_CONTAINER_CREATED";
// Event of this type will be emitted by RM.
public static final String CREATED_IN_RM_EVENT_TYPE =
"YARN_RM_CONTAINER_CREATED";
// Event of this type will be emitted by NM.
public static final String FINISHED_EVENT_TYPE = "YARN_CONTAINER_FINISHED";
// Event of this type will be emitted by RM.
public static final String FINISHED_IN_RM_EVENT_TYPE =
"YARN_RM_CONTAINER_FINISHED";
public static final String PARENT_PRIMARIY_FILTER = "YARN_CONTAINER_PARENT";
public static final String ALLOCATED_MEMORY_ENTITY_INFO =


@ -393,7 +393,7 @@ public class TestAppLogAggregatorImpl {
new NMTokenSecretManagerInNM(),
null,
new ApplicationACLsManager(conf),
new NMNullStateStoreService(), false, conf);
}
private static final class AppLogAggregatorInTest extends
@ -431,4 +431,4 @@ public class TestAppLogAggregatorImpl {
return spy(new LogWriter(conf, remoteAppLogFile, ugi));
}
}
}


@ -465,7 +465,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
protected SystemMetricsPublisher createSystemMetricsPublisher() {
return new SystemMetricsPublisher(rmContext);
}
// sanity check for configurations


@ -0,0 +1,191 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher.TimelineServicePublisher;
public abstract class AbstractTimelineServicePublisher extends CompositeService
implements TimelineServicePublisher, EventHandler<SystemMetricsEvent> {
private static final Log LOG = LogFactory
.getLog(TimelineServiceV2Publisher.class);
private Configuration conf;
public AbstractTimelineServicePublisher(String name) {
super(name);
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.conf = conf;
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
super.serviceStop();
}
@Override
public void handle(SystemMetricsEvent event) {
switch (event.getType()) {
case APP_CREATED:
publishApplicationCreatedEvent((ApplicationCreatedEvent) event);
break;
case APP_FINISHED:
publishApplicationFinishedEvent((ApplicationFinishedEvent) event);
break;
case APP_UPDATED:
publishApplicationUpdatedEvent((ApplicationUpdatedEvent) event);
break;
case APP_STATE_UPDATED:
publishApplicationStateUpdatedEvent(
(ApplicaitonStateUpdatedEvent)event);
break;
case APP_ACLS_UPDATED:
publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event);
break;
case APP_ATTEMPT_REGISTERED:
publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event);
break;
case APP_ATTEMPT_FINISHED:
publishAppAttemptFinishedEvent((AppAttemptFinishedEvent) event);
break;
case CONTAINER_CREATED:
publishContainerCreatedEvent((ContainerCreatedEvent) event);
break;
case CONTAINER_FINISHED:
publishContainerFinishedEvent((ContainerFinishedEvent) event);
break;
default:
LOG.error("Unknown SystemMetricsEvent type: " + event.getType());
}
}
abstract void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event);
abstract void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event);
abstract void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event);
abstract void publishApplicationStateUpdatedEvent(
ApplicaitonStateUpdatedEvent event);
abstract void publishApplicationACLsUpdatedEvent(
ApplicationACLsUpdatedEvent event);
abstract void publishApplicationFinishedEvent(ApplicationFinishedEvent event);
abstract void publishApplicationCreatedEvent(ApplicationCreatedEvent event);
abstract void publishContainerCreatedEvent(ContainerCreatedEvent event);
abstract void publishContainerFinishedEvent(ContainerFinishedEvent event);
@Override
public Dispatcher getDispatcher() {
MultiThreadedDispatcher dispatcher =
new MultiThreadedDispatcher(
conf.getInt(
YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE));
dispatcher.setDrainEventsOnStop();
return dispatcher;
}
@Override
public boolean publishRMContainerMetrics() {
return true;
}
@Override
public EventHandler<SystemMetricsEvent> getEventHandler() {
return this;
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public static class MultiThreadedDispatcher extends CompositeService
implements Dispatcher {
private List<AsyncDispatcher> dispatchers =
new ArrayList<AsyncDispatcher>();
public MultiThreadedDispatcher(int num) {
super(MultiThreadedDispatcher.class.getName());
for (int i = 0; i < num; ++i) {
AsyncDispatcher dispatcher = createDispatcher();
dispatchers.add(dispatcher);
addIfService(dispatcher);
}
}
@Override
public EventHandler getEventHandler() {
return new CompositEventHandler();
}
@Override
public void register(Class<? extends Enum> eventType, EventHandler handler) {
for (AsyncDispatcher dispatcher : dispatchers) {
dispatcher.register(eventType, handler);
}
}
public void setDrainEventsOnStop() {
for (AsyncDispatcher dispatcher : dispatchers) {
dispatcher.setDrainEventsOnStop();
}
}
private class CompositEventHandler implements EventHandler<Event> {
@Override
public void handle(Event event) {
// Use hashCode (of ApplicationId) to dispatch the event to the child
// dispatcher, such that all the writing events of one application will
// be handled by one thread, the scheduled order of the these events
// will be preserved
int index = (event.hashCode() & Integer.MAX_VALUE) % dispatchers.size();
dispatchers.get(index).getEventHandler().handle(event);
}
}
protected AsyncDispatcher createDispatcher() {
return new AsyncDispatcher();
}
}
}
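To illustrate the extension seam this abstract class introduces, here is a hypothetical, minimal publisher (not part of the patch); the class name and its no-op bodies are invented for illustration, while the overridden methods and event types come from the listing above.

  package org.apache.hadoop.yarn.server.resourcemanager.metrics;

  // Hypothetical no-op publisher, for illustration only; it must live in the
  // same package because the abstract publish* methods are package-private.
  public class NoOpTimelineServicePublisher extends
      AbstractTimelineServicePublisher {

    public NoOpTimelineServicePublisher() {
      super(NoOpTimelineServicePublisher.class.getName());
    }

    // A real publisher would convert each event into timeline entities here.
    @Override void publishApplicationCreatedEvent(ApplicationCreatedEvent event) { }
    @Override void publishApplicationFinishedEvent(ApplicationFinishedEvent event) { }
    @Override void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event) { }
    @Override void publishApplicationStateUpdatedEvent(ApplicaitonStateUpdatedEvent event) { }
    @Override void publishApplicationACLsUpdatedEvent(ApplicationACLsUpdatedEvent event) { }
    @Override void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) { }
    @Override void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) { }
    @Override void publishContainerCreatedEvent(ContainerCreatedEvent event) { }
    @Override void publishContainerFinishedEvent(ContainerFinishedEvent event) { }
  }

The dispatcher, publishRMContainerMetrics() and getEventHandler() defaults are inherited from AbstractTimelineServicePublisher, so such a subclass only has to decide how each event is written out.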


@ -22,6 +22,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
public class ApplicationFinishedEvent extends
@ -33,6 +35,7 @@ public class ApplicationFinishedEvent extends
private YarnApplicationState state;
private ApplicationAttemptId latestAppAttemptId;
private RMAppMetrics appMetrics;
private RMAppImpl app;
public ApplicationFinishedEvent(
ApplicationId appId,
@ -41,14 +44,16 @@ public class ApplicationFinishedEvent extends
YarnApplicationState state,
ApplicationAttemptId latestAppAttemptId,
long finishedTime,
RMAppMetrics appMetrics,
RMAppImpl app) {
super(SystemMetricsEventType.APP_FINISHED, finishedTime);
this.appId = appId;
this.diagnosticsInfo = diagnosticsInfo;
this.appStatus = appStatus;
this.latestAppAttemptId = latestAppAttemptId;
this.state = state;
this.appMetrics = appMetrics;
this.app = app;
}
@Override
@ -56,6 +61,10 @@ public class ApplicationFinishedEvent extends
return appId.hashCode();
}
public RMAppImpl getApp() {
return app;
}
public ApplicationId getApplicationId() {
return appId;
}


@ -18,10 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -29,8 +25,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@ -38,27 +32,25 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import com.google.common.annotations.VisibleForTesting;
/**
* The class that helps RM publish metrics to the timeline server. RM will
* always invoke the methods of this class regardless the service is enabled or
* not. If it is disabled, publishing requests will be ignored silently.
*/
@ -70,30 +62,38 @@ public class SystemMetricsPublisher extends CompositeService {
.getLog(SystemMetricsPublisher.class);
private Dispatcher dispatcher;
private boolean publishSystemMetrics;
private boolean publishContainerMetrics;
protected RMContext rmContext;
public SystemMetricsPublisher(RMContext rmContext) {
super(SystemMetricsPublisher.class.getName());
this.rmContext = rmContext;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
publishSystemMetrics =
conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED);
if (publishSystemMetrics) {
TimelineServicePublisher timelineServicePublisher =
getTimelineServicePublisher(conf);
if (timelineServicePublisher != null) {
addService(timelineServicePublisher);
// init required to be called so that other methods of
// TimelineServicePublisher can be utilized
timelineServicePublisher.init(conf);
dispatcher = createDispatcher(timelineServicePublisher);
publishContainerMetrics =
timelineServicePublisher.publishRMContainerMetrics();
dispatcher.register(SystemMetricsEventType.class,
timelineServicePublisher.getEventHandler());
addIfService(dispatcher);
} else {
LOG.info("TimelineServicePublisher is not configured");
publishSystemMetrics = false;
}
LOG.info("YARN system metrics publishing service is enabled");
} else {
LOG.info("YARN system metrics publishing service is not enabled");
@ -101,9 +101,26 @@ public class SystemMetricsPublisher extends CompositeService {
super.serviceInit(conf);
}
@VisibleForTesting
Dispatcher createDispatcher(TimelineServicePublisher timelineServicePublisher) {
return timelineServicePublisher.getDispatcher();
}
TimelineServicePublisher getTimelineServicePublisher(Configuration conf) {
if (conf.getBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED)) {
return new TimelineServiceV1Publisher();
} else if (conf.getBoolean(
YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED)) {
return new TimelineServiceV2Publisher(rmContext);
}
return null;
}
@SuppressWarnings("unchecked")
public void appCreated(RMApp app, long createdTime) {
if (publishSystemMetrics) {
ApplicationSubmissionContext appSubmissionContext =
app.getApplicationSubmissionContext();
dispatcher.getEventHandler().handle(
@ -125,7 +142,7 @@ public class SystemMetricsPublisher extends CompositeService {
@SuppressWarnings("unchecked")
public void appUpdated(RMApp app, long updatedTime) {
if (publishSystemMetrics) {
dispatcher.getEventHandler()
.handle(new ApplicationUpdatedEvent(app.getApplicationId(),
app.getQueue(), updatedTime,
@ -135,7 +152,7 @@ public class SystemMetricsPublisher extends CompositeService {
@SuppressWarnings("unchecked")
public void appFinished(RMApp app, RMAppState state, long finishedTime) {
if (publishSystemMetrics) {
dispatcher.getEventHandler().handle(
new ApplicationFinishedEvent(
app.getApplicationId(),
@ -145,14 +162,15 @@ public class SystemMetricsPublisher extends CompositeService {
app.getCurrentAppAttempt() == null ?
null : app.getCurrentAppAttempt().getAppAttemptId(),
finishedTime,
app.getRMAppMetrics(),
(RMAppImpl)app));
}
}
@SuppressWarnings("unchecked")
public void appACLsUpdated(RMApp app, String appViewACLs,
long updatedTime) {
if (publishSystemMetrics) {
dispatcher.getEventHandler().handle(
new ApplicationACLsUpdatedEvent(
app.getApplicationId(),
@ -164,7 +182,7 @@ public class SystemMetricsPublisher extends CompositeService {
@SuppressWarnings("unchecked")
public void appStateUpdated(RMApp app, YarnApplicationState appState,
long updatedTime) {
if (publishSystemMetrics) {
dispatcher.getEventHandler().handle(
new ApplicaitonStateUpdatedEvent(
app.getApplicationId(),
@ -176,7 +194,7 @@ public class SystemMetricsPublisher extends CompositeService {
@SuppressWarnings("unchecked")
public void appAttemptRegistered(RMAppAttempt appAttempt,
long registeredTime) {
if (publishSystemMetrics) {
ContainerId container = (appAttempt.getMasterContainer() == null) ? null
: appAttempt.getMasterContainer().getId();
dispatcher.getEventHandler().handle(
@ -194,7 +212,7 @@ public class SystemMetricsPublisher extends CompositeService {
@SuppressWarnings("unchecked")
public void appAttemptFinished(RMAppAttempt appAttempt,
RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
if (publishSystemMetrics) {
ContainerId container = (appAttempt.getMasterContainer() == null) ? null
: appAttempt.getMasterContainer().getId();
dispatcher.getEventHandler().handle(
@ -214,7 +232,7 @@ public class SystemMetricsPublisher extends CompositeService {
@SuppressWarnings("unchecked")
public void containerCreated(RMContainer container, long createdTime) {
if (publishContainerMetrics) {
dispatcher.getEventHandler().handle(
new ContainerCreatedEvent(
container.getContainerId(),
@ -227,7 +245,7 @@ public class SystemMetricsPublisher extends CompositeService {
@SuppressWarnings("unchecked")
public void containerFinished(RMContainer container, long finishedTime) {
if (publishContainerMetrics) {
dispatcher.getEventHandler().handle(
new ContainerFinishedEvent(
container.getContainerId(),
@ -238,396 +256,31 @@ public class SystemMetricsPublisher extends CompositeService {
}
}
protected Dispatcher createDispatcher(Configuration conf) {
MultiThreadedDispatcher dispatcher =
new MultiThreadedDispatcher(
conf.getInt(
YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE));
dispatcher.setDrainEventsOnStop();
return dispatcher;
}
protected void handleSystemMetricsEvent(
SystemMetricsEvent event) {
switch (event.getType()) {
case APP_CREATED:
publishApplicationCreatedEvent((ApplicationCreatedEvent) event);
break;
case APP_FINISHED:
publishApplicationFinishedEvent((ApplicationFinishedEvent) event);
break;
case APP_ACLS_UPDATED:
publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event);
break;
case APP_UPDATED:
publishApplicationUpdatedEvent((ApplicationUpdatedEvent) event);
break;
case APP_STATE_UPDATED:
publishApplicationStateUpdatedEvent(
(ApplicaitonStateUpdatedEvent)event);
break;
case APP_ATTEMPT_REGISTERED:
publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event);
break;
case APP_ATTEMPT_FINISHED:
publishAppAttemptFinishedEvent((AppAttemptFinishedEvent) event);
break;
case CONTAINER_CREATED:
publishContainerCreatedEvent((ContainerCreatedEvent) event);
break;
case CONTAINER_FINISHED:
publishContainerFinishedEvent((ContainerFinishedEvent) event);
break;
default:
LOG.error("Unknown SystemMetricsEvent type: " + event.getType());
}
}
private void publishApplicationCreatedEvent(ApplicationCreatedEvent event) {
TimelineEntity entity =
createApplicationEntity(event.getApplicationId());
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO,
event.getApplicationName());
entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
event.getApplicationType());
entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO,
event.getUser());
entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
event.getQueue());
entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
event.getSubmittedTime());
entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO,
event.getAppTags());
entityInfo.put(
ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO,
event.isUnmanagedApp());
entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,
event.getApplicationPriority().getPriority());
entityInfo.put(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION,
event.getAppNodeLabelsExpression());
entityInfo.put(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION,
event.getAmNodeLabelsExpression());
if (event.getCallerContext() != null) {
if (event.getCallerContext().getContext() != null) {
entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_CONTEXT,
event.getCallerContext().getContext());
}
if (event.getCallerContext().getSignature() != null) {
entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_SIGNATURE,
event.getCallerContext().getSignature());
}
}
entity.setOtherInfo(entityInfo);
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(
ApplicationMetricsConstants.CREATED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
entity.addEvent(tEvent);
putEntity(entity);
}
private void publishApplicationFinishedEvent(ApplicationFinishedEvent event) {
TimelineEntity entity =
createApplicationEntity(event.getApplicationId());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(
ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
event.getDiagnosticsInfo());
eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO,
event.getFinalApplicationStatus().toString());
eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
event.getYarnApplicationState().toString());
if (event.getLatestApplicationAttemptId() != null) {
eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO,
event.getLatestApplicationAttemptId().toString());
}
RMAppMetrics appMetrics = event.getAppMetrics();
entity.addOtherInfo(ApplicationMetricsConstants.APP_CPU_METRICS,
appMetrics.getVcoreSeconds());
entity.addOtherInfo(ApplicationMetricsConstants.APP_MEM_METRICS,
appMetrics.getMemorySeconds());
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity);
}
private void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event) {
TimelineEntity entity = createApplicationEntity(event.getApplicationId());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
event.getQueue());
eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, event
.getApplicationPriority().getPriority());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(ApplicationMetricsConstants.UPDATED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity);
}
private void publishApplicationStateUpdatedEvent(
ApplicaitonStateUpdatedEvent event) {
TimelineEntity entity = createApplicationEntity(event.getApplicationId());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
event.getAppState());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity);
}
private void publishApplicationACLsUpdatedEvent(
ApplicationACLsUpdatedEvent event) {
TimelineEntity entity =
createApplicationEntity(event.getApplicationId());
TimelineEvent tEvent = new TimelineEvent();
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
event.getViewAppACLs());
entity.setOtherInfo(entityInfo);
tEvent.setEventType(
ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
entity.addEvent(tEvent);
putEntity(entity);
}
private static TimelineEntity createApplicationEntity(
ApplicationId applicationId) {
TimelineEntity entity = new TimelineEntity();
entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE);
entity.setEntityId(applicationId.toString());
return entity;
}
private void
publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) {
TimelineEntity entity =
createAppAttemptEntity(event.getApplicationAttemptId());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(
AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(
AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
event.getTrackingUrl());
eventInfo.put(
AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
event.getOriginalTrackingURL());
eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO,
event.getHost());
eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO,
event.getRpcPort());
if (event.getMasterContainerId() != null) {
eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
event.getMasterContainerId().toString());
}
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity);
}
private void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) {
TimelineEntity entity =
createAppAttemptEntity(event.getApplicationAttemptId());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(
AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
event.getTrackingUrl());
eventInfo.put(
AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
event.getOriginalTrackingURL());
eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
event.getDiagnosticsInfo());
eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO,
event.getFinalApplicationStatus().toString());
eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO,
event.getYarnApplicationAttemptState().toString());
if (event.getMasterContainerId() != null) {
eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
event.getMasterContainerId().toString());
}
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity);
}
private static TimelineEntity createAppAttemptEntity(
ApplicationAttemptId appAttemptId) {
TimelineEntity entity = new TimelineEntity();
entity.setEntityType(
AppAttemptMetricsConstants.ENTITY_TYPE);
entity.setEntityId(appAttemptId.toString());
entity.addPrimaryFilter(AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER,
appAttemptId.getApplicationId().toString());
return entity;
}
private void publishContainerCreatedEvent(ContainerCreatedEvent event) {
TimelineEntity entity = createContainerEntity(event.getContainerId());
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
event.getAllocatedResource().getMemorySize());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO,
event.getAllocatedResource().getVirtualCores());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
event.getAllocatedNode().getHost());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
event.getAllocatedNode().getPort());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
event.getAllocatedPriority().getPriority());
entityInfo.put(
ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
event.getNodeHttpAddress());
entity.setOtherInfo(entityInfo);
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
entity.addEvent(tEvent);
putEntity(entity);
}
private void publishContainerFinishedEvent(ContainerFinishedEvent event) {
TimelineEntity entity = createContainerEntity(event.getContainerId());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
event.getDiagnosticsInfo());
eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
event.getContainerExitStatus());
eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO,
event.getContainerState().toString());
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
event.getAllocatedNode().getHost());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
event.getAllocatedNode().getPort());
entity.setOtherInfo(entityInfo);
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity);
}
private static TimelineEntity createContainerEntity(
ContainerId containerId) {
TimelineEntity entity = new TimelineEntity();
entity.setEntityType(
ContainerMetricsConstants.ENTITY_TYPE);
entity.setEntityId(containerId.toString());
entity.addPrimaryFilter(ContainerMetricsConstants.PARENT_PRIMARIY_FILTER,
containerId.getApplicationAttemptId().toString());
return entity;
}
@Private
@VisibleForTesting
public void putEntity(TimelineEntity entity) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Publishing the entity " + entity.getEntityId() +
", JSON-style content: " + TimelineUtils.dumpTimelineRecordtoJSON(entity));
}
TimelinePutResponse response = client.putEntities(entity);
List<TimelinePutResponse.TimelinePutError> errors = response.getErrors();
if (errors.size() == 0) {
LOG.debug("Timeline entities are successfully put");
} else {
for (TimelinePutResponse.TimelinePutError error : errors) {
LOG.error(
"Error when publishing entity [" + error.getEntityType() + ","
+ error.getEntityId() + "], server side error code: "
+ error.getErrorCode());
}
}
} catch (Exception e) {
LOG.error("Error when publishing entity [" + entity.getEntityType() + ","
+ entity.getEntityId() + "]", e);
}
}
/**
* EventHandler implementation which forward events to SystemMetricsPublisher.
* Making use of it, SystemMetricsPublisher can avoid to have a public handle
* method.
*/
private final class ForwardingEventHandler implements
EventHandler<SystemMetricsEvent> {
@Override
public void handle(SystemMetricsEvent event) {
handleSystemMetricsEvent(event);
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
protected static class MultiThreadedDispatcher extends CompositeService
implements Dispatcher {
private List<AsyncDispatcher> dispatchers =
new ArrayList<AsyncDispatcher>();
public MultiThreadedDispatcher(int num) {
super(MultiThreadedDispatcher.class.getName());
for (int i = 0; i < num; ++i) {
AsyncDispatcher dispatcher = createDispatcher();
dispatchers.add(dispatcher);
addIfService(dispatcher);
}
}
@Override
public EventHandler getEventHandler() {
return new CompositEventHandler();
}
@Override
public void register(Class<? extends Enum> eventType, EventHandler handler) {
for (AsyncDispatcher dispatcher : dispatchers) {
dispatcher.register(eventType, handler);
}
}
public void setDrainEventsOnStop() {
for (AsyncDispatcher dispatcher : dispatchers) {
dispatcher.setDrainEventsOnStop();
}
}
private class CompositEventHandler implements EventHandler<Event> {
@Override
public void handle(Event event) {
// Use hashCode (of ApplicationId) to dispatch the event to the child
// dispatcher, such that all the writing events of one application will
// be handled by one thread, the scheduled order of the these events
// will be preserved
int index = (event.hashCode() & Integer.MAX_VALUE) % dispatchers.size();
dispatchers.get(index).getEventHandler().handle(event);
}
}
protected AsyncDispatcher createDispatcher() {
return new AsyncDispatcher();
}
}
}
@VisibleForTesting
boolean isPublishContainerMetrics() {
return publishContainerMetrics;
}
@VisibleForTesting
Dispatcher getDispatcher() {
return dispatcher;
}
interface TimelineServicePublisher extends Service {
/**
* @return the Dispatcher which needs to be used to dispatch events
*/
Dispatcher getDispatcher();
/**
* @return true if RMContainerMetricsNeeds to be sent
*/
boolean publishRMContainerMetrics();
/**
* @return EventHandler which needs to be registered to the dispatcher to
* handle the SystemMetricsEvent
*/
EventHandler<SystemMetricsEvent> getEventHandler();
}
}
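As a quick illustration of the selection logic in getTimelineServicePublisher() above, a same-package test could exercise it directly. This is a hedged sketch, not a test from this patch; the sketch's class and method names are invented, and it assumes the publisher constructors only store their arguments.

  package org.apache.hadoop.yarn.server.resourcemanager.metrics;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.yarn.conf.YarnConfiguration;
  import org.junit.Assert;
  import org.junit.Test;

  // Illustrative sketch only: checks which publisher the new selection logic picks.
  public class TestPublisherSelectionSketch {

    @Test
    public void testSelection() {
      SystemMetricsPublisher publisher = new SystemMetricsPublisher(null);

      Configuration conf = new Configuration();
      conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, true);
      Assert.assertTrue(publisher.getTimelineServicePublisher(conf)
          instanceof TimelineServiceV1Publisher);

      conf = new Configuration();
      conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
      Assert.assertTrue(publisher.getTimelineServicePublisher(conf)
          instanceof TimelineServiceV2Publisher);

      // Neither switch set: no publisher is configured.
      Assert.assertNull(publisher.getTimelineServicePublisher(new Configuration()));
    }
  }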


@ -0,0 +1,329 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
public class TimelineServiceV1Publisher extends
AbstractTimelineServicePublisher {
private static final Log LOG = LogFactory
.getLog(TimelineServiceV1Publisher.class);
public TimelineServiceV1Publisher() {
super("TimelineserviceV1Publisher");
}
private TimelineClient client;
@Override
public void serviceInit(Configuration conf) throws Exception {
client = TimelineClient.createTimelineClient();
addIfService(client);
super.serviceInit(conf);
}
@Override
void publishApplicationCreatedEvent(ApplicationCreatedEvent event) {
TimelineEntity entity =
createApplicationEntity(event.getApplicationId());
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO,
event.getApplicationName());
entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
event.getApplicationType());
entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO,
event.getUser());
entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
event.getQueue());
entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
event.getSubmittedTime());
entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO,
event.getAppTags());
entityInfo.put(
ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO,
event.isUnmanagedApp());
entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,
event.getApplicationPriority().getPriority());
entityInfo.put(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION,
event.getAppNodeLabelsExpression());
entityInfo.put(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION,
event.getAmNodeLabelsExpression());
if (event.getCallerContext() != null) {
if (event.getCallerContext().getContext() != null) {
entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_CONTEXT,
event.getCallerContext().getContext());
}
if (event.getCallerContext().getSignature() != null) {
entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_SIGNATURE,
event.getCallerContext().getSignature());
}
}
entity.setOtherInfo(entityInfo);
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(
ApplicationMetricsConstants.CREATED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
entity.addEvent(tEvent);
putEntity(entity);
}
@Override
void publishApplicationFinishedEvent(ApplicationFinishedEvent event) {
TimelineEntity entity = createApplicationEntity(event.getApplicationId());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
event.getDiagnosticsInfo());
eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, event
.getFinalApplicationStatus().toString());
eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, event
.getYarnApplicationState().toString());
if (event.getLatestApplicationAttemptId() != null) {
eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO,
event.getLatestApplicationAttemptId().toString());
}
RMAppMetrics appMetrics = event.getAppMetrics();
entity.addOtherInfo(ApplicationMetricsConstants.APP_CPU_METRICS,
appMetrics.getVcoreSeconds());
entity.addOtherInfo(ApplicationMetricsConstants.APP_MEM_METRICS,
appMetrics.getMemorySeconds());
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity);
}
@Override
void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event) {
TimelineEntity entity = createApplicationEntity(event.getApplicationId());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
event.getQueue());
eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, event
.getApplicationPriority().getPriority());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(ApplicationMetricsConstants.UPDATED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity);
}
@Override
void publishApplicationStateUpdatedEvent(
ApplicaitonStateUpdatedEvent event) {
TimelineEntity entity = createApplicationEntity(event.getApplicationId());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
event.getAppState());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity);
}
@Override
void publishApplicationACLsUpdatedEvent(ApplicationACLsUpdatedEvent event) {
TimelineEntity entity = createApplicationEntity(event.getApplicationId());
TimelineEvent tEvent = new TimelineEvent();
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
event.getViewAppACLs());
entity.setOtherInfo(entityInfo);
tEvent.setEventType(ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
entity.addEvent(tEvent);
putEntity(entity);
}
private static TimelineEntity createApplicationEntity(
ApplicationId applicationId) {
TimelineEntity entity = new TimelineEntity();
entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE);
entity.setEntityId(applicationId.toString());
return entity;
}
@Override
void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) {
TimelineEntity entity =
createAppAttemptEntity(event.getApplicationAttemptId());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(
AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(
AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
event.getTrackingUrl());
eventInfo.put(
AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
event.getOriginalTrackingURL());
eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO,
event.getHost());
eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO,
event.getRpcPort());
if (event.getMasterContainerId() != null) {
eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
event.getMasterContainerId().toString());
}
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity);
}
@Override
void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) {
TimelineEntity entity =
createAppAttemptEntity(event.getApplicationAttemptId());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
event.getTrackingUrl());
eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
event.getOriginalTrackingURL());
eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
event.getDiagnosticsInfo());
eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, event
.getFinalApplicationStatus().toString());
eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, event
.getYarnApplicationAttemptState().toString());
if (event.getMasterContainerId() != null) {
eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
event.getMasterContainerId().toString());
}
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity);
}
private static TimelineEntity createAppAttemptEntity(
ApplicationAttemptId appAttemptId) {
TimelineEntity entity = new TimelineEntity();
entity.setEntityType(AppAttemptMetricsConstants.ENTITY_TYPE);
entity.setEntityId(appAttemptId.toString());
entity.addPrimaryFilter(AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER,
appAttemptId.getApplicationId().toString());
return entity;
}
@Override
void publishContainerCreatedEvent(ContainerCreatedEvent event) {
TimelineEntity entity = createContainerEntity(event.getContainerId());
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
event.getAllocatedResource().getMemorySize());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, event
.getAllocatedResource().getVirtualCores());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, event
.getAllocatedNode().getHost());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, event
.getAllocatedNode().getPort());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
event.getAllocatedPriority().getPriority());
entityInfo.put(
ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
event.getNodeHttpAddress());
entity.setOtherInfo(entityInfo);
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
entity.addEvent(tEvent);
putEntity(entity);
}
@Override
void publishContainerFinishedEvent(ContainerFinishedEvent event) {
TimelineEntity entity = createContainerEntity(event.getContainerId());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
event.getDiagnosticsInfo());
eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
event.getContainerExitStatus());
eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, event
.getContainerState().toString());
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
event.getAllocatedNode().getHost());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
event.getAllocatedNode().getPort());
entity.setOtherInfo(entityInfo);
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity);
}
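  // In the v1 entity model the container-to-attempt relationship is expressed
  // with the PARENT_PRIMARIY_FILTER primary filter, mirroring how
  // createAppAttemptEntity above links an attempt to its application.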
private static TimelineEntity createContainerEntity(ContainerId containerId) {
TimelineEntity entity = new TimelineEntity();
entity.setEntityType(ContainerMetricsConstants.ENTITY_TYPE);
entity.setEntityId(containerId.toString());
entity.addPrimaryFilter(ContainerMetricsConstants.PARENT_PRIMARIY_FILTER,
containerId.getApplicationAttemptId().toString());
return entity;
}
private void putEntity(TimelineEntity entity) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Publishing the entity " + entity.getEntityId()
+ ", JSON-style content: "
+ TimelineUtils.dumpTimelineRecordtoJSON(entity));
}
client.putEntities(entity);
} catch (Exception e) {
LOG.error("Error when publishing entity [" + entity.getEntityType() + ","
+ entity.getEntityId() + "]", e);
}
}
}


@@ -0,0 +1,362 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationAttemptEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
/**
 * This class is responsible for posting application, application attempt and
 * container lifecycle related events to timeline service v2.
 */
@Private
@Unstable
public class TimelineServiceV2Publisher extends
AbstractTimelineServicePublisher {
private static final Log LOG = LogFactory
.getLog(TimelineServiceV2Publisher.class);
protected RMTimelineCollectorManager rmTimelineCollectorManager;
public TimelineServiceV2Publisher(RMContext rmContext) {
super("TimelineserviceV2Publisher");
rmTimelineCollectorManager = rmContext.getRMTimelineCollectorManager();
}
private boolean publishContainerMetrics;
@Override
protected void serviceInit(Configuration conf) throws Exception {
publishContainerMetrics =
conf.getBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED,
YarnConfiguration.DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED);
super.serviceInit(conf);
}
@Override
void publishApplicationCreatedEvent(ApplicationCreatedEvent event) {
TimelineEntity entity =
createApplicationEntity(event.getApplicationId());
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO,
event.getApplicationName());
entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
event.getApplicationType());
entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO,
event.getUser());
entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
event.getQueue());
entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
event.getSubmittedTime());
entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO,
event.getAppTags());
entityInfo.put(
ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO,
event.isUnmanagedApp());
entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,
event.getApplicationPriority().getPriority());
entityInfo.put(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION,
event.getAppNodeLabelsExpression());
entityInfo.put(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION,
event.getAmNodeLabelsExpression());
if (event.getCallerContext() != null) {
if (event.getCallerContext().getContext() != null) {
entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_CONTEXT,
event.getCallerContext().getContext());
}
if (event.getCallerContext().getSignature() != null) {
entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_SIGNATURE,
event.getCallerContext().getSignature());
}
}
entity.setInfo(entityInfo);
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
entity.addEvent(tEvent);
putEntity(entity, event.getApplicationId());
}
@Override
void publishApplicationFinishedEvent(ApplicationFinishedEvent event) {
ApplicationEntity entity =
createApplicationEntity(event.getApplicationId());
RMAppMetrics appMetrics = event.getAppMetrics();
entity.addInfo(ApplicationMetricsConstants.APP_CPU_METRICS,
appMetrics.getVcoreSeconds());
entity.addInfo(ApplicationMetricsConstants.APP_MEM_METRICS,
appMetrics.getMemorySeconds());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
event.getDiagnosticsInfo());
eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, event
.getFinalApplicationStatus().toString());
eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, event
.getYarnApplicationState().toString());
if (event.getLatestApplicationAttemptId() != null) {
eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO,
event.getLatestApplicationAttemptId().toString());
}
tEvent.setInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity, event.getApplicationId());
    // clean up the cached collector for this application
event.getApp().stopTimelineCollector();
}
@Override
void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event) {
ApplicationEntity entity =
createApplicationEntity(event.getApplicationId());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
event.getQueue());
eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, event
.getApplicationPriority().getPriority());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ApplicationMetricsConstants.UPDATED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
tEvent.setInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity, event.getApplicationId());
}
@Override
void publishApplicationStateUpdatedEvent(
ApplicaitonStateUpdatedEvent event) {
ApplicationEntity entity =
createApplicationEntity(event.getApplicationId());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
event.getAppState());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
tEvent.setInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity, event.getApplicationId());
}
@Override
void publishApplicationACLsUpdatedEvent(ApplicationACLsUpdatedEvent event) {
ApplicationEntity entity =
createApplicationEntity(event.getApplicationId());
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
event.getViewAppACLs());
entity.setInfo(entityInfo);
putEntity(entity, event.getApplicationId());
}
private static ApplicationEntity createApplicationEntity(
ApplicationId applicationId) {
ApplicationEntity entity = new ApplicationEntity();
entity.setId(applicationId.toString());
return entity;
}
@Override
void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) {
TimelineEntity entity =
createAppAttemptEntity(event.getApplicationAttemptId());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(
AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
event.getTrackingUrl());
eventInfo.put(
AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
event.getOriginalTrackingURL());
eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO,
event.getHost());
eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO,
event.getRpcPort());
if (event.getMasterContainerId() != null) {
eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
event.getMasterContainerId().toString());
}
tEvent.setInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity, event.getApplicationAttemptId().getApplicationId());
}
@Override
void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) {
ApplicationAttemptEntity entity =
createAppAttemptEntity(event.getApplicationAttemptId());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
event.getTrackingUrl());
eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
event.getOriginalTrackingURL());
eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
event.getDiagnosticsInfo());
eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, event
.getFinalApplicationStatus().toString());
eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, event
.getYarnApplicationAttemptState().toString());
if (event.getMasterContainerId() != null) {
eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
event.getMasterContainerId().toString());
}
tEvent.setInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity, event.getApplicationAttemptId().getApplicationId());
}
@Override
void publishContainerCreatedEvent(ContainerCreatedEvent event) {
TimelineEntity entity = createContainerEntity(event.getContainerId());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
    // published as event info instead of entity info, since the entity info is
    // updated by the NM
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, event
.getAllocatedResource().getMemorySize());
eventInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, event
.getAllocatedResource().getVirtualCores());
eventInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, event
.getAllocatedNode().getHost());
eventInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, event
.getAllocatedNode().getPort());
eventInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
event.getAllocatedPriority().getPriority());
eventInfo.put(
ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
event.getNodeHttpAddress());
tEvent.setInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity, event.getContainerId().getApplicationAttemptId()
.getApplicationId());
}
@Override
void publishContainerFinishedEvent(ContainerFinishedEvent event) {
TimelineEntity entity = createContainerEntity(event.getContainerId());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ContainerMetricsConstants.FINISHED_IN_RM_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
event.getDiagnosticsInfo());
eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
event.getContainerExitStatus());
eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, event
.getContainerState().toString());
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
event.getAllocatedNode().getHost());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
event.getAllocatedNode().getPort());
entity.setInfo(entityInfo);
tEvent.setInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity, event.getContainerId().getApplicationAttemptId()
.getApplicationId());
}
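  // In the v2 entity model the hierarchy is explicit: a container entity names
  // its application attempt as parent through an Identifier, and the attempt
  // in turn names its application (see createAppAttemptEntity below).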
private static ContainerEntity createContainerEntity(ContainerId containerId) {
ContainerEntity entity = new ContainerEntity();
entity.setId(containerId.toString());
entity.setParent(new Identifier(TimelineEntityType.YARN_APPLICATION_ATTEMPT
.name(), containerId.getApplicationAttemptId().toString()));
return entity;
}
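  /**
   * Publishes the entity through the per-application TimelineCollector
   * obtained from the RMTimelineCollectorManager, rather than through a shared
   * timeline client, so each write is performed on behalf of the owning
   * application.
   */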
private void putEntity(TimelineEntity entity, ApplicationId appId) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Publishing the entity " + entity + ", JSON-style content: "
+ TimelineUtils.dumpTimelineRecordtoJSON(entity));
}
TimelineCollector timelineCollector =
rmTimelineCollectorManager.get(appId);
TimelineEntities entities = new TimelineEntities();
entities.addEntity(entity);
timelineCollector.putEntities(entities,
UserGroupInformation.getCurrentUser());
} catch (Exception e) {
LOG.error("Error when publishing entity " + entity, e);
}
}
private static ApplicationAttemptEntity createAppAttemptEntity(
ApplicationAttemptId appAttemptId) {
ApplicationAttemptEntity entity = new ApplicationAttemptEntity();
entity.setId(appAttemptId.toString());
entity.setParent(new Identifier(TimelineEntityType.YARN_APPLICATION.name(),
appAttemptId.getApplicationId().toString()));
return entity;
}
@Override
public boolean publishRMContainerMetrics() {
return publishContainerMetrics;
}
}
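For orientation, below is a minimal sketch (not part of this patch) of how the new publisher is driven. The class and helper-method names are illustrative only; the RMContext is assumed to already hold an RMTimelineCollectorManager with a collector registered for the application, which is the same precondition TestSystemMetricsPublisherForV2 further below sets up.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;

public class TimelineV2PublisherSketch {

  // Hypothetical helper: enables the system metrics publisher plus the
  // optional container events and starts it against the given RMContext.
  public static SystemMetricsPublisher startPublisher(RMContext rmContext) {
    Configuration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
    // Container events are disabled by default and must be opted into.
    conf.setBoolean(
        YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED, true);
    SystemMetricsPublisher publisher = new SystemMetricsPublisher(rmContext);
    publisher.init(conf);
    publisher.start();
    return publisher;
  }

  // Each hook is dispatched asynchronously; with timeline service v2 enabled
  // it reaches TimelineServiceV2Publisher, which resolves the application's
  // TimelineCollector and calls putEntities() on it.
  public static void reportAppLifecycle(SystemMetricsPublisher publisher,
      RMApp app) {
    publisher.appCreated(app, app.getStartTime());
    publisher.appFinished(app, RMAppState.FINISHED, app.getFinishTime());
  }
}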


@@ -1379,8 +1379,6 @@ public class RMAppImpl implements RMApp, Recoverable {
           .applicationFinished(app, finalState);
       app.rmContext.getSystemMetricsPublisher()
           .appFinished(app, finalState, app.finishTime);
-      app.stopTimelineCollector();
     };
   }


@@ -90,7 +90,7 @@ public class TestRMAppLogAggregationStatus {
     rmContext =
         new RMContextImpl(rmDispatcher, null, null, null,
             null, null, null, null, null);
-    rmContext.setSystemMetricsPublisher(new SystemMetricsPublisher());
+    rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class));
     rmContext.setRMApplicationHistoryWriter(mock(RMApplicationHistoryWriter.class));
     rmContext


@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistor
 import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
@@ -75,7 +76,7 @@ public class TestSystemMetricsPublisher {
   public static void setup() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
-    conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, true);
     conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
         MemoryTimelineStore.class, TimelineStore.class);
     conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS,
@@ -89,7 +90,7 @@ public class TestSystemMetricsPublisher {
     timelineServer.start();
     store = timelineServer.getTimelineStore();
-    metricsPublisher = new SystemMetricsPublisher();
+    metricsPublisher = new SystemMetricsPublisher(mock(RMContext.class));
     metricsPublisher.init(conf);
     metricsPublisher.start();
   }


@@ -0,0 +1,374 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.AbstractTimelineServicePublisher.MultiThreadedDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestSystemMetricsPublisherForV2 {
  /**
   * The folder into which the FileSystemTimelineWriterImpl writes the entities.
   */
protected static File testRootDir = new File("target",
TestSystemMetricsPublisherForV2.class.getName() + "-localDir")
.getAbsoluteFile();
private static SystemMetricsPublisher metricsPublisher;
private static DrainDispatcher dispatcher = new DrainDispatcher();
private static final String DEFAULT_FLOW_VERSION = "1";
private static final long DEFAULT_FLOW_RUN = 1;
private static ConcurrentMap<ApplicationId, RMApp> rmAppsMapInContext;
private static RMTimelineCollectorManager rmTimelineCollectorManager;
@BeforeClass
public static void setup() throws Exception {
if (testRootDir.exists()) {
      // clean up beforehand
FileContext.getLocalFSFileContext().delete(
new Path(testRootDir.getAbsolutePath()), true);
}
RMContext rmContext = mock(RMContext.class);
rmAppsMapInContext = new ConcurrentHashMap<ApplicationId, RMApp>();
when(rmContext.getRMApps()).thenReturn(rmAppsMapInContext);
rmTimelineCollectorManager = new RMTimelineCollectorManager(rmContext);
when(rmContext.getRMTimelineCollectorManager()).thenReturn(
rmTimelineCollectorManager);
Configuration conf = getTimelineV2Conf();
rmTimelineCollectorManager.init(conf);
rmTimelineCollectorManager.start();
metricsPublisher = new SystemMetricsPublisher(rmContext) {
@Override
Dispatcher createDispatcher(
TimelineServicePublisher timelineServicePublisher) {
return dispatcher;
}
};
metricsPublisher.init(conf);
metricsPublisher.start();
}
@AfterClass
public static void tearDown() throws Exception {
if (testRootDir.exists()) {
FileContext.getLocalFSFileContext().delete(
new Path(testRootDir.getAbsolutePath()), true);
}
if (rmTimelineCollectorManager != null) {
rmTimelineCollectorManager.stop();
}
if (metricsPublisher != null) {
metricsPublisher.stop();
}
}
private static Configuration getTimelineV2Conf() {
Configuration conf = new Configuration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
conf.setInt(
YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, 2);
conf.setBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED,
true);
try {
conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
testRootDir.getCanonicalPath());
} catch (IOException e) {
e.printStackTrace();
      Assert.fail(
          "Exception while setting the TIMELINE_SERVICE_STORAGE_DIR_ROOT");
}
return conf;
}
@Test
public void testSystemMetricPublisherInitialization() {
@SuppressWarnings("resource")
SystemMetricsPublisher metricsPublisher =
new SystemMetricsPublisher(mock(RMContext.class));
try {
Configuration conf = getTimelineV2Conf();
conf.setBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED,
false);
metricsPublisher.init(conf);
      assertFalse(
          "Container metrics should not be published from RM when disabled",
          metricsPublisher.isPublishContainerMetrics());
metricsPublisher.stop();
metricsPublisher = new SystemMetricsPublisher(mock(RMContext.class));
conf = getTimelineV2Conf();
metricsPublisher.init(conf);
assertTrue("Expected to publish container Metrics from RM",
metricsPublisher.isPublishContainerMetrics());
assertTrue(
"MultiThreadedDispatcher expected when container Metrics is not published",
metricsPublisher.getDispatcher() instanceof MultiThreadedDispatcher);
} finally {
metricsPublisher.stop();
}
}
@Test(timeout = 1000000)
public void testPublishApplicationMetrics() throws Exception {
ApplicationId appId = ApplicationId.newInstance(0, 1);
RMApp app = createAppAndRegister(appId);
metricsPublisher.appCreated(app, app.getStartTime());
metricsPublisher.appACLsUpdated(app, "user1,user2", 4L);
metricsPublisher.appFinished(app, RMAppState.FINISHED, app.getFinishTime());
dispatcher.await();
String outputDirApp =
getTimelineEntityDir(app) + "/" + TimelineEntityType.YARN_APPLICATION
+ "/";
File entityFolder = new File(outputDirApp);
Assert.assertTrue(entityFolder.isDirectory());
// file name is <entityId>.thist
String timelineServiceFileName =
appId.toString()
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
File appFile = new File(outputDirApp, timelineServiceFileName);
Assert.assertTrue(appFile.exists());
Assert.assertEquals("Expected 3 events to be published", 3,
getNumOfNonEmptyLines(appFile));
}
@Test(timeout = 10000)
public void testPublishAppAttemptMetrics() throws Exception {
ApplicationId appId = ApplicationId.newInstance(0, 1);
RMApp app = rmAppsMapInContext.get(appId);
if (app == null) {
app = createAppAndRegister(appId);
}
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
RMAppAttempt appAttempt = createRMAppAttempt(appAttemptId);
metricsPublisher.appAttemptRegistered(appAttempt, Integer.MAX_VALUE + 1L);
when(app.getFinalApplicationStatus()).thenReturn(
FinalApplicationStatus.UNDEFINED);
metricsPublisher.appAttemptFinished(appAttempt, RMAppAttemptState.FINISHED,
app, Integer.MAX_VALUE + 2L);
dispatcher.await();
String outputDirApp =
getTimelineEntityDir(app) + "/"
+ TimelineEntityType.YARN_APPLICATION_ATTEMPT + "/";
File entityFolder = new File(outputDirApp);
Assert.assertTrue(entityFolder.isDirectory());
// file name is <entityId>.thist
String timelineServiceFileName =
appAttemptId.toString()
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
File appFile = new File(outputDirApp, timelineServiceFileName);
Assert.assertTrue(appFile.exists());
Assert.assertEquals("Expected 2 events to be published", 2,
getNumOfNonEmptyLines(appFile));
}
@Test(timeout = 10000)
public void testPublishContainerMetrics() throws Exception {
ApplicationId appId = ApplicationId.newInstance(0, 1);
RMApp app = rmAppsMapInContext.get(appId);
if (app == null) {
app = createAppAndRegister(appId);
}
ContainerId containerId =
ContainerId.newContainerId(ApplicationAttemptId.newInstance(
appId, 1), 1);
RMContainer container = createRMContainer(containerId);
metricsPublisher.containerCreated(container, container.getCreationTime());
metricsPublisher.containerFinished(container, container.getFinishTime());
dispatcher.await();
String outputDirApp =
getTimelineEntityDir(app) + "/"
+ TimelineEntityType.YARN_CONTAINER + "/";
File entityFolder = new File(outputDirApp);
Assert.assertTrue(entityFolder.isDirectory());
// file name is <entityId>.thist
String timelineServiceFileName =
containerId.toString()
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
File appFile = new File(outputDirApp, timelineServiceFileName);
Assert.assertTrue(appFile.exists());
Assert.assertEquals("Expected 2 events to be published", 2,
getNumOfNonEmptyLines(appFile));
}
private RMApp createAppAndRegister(ApplicationId appId) {
RMApp app = createRMApp(appId);
    // setup that is currently taken care of in RMAppImpl
rmAppsMapInContext.putIfAbsent(appId, app);
AppLevelTimelineCollector collector = new AppLevelTimelineCollector(appId);
rmTimelineCollectorManager.putIfAbsent(appId, collector);
return app;
}
  private long getNumOfNonEmptyLines(File entityFile) throws IOException {
    long count = 0;
    // try-with-resources closes the reader and avoids an NPE in a finally
    // block when the file cannot be opened
    try (BufferedReader reader =
        new BufferedReader(new FileReader(entityFile))) {
      String strLine;
      while ((strLine = reader.readLine()) != null) {
        if (strLine.trim().length() > 0) {
          count++;
        }
      }
    }
    return count;
  }
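  /**
   * Builds the per-application directory that this test expects
   * FileSystemTimelineWriterImpl to write under:
   * storage-root/entities/clusterId/user/flowName/flowVersion/flowRun/appId.
   * Callers append /entityType/entityId.thist to locate the entity file.
   */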
private String getTimelineEntityDir(RMApp app) {
String outputDirApp =
testRootDir.getAbsolutePath()+"/"
+ FileSystemTimelineWriterImpl.ENTITIES_DIR
+ "/"
+ YarnConfiguration.DEFAULT_RM_CLUSTER_ID
+ "/"
+ app.getUser()
+ "/"
+ TimelineUtils.generateDefaultFlowIdBasedOnAppId(app
.getApplicationId()) + "/" + DEFAULT_FLOW_VERSION + "/"
+ DEFAULT_FLOW_RUN + "/" + app.getApplicationId();
return outputDirApp;
}
private static RMApp createRMApp(ApplicationId appId) {
RMApp app = mock(RMAppImpl.class);
when(app.getApplicationId()).thenReturn(appId);
when(app.getName()).thenReturn("test app");
when(app.getApplicationType()).thenReturn("test app type");
when(app.getUser()).thenReturn("testUser");
when(app.getQueue()).thenReturn("test queue");
when(app.getSubmitTime()).thenReturn(Integer.MAX_VALUE + 1L);
when(app.getStartTime()).thenReturn(Integer.MAX_VALUE + 2L);
when(app.getFinishTime()).thenReturn(Integer.MAX_VALUE + 3L);
when(app.getDiagnostics()).thenReturn(
new StringBuilder("test diagnostics info"));
RMAppAttempt appAttempt = mock(RMAppAttempt.class);
when(appAttempt.getAppAttemptId()).thenReturn(
ApplicationAttemptId.newInstance(appId, 1));
when(app.getCurrentAppAttempt()).thenReturn(appAttempt);
when(app.getFinalApplicationStatus()).thenReturn(
FinalApplicationStatus.UNDEFINED);
when(app.getRMAppMetrics()).thenReturn(
new RMAppMetrics(null, 0, 0, Integer.MAX_VALUE, Long.MAX_VALUE));
when(app.getApplicationTags()).thenReturn(Collections.<String> emptySet());
return app;
}
private static RMAppAttempt createRMAppAttempt(
ApplicationAttemptId appAttemptId) {
RMAppAttempt appAttempt = mock(RMAppAttempt.class);
when(appAttempt.getAppAttemptId()).thenReturn(appAttemptId);
when(appAttempt.getHost()).thenReturn("test host");
when(appAttempt.getRpcPort()).thenReturn(-100);
Container container = mock(Container.class);
when(container.getId()).thenReturn(
ContainerId.newContainerId(appAttemptId, 1));
when(appAttempt.getMasterContainer()).thenReturn(container);
when(appAttempt.getDiagnostics()).thenReturn("test diagnostics info");
when(appAttempt.getTrackingUrl()).thenReturn("test tracking url");
when(appAttempt.getOriginalTrackingUrl()).thenReturn(
"test original tracking url");
return appAttempt;
}
private static RMContainer createRMContainer(ContainerId containerId) {
RMContainer container = mock(RMContainer.class);
when(container.getContainerId()).thenReturn(containerId);
when(container.getAllocatedNode()).thenReturn(
NodeId.newInstance("test host", -100));
when(container.getAllocatedResource()).thenReturn(
Resource.newInstance(-1, -1));
when(container.getAllocatedPriority()).thenReturn(Priority.UNDEFINED);
when(container.getCreationTime()).thenReturn(Integer.MAX_VALUE + 1L);
when(container.getFinishTime()).thenReturn(Integer.MAX_VALUE + 2L);
when(container.getDiagnosticsInfo()).thenReturn("test diagnostics info");
when(container.getContainerExitStatus()).thenReturn(-1);
when(container.getContainerState()).thenReturn(ContainerState.COMPLETE);
Container mockContainer = mock(Container.class);
when(container.getContainer()).thenReturn(mockContainer);
when(mockContainer.getNodeHttpAddress())
.thenReturn("http://localhost:1234");
return container;
}
}


@@ -124,6 +124,12 @@
     <dependency>
       <groupId>org.apache.phoenix</groupId>
       <artifactId>phoenix-core</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>joda-time</groupId>
+          <artifactId>joda-time</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>


@@ -55,7 +55,7 @@ public class FileSystemTimelineWriterImpl extends AbstractService
   public static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
       = "/tmp/timeline_service_data";
-  private static final String ENTITIES_DIR = "entities";
+  public static final String ENTITIES_DIR = "entities";
   /** Default extension for output files */
   public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
@@ -76,7 +76,7 @@ public class FileSystemTimelineWriterImpl extends AbstractService
     return response;
   }
-  private void write(String clusterId, String userId, String flowName,
+  private synchronized void write(String clusterId, String userId, String flowName,
       String flowVersion, long flowRun, String appId, TimelineEntity entity,
       TimelineWriteResponse response) throws IOException {
     PrintWriter out = null;