From f3c661e8dddc80726f1084ff27815d179540889c Mon Sep 17 00:00:00 2001
From: Zhijie Shen
Date: Sat, 13 Jun 2015 11:32:41 -0700
Subject: [PATCH] YARN-3044. Made RM write app, attempt and optional container
 lifecycle events to timeline service v2. Contributed by Naganarasimha G R.

---
 .../timelineservice/TimelineEntity.java       |   3 +
 .../hadoop/yarn/conf/YarnConfiguration.java   |  10 +
 .../TestDistributedShell.java                 | 116 ++--
 .../metrics/ContainerMetricsConstants.java    |  10 +
 .../TestAppLogAggregatorImpl.java             |   4 +-
 .../resourcemanager/ResourceManager.java      |   2 +-
 .../AbstractTimelineServicePublisher.java     | 191 +++++++
 .../metrics/ApplicationFinishedEvent.java     |  13 +-
 .../metrics/SystemMetricsPublisher.java       | 511 +++---------------
 .../metrics/TimelineServiceV1Publisher.java   | 329 +++++++++++
 .../metrics/TimelineServiceV2Publisher.java   | 362 +++++++++++++
 .../resourcemanager/rmapp/RMAppImpl.java      |   2 -
 .../TestRMAppLogAggregationStatus.java        |   2 +-
 .../metrics/TestSystemMetricsPublisher.java   |   5 +-
 .../TestSystemMetricsPublisherForV2.java      | 374 +++++++++++++
 .../pom.xml                                   |   6 +
 .../storage/FileSystemTimelineWriterImpl.java |   4 +-
 17 files changed, 1470 insertions(+), 474 deletions(-)
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractTimelineServicePublisher.java
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
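Reviewer note: the diffs below make SystemMetricsPublisher choose between the ATS v1
and ATS v2 publishers from configuration (see getTimelineServicePublisher() in
SystemMetricsPublisher.java). The following is a minimal, illustrative sketch of the
relevant knobs, not part of the patch itself; the class name ConfigSketch and the
standalone main() are assumptions for illustration only.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class ConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new YarnConfiguration();
        // SystemMetricsPublisher publishes nothing unless the timeline service is enabled.
        conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
        // true keeps the pre-existing ATS v1 path (TimelineServiceV1Publisher) ...
        conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, false);
        // ... otherwise this flag selects the new ATS v2 path (TimelineServiceV2Publisher).
        conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
        // New constant added by this patch: opt-in for RM-emitted container lifecycle
        // events (documented as an ATS v2 setting); defaults to false.
        conf.setBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED, true);
      }
    }
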
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
index defadec4a8e..a641f32975d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
@@ -470,4 +470,7 @@ public class TimelineEntity {
     return real == null ?
         this : real;
   }
+  public String toString() {
+    return identifier.toString();
+  }
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 4f43e3d9ffd..f06d6c95817 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -500,6 +500,16 @@ public class YarnConfiguration extends Configuration {
       + "system-metrics-publisher.enabled";
   public static final boolean DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED = false;
 
+  /**
+   * The setting that controls whether YARN container metrics are published
+   * by the RM to the timeline service or not. This configuration setting is
+   * for ATS v2.
+   */
+  public static final String RM_PUBLISH_CONTAINER_METRICS_ENABLED = YARN_PREFIX
+      + "rm.system-metrics-publisher.emit-container-events";
+  public static final boolean DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED =
+      false;
+
   public static final String RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE =
       RM_PREFIX + "system-metrics-publisher.dispatcher.pool-size";
   public static final int DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index 37dd4dea5e2..aac8c4bf31e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -74,6 +74,8 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils;
 import org.apache.hadoop.yarn.server.timeline.NameValuePair;
@@ -86,7 +88,6 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineW
 import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
 import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
-
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -140,6 +141,7 @@ public class TestDistributedShell {
     // disable aux-service based timeline aggregators
     conf.set(YarnConfiguration.NM_AUX_SERVICES, "");
+    conf.set(YarnConfiguration.NM_VMEM_PMEM_RATIO, "8");
     conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName());
     conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
conf.set("mapreduce.jobhistory.address", @@ -494,50 +496,98 @@ public class TestDistributedShell { "/1/1/" : "/test_flow_name/test_flow_version/12345678/") + appId.toString(); // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs - String outputDirApp = basePath + "/DS_APP_ATTEMPT/"; - - File entityFolder = new File(outputDirApp); - Assert.assertTrue(entityFolder.isDirectory()); + // Verify DS_APP_ATTEMPT entities posted by the client // there will be at least one attempt, look for that file - String appTimestampFileName = "appattempt_" + appId.getClusterTimestamp() - + "_000" + appId.getId() + "_000001" - + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; - String appAttemptFileName = outputDirApp + appTimestampFileName; - File appAttemptFile = new File(appAttemptFileName); - Assert.assertTrue(appAttemptFile.exists()); + String appTimestampFileName = + "appattempt_" + appId.getClusterTimestamp() + "_000" + appId.getId() + + "_000001" + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + verifyEntityTypeFileExists(basePath, "DS_APP_ATTEMPT", + appTimestampFileName); - String outputDirContainer = basePath + "/DS_CONTAINER/"; - File containerFolder = new File(outputDirContainer); - Assert.assertTrue(containerFolder.isDirectory()); - - String containerTimestampFileName = "container_" - + appId.getClusterTimestamp() + "_000" + appId.getId() - + "_01_000002.thist"; - String containerFileName = outputDirContainer + containerTimestampFileName; - File containerFile = new File(containerFileName); - Assert.assertTrue(containerFile.exists()); + // Verify DS_CONTAINER entities posted by the client + String containerTimestampFileName = + "container_" + appId.getClusterTimestamp() + "_000" + appId.getId() + + "_01_000002.thist"; + verifyEntityTypeFileExists(basePath, "DS_CONTAINER", + containerTimestampFileName); // Verify NM posting container metrics info. 
- String outputDirContainerMetrics = basePath + "/" + - TimelineEntityType.YARN_CONTAINER + "/"; - File containerMetricsFolder = new File(outputDirContainerMetrics); - Assert.assertTrue(containerMetricsFolder.isDirectory()); + String containerMetricsTimestampFileName = + "container_" + appId.getClusterTimestamp() + "_000" + appId.getId() + + "_01_000001" + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + verifyEntityTypeFileExists(basePath, + TimelineEntityType.YARN_CONTAINER.toString(), + containerMetricsTimestampFileName); - String containerMetricsTimestampFileName = "container_" - + appId.getClusterTimestamp() + "_000" + appId.getId() - + "_01_000001.thist"; - String containerMetricsFileName = outputDirContainerMetrics + - containerMetricsTimestampFileName; - - File containerMetricsFile = new File(containerMetricsFileName); - Assert.assertTrue(containerMetricsFile.exists()); + // Verify RM posting Application life cycle Events are getting published + String appMetricsTimestampFileName = + "application_" + appId.getClusterTimestamp() + "_000" + appId.getId() + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + File appEntityFile = + verifyEntityTypeFileExists(basePath, + TimelineEntityType.YARN_APPLICATION.toString(), + appMetricsTimestampFileName); + verifyStringExistsSpecifiedTimes(appEntityFile, + ApplicationMetricsConstants.CREATED_EVENT_TYPE, 1, + "Application created event should be published atleast once"); + verifyStringExistsSpecifiedTimes(appEntityFile, + ApplicationMetricsConstants.FINISHED_EVENT_TYPE, 1, + "Application finished event should be published atleast once"); + // Verify RM posting AppAttempt life cycle Events are getting published + String appAttemptMetricsTimestampFileName = + "appattempt_" + appId.getClusterTimestamp() + "_000" + appId.getId() + + "_000001" + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + File appAttemptEntityFile = + verifyEntityTypeFileExists(basePath, + TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(), + appAttemptMetricsTimestampFileName); + verifyStringExistsSpecifiedTimes(appAttemptEntityFile, + AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE, 1, + "AppAttempt register event should be published atleast once"); + verifyStringExistsSpecifiedTimes(appAttemptEntityFile, + AppAttemptMetricsConstants.FINISHED_EVENT_TYPE, 1, + "AppAttempt finished event should be published atleast once"); } finally { FileUtils.deleteDirectory(tmpRootFolder.getParentFile()); } } + private File verifyEntityTypeFileExists(String basePath, String entityType, + String entityfileName) { + String outputDirPathForEntity = basePath + "/" + entityType + "/"; + File outputDirForEntity = new File(outputDirPathForEntity); + Assert.assertTrue(outputDirForEntity.isDirectory()); + + String entityFilePath = outputDirPathForEntity + entityfileName; + + File entityFile = new File(entityFilePath); + Assert.assertTrue(entityFile.exists()); + return entityFile; + } + + private void verifyStringExistsSpecifiedTimes(File entityFile, + String searchString, long expectedNumOfTimes, String errorMsg) + throws IOException { + BufferedReader reader = null; + String strLine; + long actualCount = 0; + try { + reader = new BufferedReader(new FileReader(entityFile)); + while ((strLine = reader.readLine()) != null) { + if (strLine.trim().contains(searchString)) + actualCount++; + } + } finally { + reader.close(); + } + Assert.assertEquals(errorMsg, expectedNumOfTimes, actualCount); + } + /** * Utility function to merge two String 
arrays to form a new String array for * our argumemts. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java index 0d5540df6bc..7b429948fbe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java @@ -27,10 +27,20 @@ public class ContainerMetricsConstants { public static final String ENTITY_TYPE = "YARN_CONTAINER"; + // Event of this type will be emitted by NM. public static final String CREATED_EVENT_TYPE = "YARN_CONTAINER_CREATED"; + // Event of this type will be emitted by RM. + public static final String CREATED_IN_RM_EVENT_TYPE = + "YARN_RM_CONTAINER_CREATED"; + + // Event of this type will be emitted by NM. public static final String FINISHED_EVENT_TYPE = "YARN_CONTAINER_FINISHED"; + // Event of this type will be emitted by RM. + public static final String FINISHED_IN_RM_EVENT_TYPE = + "YARN_RM_CONTAINER_FINISHED"; + public static final String PARENT_PRIMARIY_FILTER = "YARN_CONTAINER_PARENT"; public static final String ALLOCATED_MEMORY_ENTITY_INFO = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java index f929ca86fda..88d9688e9a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java @@ -393,7 +393,7 @@ public class TestAppLogAggregatorImpl { new NMTokenSecretManagerInNM(), null, new ApplicationACLsManager(conf), - new NMNullStateStoreService(), false); + new NMNullStateStoreService(), false, conf); } private static final class AppLogAggregatorInTest extends @@ -431,4 +431,4 @@ public class TestAppLogAggregatorImpl { return spy(new LogWriter(conf, remoteAppLogFile, ugi)); } } -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 6bc95a5b915..97fd7e93357 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -465,7 +465,7 @@ public class ResourceManager extends CompositeService implements Recoverable { } protected SystemMetricsPublisher createSystemMetricsPublisher() { - return new SystemMetricsPublisher(); + return new SystemMetricsPublisher(rmContext); } // sanity check for configurations diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractTimelineServicePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractTimelineServicePublisher.java new file mode 100644 index 00000000000..3d041f668bb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractTimelineServicePublisher.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.metrics; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher.TimelineServicePublisher; + +public abstract class AbstractTimelineServicePublisher extends CompositeService + implements TimelineServicePublisher, EventHandler { + + private static final Log LOG = LogFactory + .getLog(TimelineServiceV2Publisher.class); + + private Configuration conf; + + public AbstractTimelineServicePublisher(String name) { + super(name); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.conf = conf; + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + } + + @Override + public void handle(SystemMetricsEvent event) { + switch (event.getType()) { + case APP_CREATED: + publishApplicationCreatedEvent((ApplicationCreatedEvent) event); + break; + case APP_FINISHED: + publishApplicationFinishedEvent((ApplicationFinishedEvent) event); + break; + case APP_UPDATED: + publishApplicationUpdatedEvent((ApplicationUpdatedEvent) event); + break; + case APP_STATE_UPDATED: + publishApplicationStateUpdatedEvent( + (ApplicaitonStateUpdatedEvent)event); + break; + case APP_ACLS_UPDATED: + publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event); + break; + case APP_ATTEMPT_REGISTERED: + publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event); + break; + case APP_ATTEMPT_FINISHED: + publishAppAttemptFinishedEvent((AppAttemptFinishedEvent) event); + break; + case CONTAINER_CREATED: + publishContainerCreatedEvent((ContainerCreatedEvent) event); + break; + case CONTAINER_FINISHED: + publishContainerFinishedEvent((ContainerFinishedEvent) event); + break; + default: + LOG.error("Unknown SystemMetricsEvent type: " + event.getType()); + } + } + + abstract void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event); + + abstract void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event); + + abstract void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event); + + abstract void publishApplicationStateUpdatedEvent( + ApplicaitonStateUpdatedEvent event); + + abstract void publishApplicationACLsUpdatedEvent( + ApplicationACLsUpdatedEvent event); + + abstract void publishApplicationFinishedEvent(ApplicationFinishedEvent event); + + abstract void publishApplicationCreatedEvent(ApplicationCreatedEvent event); + + abstract void publishContainerCreatedEvent(ContainerCreatedEvent event); + + abstract void publishContainerFinishedEvent(ContainerFinishedEvent event); + + @Override + public Dispatcher getDispatcher() { + MultiThreadedDispatcher dispatcher = + new MultiThreadedDispatcher( + conf.getInt( + YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, + YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE)); + dispatcher.setDrainEventsOnStop(); + return dispatcher; + } + + 
@Override + public boolean publishRMContainerMetrics() { + return true; + } + + @Override + public EventHandler getEventHandler() { + return this; + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static class MultiThreadedDispatcher extends CompositeService + implements Dispatcher { + + private List dispatchers = + new ArrayList(); + + public MultiThreadedDispatcher(int num) { + super(MultiThreadedDispatcher.class.getName()); + for (int i = 0; i < num; ++i) { + AsyncDispatcher dispatcher = createDispatcher(); + dispatchers.add(dispatcher); + addIfService(dispatcher); + } + } + + @Override + public EventHandler getEventHandler() { + return new CompositEventHandler(); + } + + @Override + public void register(Class eventType, EventHandler handler) { + for (AsyncDispatcher dispatcher : dispatchers) { + dispatcher.register(eventType, handler); + } + } + + public void setDrainEventsOnStop() { + for (AsyncDispatcher dispatcher : dispatchers) { + dispatcher.setDrainEventsOnStop(); + } + } + + private class CompositEventHandler implements EventHandler { + + @Override + public void handle(Event event) { + // Use hashCode (of ApplicationId) to dispatch the event to the child + // dispatcher, such that all the writing events of one application will + // be handled by one thread, the scheduled order of the these events + // will be preserved + int index = (event.hashCode() & Integer.MAX_VALUE) % dispatchers.size(); + dispatchers.get(index).getEventHandler().handle(event); + } + } + + protected AsyncDispatcher createDispatcher() { + return new AsyncDispatcher(); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java index 8d75f92e426..d9241b23c79 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java @@ -22,6 +22,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; public class ApplicationFinishedEvent extends @@ -33,6 +35,7 @@ public class ApplicationFinishedEvent extends private YarnApplicationState state; private ApplicationAttemptId latestAppAttemptId; private RMAppMetrics appMetrics; + private RMAppImpl app; public ApplicationFinishedEvent( ApplicationId appId, @@ -41,14 +44,16 @@ public class ApplicationFinishedEvent extends YarnApplicationState state, ApplicationAttemptId latestAppAttemptId, long finishedTime, - RMAppMetrics appMetrics) { + RMAppMetrics appMetrics, + RMAppImpl app) { super(SystemMetricsEventType.APP_FINISHED, finishedTime); this.appId = appId; this.diagnosticsInfo = diagnosticsInfo; this.appStatus = 
appStatus; this.latestAppAttemptId = latestAppAttemptId; this.state = state; - this.appMetrics=appMetrics; + this.appMetrics = appMetrics; + this.app = app; } @Override @@ -56,6 +61,10 @@ public class ApplicationFinishedEvent extends return appId.hashCode(); } + public RMAppImpl getApp() { + return app; + } + public ApplicationId getApplicationId() { return appId; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java index 37708546701..15960f70365 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java @@ -18,10 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.metrics; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -29,8 +25,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -38,27 +32,25 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; -import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; -import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.util.timeline.TimelineUtils; + +import 
com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting; /** - * The class that helps RM publish metrics to the timeline server V1. RM will + * The class that helps RM publish metrics to the timeline server. RM will * always invoke the methods of this class regardless the service is enabled or * not. If it is disabled, publishing requests will be ignored silently. */ @@ -70,30 +62,38 @@ public class SystemMetricsPublisher extends CompositeService { .getLog(SystemMetricsPublisher.class); private Dispatcher dispatcher; - private TimelineClient client; - private boolean publishSystemMetricsToATSv1; + private boolean publishSystemMetrics; + private boolean publishContainerMetrics; + protected RMContext rmContext; - public SystemMetricsPublisher() { + public SystemMetricsPublisher(RMContext rmContext) { super(SystemMetricsPublisher.class.getName()); + this.rmContext = rmContext; } @Override protected void serviceInit(Configuration conf) throws Exception { - publishSystemMetricsToATSv1 = + publishSystemMetrics = conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED) - && conf.getBoolean( - YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, - YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED); - - if (publishSystemMetricsToATSv1) { - client = TimelineClient.createTimelineClient(); - addIfService(client); - - dispatcher = createDispatcher(conf); - dispatcher.register(SystemMetricsEventType.class, - new ForwardingEventHandler()); - addIfService(dispatcher); + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED); + if (publishSystemMetrics) { + TimelineServicePublisher timelineServicePublisher = + getTimelineServicePublisher(conf); + if (timelineServicePublisher != null) { + addService(timelineServicePublisher); + // init required to be called so that other methods of + // TimelineServicePublisher can be utilized + timelineServicePublisher.init(conf); + dispatcher = createDispatcher(timelineServicePublisher); + publishContainerMetrics = + timelineServicePublisher.publishRMContainerMetrics(); + dispatcher.register(SystemMetricsEventType.class, + timelineServicePublisher.getEventHandler()); + addIfService(dispatcher); + } else { + LOG.info("TimelineServicePublisher is not configured"); + publishSystemMetrics = false; + } LOG.info("YARN system metrics publishing service is enabled"); } else { LOG.info("YARN system metrics publishing service is not enabled"); @@ -101,9 +101,26 @@ public class SystemMetricsPublisher extends CompositeService { super.serviceInit(conf); } + @VisibleForTesting + Dispatcher createDispatcher(TimelineServicePublisher timelineServicePublisher) { + return timelineServicePublisher.getDispatcher(); + } + + TimelineServicePublisher getTimelineServicePublisher(Configuration conf) { + if (conf.getBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, + YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED)) { + return new TimelineServiceV1Publisher(); + } else if (conf.getBoolean( + YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, + YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED)) { + return new TimelineServiceV2Publisher(rmContext); + } + return null; + } + @SuppressWarnings("unchecked") public void appCreated(RMApp app, long createdTime) { - if (publishSystemMetricsToATSv1) { + if (publishSystemMetrics) { ApplicationSubmissionContext appSubmissionContext = app.getApplicationSubmissionContext(); dispatcher.getEventHandler().handle( @@ 
-125,7 +142,7 @@ public class SystemMetricsPublisher extends CompositeService { @SuppressWarnings("unchecked") public void appUpdated(RMApp app, long updatedTime) { - if (publishSystemMetricsToATSv1) { + if (publishSystemMetrics) { dispatcher.getEventHandler() .handle(new ApplicationUpdatedEvent(app.getApplicationId(), app.getQueue(), updatedTime, @@ -135,7 +152,7 @@ public class SystemMetricsPublisher extends CompositeService { @SuppressWarnings("unchecked") public void appFinished(RMApp app, RMAppState state, long finishedTime) { - if (publishSystemMetricsToATSv1) { + if (publishSystemMetrics) { dispatcher.getEventHandler().handle( new ApplicationFinishedEvent( app.getApplicationId(), @@ -145,14 +162,15 @@ public class SystemMetricsPublisher extends CompositeService { app.getCurrentAppAttempt() == null ? null : app.getCurrentAppAttempt().getAppAttemptId(), finishedTime, - app.getRMAppMetrics())); + app.getRMAppMetrics(), + (RMAppImpl)app)); } } @SuppressWarnings("unchecked") public void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime) { - if (publishSystemMetricsToATSv1) { + if (publishSystemMetrics) { dispatcher.getEventHandler().handle( new ApplicationACLsUpdatedEvent( app.getApplicationId(), @@ -164,7 +182,7 @@ public class SystemMetricsPublisher extends CompositeService { @SuppressWarnings("unchecked") public void appStateUpdated(RMApp app, YarnApplicationState appState, long updatedTime) { - if (publishSystemMetricsToATSv1) { + if (publishSystemMetrics) { dispatcher.getEventHandler().handle( new ApplicaitonStateUpdatedEvent( app.getApplicationId(), @@ -176,7 +194,7 @@ public class SystemMetricsPublisher extends CompositeService { @SuppressWarnings("unchecked") public void appAttemptRegistered(RMAppAttempt appAttempt, long registeredTime) { - if (publishSystemMetricsToATSv1) { + if (publishSystemMetrics) { ContainerId container = (appAttempt.getMasterContainer() == null) ? null : appAttempt.getMasterContainer().getId(); dispatcher.getEventHandler().handle( @@ -194,7 +212,7 @@ public class SystemMetricsPublisher extends CompositeService { @SuppressWarnings("unchecked") public void appAttemptFinished(RMAppAttempt appAttempt, RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) { - if (publishSystemMetricsToATSv1) { + if (publishSystemMetrics) { ContainerId container = (appAttempt.getMasterContainer() == null) ? 
null : appAttempt.getMasterContainer().getId(); dispatcher.getEventHandler().handle( @@ -214,7 +232,7 @@ public class SystemMetricsPublisher extends CompositeService { @SuppressWarnings("unchecked") public void containerCreated(RMContainer container, long createdTime) { - if (publishSystemMetricsToATSv1) { + if (publishContainerMetrics) { dispatcher.getEventHandler().handle( new ContainerCreatedEvent( container.getContainerId(), @@ -227,7 +245,7 @@ public class SystemMetricsPublisher extends CompositeService { @SuppressWarnings("unchecked") public void containerFinished(RMContainer container, long finishedTime) { - if (publishSystemMetricsToATSv1) { + if (publishContainerMetrics) { dispatcher.getEventHandler().handle( new ContainerFinishedEvent( container.getContainerId(), @@ -238,396 +256,31 @@ public class SystemMetricsPublisher extends CompositeService { } } - protected Dispatcher createDispatcher(Configuration conf) { - MultiThreadedDispatcher dispatcher = - new MultiThreadedDispatcher( - conf.getInt( - YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, - YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE)); - dispatcher.setDrainEventsOnStop(); + @VisibleForTesting + boolean isPublishContainerMetrics() { + return publishContainerMetrics; + } + + @VisibleForTesting + Dispatcher getDispatcher() { return dispatcher; } - protected void handleSystemMetricsEvent( - SystemMetricsEvent event) { - switch (event.getType()) { - case APP_CREATED: - publishApplicationCreatedEvent((ApplicationCreatedEvent) event); - break; - case APP_FINISHED: - publishApplicationFinishedEvent((ApplicationFinishedEvent) event); - break; - case APP_ACLS_UPDATED: - publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event); - break; - case APP_UPDATED: - publishApplicationUpdatedEvent((ApplicationUpdatedEvent) event); - break; - case APP_STATE_UPDATED: - publishApplicationStateUpdatedEvent( - (ApplicaitonStateUpdatedEvent)event); - break; - case APP_ATTEMPT_REGISTERED: - publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event); - break; - case APP_ATTEMPT_FINISHED: - publishAppAttemptFinishedEvent((AppAttemptFinishedEvent) event); - break; - case CONTAINER_CREATED: - publishContainerCreatedEvent((ContainerCreatedEvent) event); - break; - case CONTAINER_FINISHED: - publishContainerFinishedEvent((ContainerFinishedEvent) event); - break; - default: - LOG.error("Unknown SystemMetricsEvent type: " + event.getType()); - } + interface TimelineServicePublisher extends Service { + /** + * @return the Dispatcher which needs to be used to dispatch events + */ + Dispatcher getDispatcher(); + + /** + * @return true if RMContainerMetricsNeeds to be sent + */ + boolean publishRMContainerMetrics(); + + /** + * @return EventHandler which needs to be registered to the dispatcher to + * handle the SystemMetricsEvent + */ + EventHandler getEventHandler(); } - - private void publishApplicationCreatedEvent(ApplicationCreatedEvent event) { - TimelineEntity entity = - createApplicationEntity(event.getApplicationId()); - Map entityInfo = new HashMap(); - entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, - event.getApplicationName()); - entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO, - event.getApplicationType()); - entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, - event.getUser()); - entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, - event.getQueue()); - entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO, - 
event.getSubmittedTime()); - entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO, - event.getAppTags()); - entityInfo.put( - ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO, - event.isUnmanagedApp()); - entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, - event.getApplicationPriority().getPriority()); - entityInfo.put(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION, - event.getAppNodeLabelsExpression()); - entityInfo.put(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION, - event.getAmNodeLabelsExpression()); - if (event.getCallerContext() != null) { - if (event.getCallerContext().getContext() != null) { - entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_CONTEXT, - event.getCallerContext().getContext()); - } - if (event.getCallerContext().getSignature() != null) { - entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_SIGNATURE, - event.getCallerContext().getSignature()); - } - } - entity.setOtherInfo(entityInfo); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType( - ApplicationMetricsConstants.CREATED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - entity.addEvent(tEvent); - putEntity(entity); - } - - private void publishApplicationFinishedEvent(ApplicationFinishedEvent event) { - TimelineEntity entity = - createApplicationEntity(event.getApplicationId()); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType( - ApplicationMetricsConstants.FINISHED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - Map eventInfo = new HashMap(); - eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, - event.getDiagnosticsInfo()); - eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, - event.getFinalApplicationStatus().toString()); - eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, - event.getYarnApplicationState().toString()); - if (event.getLatestApplicationAttemptId() != null) { - eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO, - event.getLatestApplicationAttemptId().toString()); - } - RMAppMetrics appMetrics = event.getAppMetrics(); - entity.addOtherInfo(ApplicationMetricsConstants.APP_CPU_METRICS, - appMetrics.getVcoreSeconds()); - entity.addOtherInfo(ApplicationMetricsConstants.APP_MEM_METRICS, - appMetrics.getMemorySeconds()); - - tEvent.setEventInfo(eventInfo); - entity.addEvent(tEvent); - putEntity(entity); - } - - private void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event) { - TimelineEntity entity = createApplicationEntity(event.getApplicationId()); - Map eventInfo = new HashMap(); - eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, - event.getQueue()); - eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, event - .getApplicationPriority().getPriority()); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType(ApplicationMetricsConstants.UPDATED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - tEvent.setEventInfo(eventInfo); - entity.addEvent(tEvent); - putEntity(entity); - } - - private void publishApplicationStateUpdatedEvent( - ApplicaitonStateUpdatedEvent event) { - TimelineEntity entity = createApplicationEntity(event.getApplicationId()); - Map eventInfo = new HashMap(); - eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, - event.getAppState()); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType(ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - 
tEvent.setEventInfo(eventInfo); - entity.addEvent(tEvent); - putEntity(entity); - } - - private void publishApplicationACLsUpdatedEvent( - ApplicationACLsUpdatedEvent event) { - TimelineEntity entity = - createApplicationEntity(event.getApplicationId()); - TimelineEvent tEvent = new TimelineEvent(); - Map entityInfo = new HashMap(); - entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO, - event.getViewAppACLs()); - entity.setOtherInfo(entityInfo); - tEvent.setEventType( - ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - entity.addEvent(tEvent); - putEntity(entity); - } - - private static TimelineEntity createApplicationEntity( - ApplicationId applicationId) { - TimelineEntity entity = new TimelineEntity(); - entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE); - entity.setEntityId(applicationId.toString()); - return entity; - } - - private void - publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) { - TimelineEntity entity = - createAppAttemptEntity(event.getApplicationAttemptId()); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType( - AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - Map eventInfo = new HashMap(); - eventInfo.put( - AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, - event.getTrackingUrl()); - eventInfo.put( - AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, - event.getOriginalTrackingURL()); - eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, - event.getHost()); - eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO, - event.getRpcPort()); - if (event.getMasterContainerId() != null) { - eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, - event.getMasterContainerId().toString()); - } - tEvent.setEventInfo(eventInfo); - entity.addEvent(tEvent); - putEntity(entity); - } - - private void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) { - TimelineEntity entity = - createAppAttemptEntity(event.getApplicationAttemptId()); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - Map eventInfo = new HashMap(); - eventInfo.put( - AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, - event.getTrackingUrl()); - eventInfo.put( - AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, - event.getOriginalTrackingURL()); - eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, - event.getDiagnosticsInfo()); - eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, - event.getFinalApplicationStatus().toString()); - eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, - event.getYarnApplicationAttemptState().toString()); - if (event.getMasterContainerId() != null) { - eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, - event.getMasterContainerId().toString()); - } - tEvent.setEventInfo(eventInfo); - entity.addEvent(tEvent); - putEntity(entity); - } - - private static TimelineEntity createAppAttemptEntity( - ApplicationAttemptId appAttemptId) { - TimelineEntity entity = new TimelineEntity(); - entity.setEntityType( - AppAttemptMetricsConstants.ENTITY_TYPE); - entity.setEntityId(appAttemptId.toString()); - entity.addPrimaryFilter(AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER, - appAttemptId.getApplicationId().toString()); - return entity; - } - - private void publishContainerCreatedEvent(ContainerCreatedEvent 
event) { - TimelineEntity entity = createContainerEntity(event.getContainerId()); - Map entityInfo = new HashMap(); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, - event.getAllocatedResource().getMemorySize()); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, - event.getAllocatedResource().getVirtualCores()); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, - event.getAllocatedNode().getHost()); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, - event.getAllocatedNode().getPort()); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, - event.getAllocatedPriority().getPriority()); - entityInfo.put( - ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO, - event.getNodeHttpAddress()); - entity.setOtherInfo(entityInfo); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - entity.addEvent(tEvent); - putEntity(entity); - } - - private void publishContainerFinishedEvent(ContainerFinishedEvent event) { - TimelineEntity entity = createContainerEntity(event.getContainerId()); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - Map eventInfo = new HashMap(); - eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, - event.getDiagnosticsInfo()); - eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO, - event.getContainerExitStatus()); - eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, - event.getContainerState().toString()); - Map entityInfo = new HashMap(); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, - event.getAllocatedNode().getHost()); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, - event.getAllocatedNode().getPort()); - entity.setOtherInfo(entityInfo); - tEvent.setEventInfo(eventInfo); - entity.addEvent(tEvent); - putEntity(entity); - } - - private static TimelineEntity createContainerEntity( - ContainerId containerId) { - TimelineEntity entity = new TimelineEntity(); - entity.setEntityType( - ContainerMetricsConstants.ENTITY_TYPE); - entity.setEntityId(containerId.toString()); - entity.addPrimaryFilter(ContainerMetricsConstants.PARENT_PRIMARIY_FILTER, - containerId.getApplicationAttemptId().toString()); - return entity; - } - - @Private - @VisibleForTesting - public void putEntity(TimelineEntity entity) { - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Publishing the entity " + entity.getEntityId() + - ", JSON-style content: " + TimelineUtils.dumpTimelineRecordtoJSON(entity)); - } - TimelinePutResponse response = client.putEntities(entity); - List errors = response.getErrors(); - if (errors.size() == 0) { - LOG.debug("Timeline entities are successfully put"); - } else { - for (TimelinePutResponse.TimelinePutError error : errors) { - LOG.error( - "Error when publishing entity [" + error.getEntityType() + "," - + error.getEntityId() + "], server side error code: " - + error.getErrorCode()); - } - } - } catch (Exception e) { - LOG.error("Error when publishing entity [" + entity.getEntityType() + "," - + entity.getEntityId() + "]", e); - } - } - - /** - * EventHandler implementation which forward events to SystemMetricsPublisher. - * Making use of it, SystemMetricsPublisher can avoid to have a public handle - * method. 
- */ - private final class ForwardingEventHandler implements - EventHandler { - - @Override - public void handle(SystemMetricsEvent event) { - handleSystemMetricsEvent(event); - } - - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - protected static class MultiThreadedDispatcher extends CompositeService - implements Dispatcher { - - private List dispatchers = - new ArrayList(); - - public MultiThreadedDispatcher(int num) { - super(MultiThreadedDispatcher.class.getName()); - for (int i = 0; i < num; ++i) { - AsyncDispatcher dispatcher = createDispatcher(); - dispatchers.add(dispatcher); - addIfService(dispatcher); - } - } - - @Override - public EventHandler getEventHandler() { - return new CompositEventHandler(); - } - - @Override - public void register(Class eventType, EventHandler handler) { - for (AsyncDispatcher dispatcher : dispatchers) { - dispatcher.register(eventType, handler); - } - } - - public void setDrainEventsOnStop() { - for (AsyncDispatcher dispatcher : dispatchers) { - dispatcher.setDrainEventsOnStop(); - } - } - - private class CompositEventHandler implements EventHandler { - - @Override - public void handle(Event event) { - // Use hashCode (of ApplicationId) to dispatch the event to the child - // dispatcher, such that all the writing events of one application will - // be handled by one thread, the scheduled order of the these events - // will be preserved - int index = (event.hashCode() & Integer.MAX_VALUE) % dispatchers.size(); - dispatchers.get(index).getEventHandler().handle(event); - } - - } - - protected AsyncDispatcher createDispatcher() { - return new AsyncDispatcher(); - } - - } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java new file mode 100644 index 00000000000..6cada5561f9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java @@ -0,0 +1,329 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.metrics; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; + +public class TimelineServiceV1Publisher extends + AbstractTimelineServicePublisher { + + private static final Log LOG = LogFactory + .getLog(TimelineServiceV1Publisher.class); + + public TimelineServiceV1Publisher() { + super("TimelineserviceV1Publisher"); + } + + private TimelineClient client; + + @Override + public void serviceInit(Configuration conf) throws Exception { + client = TimelineClient.createTimelineClient(); + addIfService(client); + super.serviceInit(conf); + } + + @Override + void publishApplicationCreatedEvent(ApplicationCreatedEvent event) { + TimelineEntity entity = + createApplicationEntity(event.getApplicationId()); + Map entityInfo = new HashMap(); + entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, + event.getApplicationName()); + entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO, + event.getApplicationType()); + entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, + event.getUser()); + entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, + event.getQueue()); + entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO, + event.getSubmittedTime()); + entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO, + event.getAppTags()); + entityInfo.put( + ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO, + event.isUnmanagedApp()); + entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, + event.getApplicationPriority().getPriority()); + entityInfo.put(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION, + event.getAppNodeLabelsExpression()); + entityInfo.put(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION, + event.getAmNodeLabelsExpression()); + if (event.getCallerContext() != null) { + if (event.getCallerContext().getContext() != null) { + entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_CONTEXT, + event.getCallerContext().getContext()); + } + if (event.getCallerContext().getSignature() != null) { + entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_SIGNATURE, + event.getCallerContext().getSignature()); + } + } + entity.setOtherInfo(entityInfo); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType( + ApplicationMetricsConstants.CREATED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + entity.addEvent(tEvent); + putEntity(entity); + } + + @Override + void publishApplicationFinishedEvent(ApplicationFinishedEvent event) { + TimelineEntity entity = createApplicationEntity(event.getApplicationId()); + + TimelineEvent tEvent = new TimelineEvent(); + 
tEvent.setEventType(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + Map eventInfo = new HashMap(); + eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + event.getDiagnosticsInfo()); + eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, event + .getFinalApplicationStatus().toString()); + eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, event + .getYarnApplicationState().toString()); + if (event.getLatestApplicationAttemptId() != null) { + eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO, + event.getLatestApplicationAttemptId().toString()); + } + RMAppMetrics appMetrics = event.getAppMetrics(); + entity.addOtherInfo(ApplicationMetricsConstants.APP_CPU_METRICS, + appMetrics.getVcoreSeconds()); + entity.addOtherInfo(ApplicationMetricsConstants.APP_MEM_METRICS, + appMetrics.getMemorySeconds()); + tEvent.setEventInfo(eventInfo); + + entity.addEvent(tEvent); + putEntity(entity); + } + + @Override + void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event) { + TimelineEntity entity = createApplicationEntity(event.getApplicationId()); + Map eventInfo = new HashMap(); + eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, + event.getQueue()); + eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, event + .getApplicationPriority().getPriority()); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(ApplicationMetricsConstants.UPDATED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + tEvent.setEventInfo(eventInfo); + entity.addEvent(tEvent); + putEntity(entity); + } + + @Override + void publishApplicationStateUpdatedEvent( + ApplicaitonStateUpdatedEvent event) { + TimelineEntity entity = createApplicationEntity(event.getApplicationId()); + Map eventInfo = new HashMap(); + eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, + event.getAppState()); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + tEvent.setEventInfo(eventInfo); + entity.addEvent(tEvent); + putEntity(entity); + } + + @Override + void publishApplicationACLsUpdatedEvent(ApplicationACLsUpdatedEvent event) { + TimelineEntity entity = createApplicationEntity(event.getApplicationId()); + + TimelineEvent tEvent = new TimelineEvent(); + Map entityInfo = new HashMap(); + entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO, + event.getViewAppACLs()); + entity.setOtherInfo(entityInfo); + tEvent.setEventType(ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + + entity.addEvent(tEvent); + putEntity(entity); + } + + private static TimelineEntity createApplicationEntity( + ApplicationId applicationId) { + TimelineEntity entity = new TimelineEntity(); + entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE); + entity.setEntityId(applicationId.toString()); + return entity; + } + + @Override + void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) { + TimelineEntity entity = + createAppAttemptEntity(event.getApplicationAttemptId()); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType( + AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + Map eventInfo = new HashMap(); + eventInfo.put( + AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, + event.getTrackingUrl()); + eventInfo.put( + 
AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, + event.getOriginalTrackingURL()); + eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, + event.getHost()); + eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO, + event.getRpcPort()); + if (event.getMasterContainerId() != null) { + eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, + event.getMasterContainerId().toString()); + } + tEvent.setEventInfo(eventInfo); + entity.addEvent(tEvent); + putEntity(entity); + } + + @Override + void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) { + TimelineEntity entity = + createAppAttemptEntity(event.getApplicationAttemptId()); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + Map eventInfo = new HashMap(); + eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, + event.getTrackingUrl()); + eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, + event.getOriginalTrackingURL()); + eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + event.getDiagnosticsInfo()); + eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, event + .getFinalApplicationStatus().toString()); + eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, event + .getYarnApplicationAttemptState().toString()); + if (event.getMasterContainerId() != null) { + eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, + event.getMasterContainerId().toString()); + } + tEvent.setEventInfo(eventInfo); + + entity.addEvent(tEvent); + putEntity(entity); + } + + private static TimelineEntity createAppAttemptEntity( + ApplicationAttemptId appAttemptId) { + TimelineEntity entity = new TimelineEntity(); + entity.setEntityType(AppAttemptMetricsConstants.ENTITY_TYPE); + entity.setEntityId(appAttemptId.toString()); + entity.addPrimaryFilter(AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER, + appAttemptId.getApplicationId().toString()); + return entity; + } + + @Override + void publishContainerCreatedEvent(ContainerCreatedEvent event) { + TimelineEntity entity = createContainerEntity(event.getContainerId()); + Map entityInfo = new HashMap(); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, + event.getAllocatedResource().getMemorySize()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, event + .getAllocatedResource().getVirtualCores()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, event + .getAllocatedNode().getHost()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, event + .getAllocatedNode().getPort()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, + event.getAllocatedPriority().getPriority()); + entityInfo.put( + ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO, + event.getNodeHttpAddress()); + entity.setOtherInfo(entityInfo); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + + entity.addEvent(tEvent); + putEntity(entity); + } + + @Override + void publishContainerFinishedEvent(ContainerFinishedEvent event) { + TimelineEntity entity = createContainerEntity(event.getContainerId()); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + Map 
eventInfo = new HashMap(); + eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + event.getDiagnosticsInfo()); + eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO, + event.getContainerExitStatus()); + eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, event + .getContainerState().toString()); + Map entityInfo = new HashMap(); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, + event.getAllocatedNode().getHost()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, + event.getAllocatedNode().getPort()); + entity.setOtherInfo(entityInfo); + tEvent.setEventInfo(eventInfo); + + entity.addEvent(tEvent); + putEntity(entity); + } + + private static TimelineEntity createContainerEntity(ContainerId containerId) { + TimelineEntity entity = new TimelineEntity(); + entity.setEntityType(ContainerMetricsConstants.ENTITY_TYPE); + entity.setEntityId(containerId.toString()); + entity.addPrimaryFilter(ContainerMetricsConstants.PARENT_PRIMARIY_FILTER, + containerId.getApplicationAttemptId().toString()); + return entity; + } + + private void putEntity(TimelineEntity entity) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Publishing the entity " + entity.getEntityId() + + ", JSON-style content: " + + TimelineUtils.dumpTimelineRecordtoJSON(entity)); + } + client.putEntities(entity); + } catch (Exception e) { + LOG.error("Error when publishing entity [" + entity.getEntityType() + "," + + entity.getEntityId() + "]", e); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java new file mode 100644 index 00000000000..bbd32f325c0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java @@ -0,0 +1,362 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationAttemptEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+
+/**
+ * This class is responsible for posting application, application attempt and
+ * container lifecycle related events to the timeline service v2.
+ */
+@Private
+@Unstable
+public class TimelineServiceV2Publisher extends
+    AbstractTimelineServicePublisher {
+  private static final Log LOG = LogFactory
+      .getLog(TimelineServiceV2Publisher.class);
+  protected RMTimelineCollectorManager rmTimelineCollectorManager;
+
+  public TimelineServiceV2Publisher(RMContext rmContext) {
+    super("TimelineserviceV2Publisher");
+    rmTimelineCollectorManager = rmContext.getRMTimelineCollectorManager();
+  }
+
+  private boolean publishContainerMetrics;
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    publishContainerMetrics =
+        conf.getBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED,
+            YarnConfiguration.DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  void publishApplicationCreatedEvent(ApplicationCreatedEvent event) {
+    TimelineEntity entity =
+        createApplicationEntity(event.getApplicationId());
+    Map<String, Object> entityInfo = new HashMap<String, Object>();
+    entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO,
+        event.getApplicationName());
+    entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
+        event.getApplicationType());
+    entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO,
+        event.getUser());
+    entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
+        event.getQueue());
+
entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO, + event.getSubmittedTime()); + entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO, + event.getAppTags()); + entityInfo.put( + ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO, + event.isUnmanagedApp()); + entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, + event.getApplicationPriority().getPriority()); + entityInfo.put(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION, + event.getAppNodeLabelsExpression()); + entityInfo.put(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION, + event.getAmNodeLabelsExpression()); + if (event.getCallerContext() != null) { + if (event.getCallerContext().getContext() != null) { + entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_CONTEXT, + event.getCallerContext().getContext()); + } + if (event.getCallerContext().getSignature() != null) { + entityInfo.put(ApplicationMetricsConstants.YARN_APP_CALLER_SIGNATURE, + event.getCallerContext().getSignature()); + } + } + entity.setInfo(entityInfo); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + entity.addEvent(tEvent); + + putEntity(entity, event.getApplicationId()); + } + + @Override + void publishApplicationFinishedEvent(ApplicationFinishedEvent event) { + ApplicationEntity entity = + createApplicationEntity(event.getApplicationId()); + RMAppMetrics appMetrics = event.getAppMetrics(); + entity.addInfo(ApplicationMetricsConstants.APP_CPU_METRICS, + appMetrics.getVcoreSeconds()); + entity.addInfo(ApplicationMetricsConstants.APP_MEM_METRICS, + appMetrics.getMemorySeconds()); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + Map eventInfo = new HashMap(); + eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + event.getDiagnosticsInfo()); + eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, event + .getFinalApplicationStatus().toString()); + eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, event + .getYarnApplicationState().toString()); + if (event.getLatestApplicationAttemptId() != null) { + eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO, + event.getLatestApplicationAttemptId().toString()); + } + tEvent.setInfo(eventInfo); + + entity.addEvent(tEvent); + putEntity(entity, event.getApplicationId()); + + //cleaning up the collector cached + event.getApp().stopTimelineCollector(); + } + + @Override + void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event) { + ApplicationEntity entity = + createApplicationEntity(event.getApplicationId()); + Map eventInfo = new HashMap(); + eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, + event.getQueue()); + eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, event + .getApplicationPriority().getPriority()); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ApplicationMetricsConstants.UPDATED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + tEvent.setInfo(eventInfo); + entity.addEvent(tEvent); + putEntity(entity, event.getApplicationId()); + } + + @Override + void publishApplicationStateUpdatedEvent( + ApplicaitonStateUpdatedEvent event) { + ApplicationEntity entity = + createApplicationEntity(event.getApplicationId()); + Map eventInfo = new HashMap(); + eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, + 
event.getAppState()); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ApplicationMetricsConstants.STATE_UPDATED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + tEvent.setInfo(eventInfo); + entity.addEvent(tEvent); + putEntity(entity, event.getApplicationId()); + } + + @Override + void publishApplicationACLsUpdatedEvent(ApplicationACLsUpdatedEvent event) { + ApplicationEntity entity = + createApplicationEntity(event.getApplicationId()); + Map entityInfo = new HashMap(); + entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO, + event.getViewAppACLs()); + entity.setInfo(entityInfo); + + putEntity(entity, event.getApplicationId()); + } + + private static ApplicationEntity createApplicationEntity( + ApplicationId applicationId) { + ApplicationEntity entity = new ApplicationEntity(); + entity.setId(applicationId.toString()); + return entity; + } + + @Override + void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) { + TimelineEntity entity = + createAppAttemptEntity(event.getApplicationAttemptId()); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + Map eventInfo = new HashMap(); + eventInfo.put( + AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, + event.getTrackingUrl()); + eventInfo.put( + AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, + event.getOriginalTrackingURL()); + eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, + event.getHost()); + eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO, + event.getRpcPort()); + if (event.getMasterContainerId() != null) { + eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, + event.getMasterContainerId().toString()); + } + tEvent.setInfo(eventInfo); + entity.addEvent(tEvent); + putEntity(entity, event.getApplicationAttemptId().getApplicationId()); + } + + @Override + void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) { + ApplicationAttemptEntity entity = + createAppAttemptEntity(event.getApplicationAttemptId()); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + Map eventInfo = new HashMap(); + eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, + event.getTrackingUrl()); + eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, + event.getOriginalTrackingURL()); + eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + event.getDiagnosticsInfo()); + eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, event + .getFinalApplicationStatus().toString()); + eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, event + .getYarnApplicationAttemptState().toString()); + if (event.getMasterContainerId() != null) { + eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, + event.getMasterContainerId().toString()); + } + tEvent.setInfo(eventInfo); + + entity.addEvent(tEvent); + putEntity(entity, event.getApplicationAttemptId().getApplicationId()); + } + + @Override + void publishContainerCreatedEvent(ContainerCreatedEvent event) { + TimelineEntity entity = createContainerEntity(event.getContainerId()); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + // updated as event info instead of entity info, as entity info is updated + // by NM 
+ Map eventInfo = new HashMap(); + eventInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, event + .getAllocatedResource().getMemorySize()); + eventInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, event + .getAllocatedResource().getVirtualCores()); + eventInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, event + .getAllocatedNode().getHost()); + eventInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, event + .getAllocatedNode().getPort()); + eventInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, + event.getAllocatedPriority().getPriority()); + eventInfo.put( + ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO, + event.getNodeHttpAddress()); + tEvent.setInfo(eventInfo); + + entity.addEvent(tEvent); + putEntity(entity, event.getContainerId().getApplicationAttemptId() + .getApplicationId()); + } + + @Override + void publishContainerFinishedEvent(ContainerFinishedEvent event) { + TimelineEntity entity = createContainerEntity(event.getContainerId()); + + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ContainerMetricsConstants.FINISHED_IN_RM_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + Map eventInfo = new HashMap(); + eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + event.getDiagnosticsInfo()); + eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO, + event.getContainerExitStatus()); + eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, event + .getContainerState().toString()); + Map entityInfo = new HashMap(); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, + event.getAllocatedNode().getHost()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, + event.getAllocatedNode().getPort()); + entity.setInfo(entityInfo); + tEvent.setInfo(eventInfo); + + entity.addEvent(tEvent); + putEntity(entity, event.getContainerId().getApplicationAttemptId() + .getApplicationId()); + } + + private static ContainerEntity createContainerEntity(ContainerId containerId) { + ContainerEntity entity = new ContainerEntity(); + entity.setId(containerId.toString()); + entity.setParent(new Identifier(TimelineEntityType.YARN_APPLICATION_ATTEMPT + .name(), containerId.getApplicationAttemptId().toString())); + return entity; + } + + private void putEntity(TimelineEntity entity, ApplicationId appId) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Publishing the entity " + entity + ", JSON-style content: " + + TimelineUtils.dumpTimelineRecordtoJSON(entity)); + } + TimelineCollector timelineCollector = + rmTimelineCollectorManager.get(appId); + TimelineEntities entities = new TimelineEntities(); + entities.addEntity(entity); + timelineCollector.putEntities(entities, + UserGroupInformation.getCurrentUser()); + } catch (Exception e) { + LOG.error("Error when publishing entity " + entity, e); + } + } + + private static ApplicationAttemptEntity createAppAttemptEntity( + ApplicationAttemptId appAttemptId) { + ApplicationAttemptEntity entity = new ApplicationAttemptEntity(); + entity.setId(appAttemptId.toString()); + entity.setParent(new Identifier(TimelineEntityType.YARN_APPLICATION.name(), + appAttemptId.getApplicationId().toString())); + return entity; + } + + @Override + public boolean publishRMContainerMetrics() { + return publishContainerMetrics; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index efbf0e05576..19b2aeb9c4a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -1379,8 +1379,6 @@ public class RMAppImpl implements RMApp, Recoverable { .applicationFinished(app, finalState); app.rmContext.getSystemMetricsPublisher() .appFinished(app, finalState, app.finishTime); - - app.stopTimelineCollector(); }; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java index ef44bad8156..7dafc220979 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java @@ -90,7 +90,7 @@ public class TestRMAppLogAggregationStatus { rmContext = new RMContextImpl(rmDispatcher, null, null, null, null, null, null, null, null); - rmContext.setSystemMetricsPublisher(new SystemMetricsPublisher()); + rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class)); rmContext.setRMApplicationHistoryWriter(mock(RMApplicationHistoryWriter.class)); rmContext diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java index 97f5486f0cb..31a93097b30 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistor import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; @@ -75,7 +76,7 @@ public class TestSystemMetricsPublisher { public static void setup() throws 
Exception { YarnConfiguration conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); - conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true); + conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, true); conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE, MemoryTimelineStore.class, TimelineStore.class); conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS, @@ -89,7 +90,7 @@ public class TestSystemMetricsPublisher { timelineServer.start(); store = timelineServer.getTimelineStore(); - metricsPublisher = new SystemMetricsPublisher(); + metricsPublisher = new SystemMetricsPublisher(mock(RMContext.class)); metricsPublisher.init(conf); metricsPublisher.start(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java new file mode 100644 index 00000000000..9830a807a6c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java @@ -0,0 +1,374 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.metrics;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.AbstractTimelineServicePublisher.MultiThreadedDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
+import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestSystemMetricsPublisherForV2 {
+
+  /**
+   * The folder into which the FileSystemTimelineWriterImpl writes the
+   * entities.
+   */
+  protected static File testRootDir = new File("target",
+      TestSystemMetricsPublisherForV2.class.getName() + "-localDir")
+      .getAbsoluteFile();
+
+  private static SystemMetricsPublisher metricsPublisher;
+  private static DrainDispatcher dispatcher = new DrainDispatcher();
+  private static final String DEFAULT_FLOW_VERSION = "1";
+  private static final long DEFAULT_FLOW_RUN = 1;
+
+  private static ConcurrentMap<ApplicationId, RMApp> rmAppsMapInContext;
+
+  private static RMTimelineCollectorManager rmTimelineCollectorManager;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    if (testRootDir.exists()) {
+      // clean up beforehand
+      FileContext.getLocalFSFileContext().delete(
+          new Path(testRootDir.getAbsolutePath()), true);
+    }
+
+    RMContext rmContext = mock(RMContext.class);
+    rmAppsMapInContext = new ConcurrentHashMap<ApplicationId, RMApp>();
+
when(rmContext.getRMApps()).thenReturn(rmAppsMapInContext); + rmTimelineCollectorManager = new RMTimelineCollectorManager(rmContext); + when(rmContext.getRMTimelineCollectorManager()).thenReturn( + rmTimelineCollectorManager); + + Configuration conf = getTimelineV2Conf(); + rmTimelineCollectorManager.init(conf); + rmTimelineCollectorManager.start(); + + metricsPublisher = new SystemMetricsPublisher(rmContext) { + @Override + Dispatcher createDispatcher( + TimelineServicePublisher timelineServicePublisher) { + return dispatcher; + } + }; + metricsPublisher.init(conf); + metricsPublisher.start(); + } + + @AfterClass + public static void tearDown() throws Exception { + if (testRootDir.exists()) { + FileContext.getLocalFSFileContext().delete( + new Path(testRootDir.getAbsolutePath()), true); + } + if (rmTimelineCollectorManager != null) { + rmTimelineCollectorManager.stop(); + } + if (metricsPublisher != null) { + metricsPublisher.stop(); + } + } + + private static Configuration getTimelineV2Conf() { + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true); + conf.setInt( + YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, 2); + conf.setBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED, + true); + try { + conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT, + testRootDir.getCanonicalPath()); + } catch (IOException e) { + e.printStackTrace(); + Assert + .fail("Exception while setting the TIMELINE_SERVICE_STORAGE_DIR_ROOT "); + } + return conf; + } + + @Test + public void testSystemMetricPublisherInitialization() { + @SuppressWarnings("resource") + SystemMetricsPublisher metricsPublisher = + new SystemMetricsPublisher(mock(RMContext.class)); + try { + Configuration conf = getTimelineV2Conf(); + conf.setBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED, + false); + metricsPublisher.init(conf); + assertFalse( + "Default configuration should not publish container Metrics from RM", + metricsPublisher.isPublishContainerMetrics()); + + metricsPublisher.stop(); + + metricsPublisher = new SystemMetricsPublisher(mock(RMContext.class)); + conf = getTimelineV2Conf(); + metricsPublisher.init(conf); + assertTrue("Expected to publish container Metrics from RM", + metricsPublisher.isPublishContainerMetrics()); + assertTrue( + "MultiThreadedDispatcher expected when container Metrics is not published", + metricsPublisher.getDispatcher() instanceof MultiThreadedDispatcher); + } finally { + metricsPublisher.stop(); + } + } + + @Test(timeout = 1000000) + public void testPublishApplicationMetrics() throws Exception { + ApplicationId appId = ApplicationId.newInstance(0, 1); + RMApp app = createAppAndRegister(appId); + + metricsPublisher.appCreated(app, app.getStartTime()); + metricsPublisher.appACLsUpdated(app, "user1,user2", 4L); + metricsPublisher.appFinished(app, RMAppState.FINISHED, app.getFinishTime()); + dispatcher.await(); + + String outputDirApp = + getTimelineEntityDir(app) + "/" + TimelineEntityType.YARN_APPLICATION + + "/"; + + File entityFolder = new File(outputDirApp); + Assert.assertTrue(entityFolder.isDirectory()); + + // file name is .thist + String timelineServiceFileName = + appId.toString() + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + File appFile = new File(outputDirApp, timelineServiceFileName); + Assert.assertTrue(appFile.exists()); + Assert.assertEquals("Expected 3 events to 
be published", 3, + getNumOfNonEmptyLines(appFile)); + } + + @Test(timeout = 10000) + public void testPublishAppAttemptMetrics() throws Exception { + ApplicationId appId = ApplicationId.newInstance(0, 1); + RMApp app = rmAppsMapInContext.get(appId); + if (app == null) { + app = createAppAndRegister(appId); + } + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + RMAppAttempt appAttempt = createRMAppAttempt(appAttemptId); + metricsPublisher.appAttemptRegistered(appAttempt, Integer.MAX_VALUE + 1L); + when(app.getFinalApplicationStatus()).thenReturn( + FinalApplicationStatus.UNDEFINED); + metricsPublisher.appAttemptFinished(appAttempt, RMAppAttemptState.FINISHED, + app, Integer.MAX_VALUE + 2L); + + dispatcher.await(); + + String outputDirApp = + getTimelineEntityDir(app) + "/" + + TimelineEntityType.YARN_APPLICATION_ATTEMPT + "/"; + + File entityFolder = new File(outputDirApp); + Assert.assertTrue(entityFolder.isDirectory()); + + // file name is .thist + String timelineServiceFileName = + appAttemptId.toString() + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + File appFile = new File(outputDirApp, timelineServiceFileName); + Assert.assertTrue(appFile.exists()); + Assert.assertEquals("Expected 2 events to be published", 2, + getNumOfNonEmptyLines(appFile)); + } + + @Test(timeout = 10000) + public void testPublishContainerMetrics() throws Exception { + ApplicationId appId = ApplicationId.newInstance(0, 1); + RMApp app = rmAppsMapInContext.get(appId); + if (app == null) { + app = createAppAndRegister(appId); + } + ContainerId containerId = + ContainerId.newContainerId(ApplicationAttemptId.newInstance( + appId, 1), 1); + RMContainer container = createRMContainer(containerId); + metricsPublisher.containerCreated(container, container.getCreationTime()); + metricsPublisher.containerFinished(container, container.getFinishTime()); + dispatcher.await(); + + String outputDirApp = + getTimelineEntityDir(app) + "/" + + TimelineEntityType.YARN_CONTAINER + "/"; + + File entityFolder = new File(outputDirApp); + Assert.assertTrue(entityFolder.isDirectory()); + + // file name is .thist + String timelineServiceFileName = + containerId.toString() + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + File appFile = new File(outputDirApp, timelineServiceFileName); + Assert.assertTrue(appFile.exists()); + Assert.assertEquals("Expected 2 events to be published", 2, + getNumOfNonEmptyLines(appFile)); + } + + private RMApp createAppAndRegister(ApplicationId appId) { + RMApp app = createRMApp(appId); + + // some stuff which are currently taken care in RMAppImpl + rmAppsMapInContext.putIfAbsent(appId, app); + AppLevelTimelineCollector collector = new AppLevelTimelineCollector(appId); + rmTimelineCollectorManager.putIfAbsent(appId, collector); + return app; + } + + private long getNumOfNonEmptyLines(File entityFile) throws IOException { + BufferedReader reader = null; + String strLine; + long count = 0; + try { + reader = new BufferedReader(new FileReader(entityFile)); + while ((strLine = reader.readLine()) != null) { + if (strLine.trim().length() > 0) + count++; + } + } finally { + reader.close(); + } + return count; + } + + private String getTimelineEntityDir(RMApp app) { + String outputDirApp = + testRootDir.getAbsolutePath()+"/" + + FileSystemTimelineWriterImpl.ENTITIES_DIR + + "/" + + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + + "/" + + app.getUser() + + "/" + + TimelineUtils.generateDefaultFlowIdBasedOnAppId(app + .getApplicationId()) + "/" + 
DEFAULT_FLOW_VERSION + "/" + + DEFAULT_FLOW_RUN + "/" + app.getApplicationId(); + return outputDirApp; + } + + private static RMApp createRMApp(ApplicationId appId) { + RMApp app = mock(RMAppImpl.class); + when(app.getApplicationId()).thenReturn(appId); + when(app.getName()).thenReturn("test app"); + when(app.getApplicationType()).thenReturn("test app type"); + when(app.getUser()).thenReturn("testUser"); + when(app.getQueue()).thenReturn("test queue"); + when(app.getSubmitTime()).thenReturn(Integer.MAX_VALUE + 1L); + when(app.getStartTime()).thenReturn(Integer.MAX_VALUE + 2L); + when(app.getFinishTime()).thenReturn(Integer.MAX_VALUE + 3L); + when(app.getDiagnostics()).thenReturn( + new StringBuilder("test diagnostics info")); + RMAppAttempt appAttempt = mock(RMAppAttempt.class); + when(appAttempt.getAppAttemptId()).thenReturn( + ApplicationAttemptId.newInstance(appId, 1)); + when(app.getCurrentAppAttempt()).thenReturn(appAttempt); + when(app.getFinalApplicationStatus()).thenReturn( + FinalApplicationStatus.UNDEFINED); + when(app.getRMAppMetrics()).thenReturn( + new RMAppMetrics(null, 0, 0, Integer.MAX_VALUE, Long.MAX_VALUE)); + when(app.getApplicationTags()).thenReturn(Collections. emptySet()); + return app; + } + + private static RMAppAttempt createRMAppAttempt( + ApplicationAttemptId appAttemptId) { + RMAppAttempt appAttempt = mock(RMAppAttempt.class); + when(appAttempt.getAppAttemptId()).thenReturn(appAttemptId); + when(appAttempt.getHost()).thenReturn("test host"); + when(appAttempt.getRpcPort()).thenReturn(-100); + Container container = mock(Container.class); + when(container.getId()).thenReturn( + ContainerId.newContainerId(appAttemptId, 1)); + when(appAttempt.getMasterContainer()).thenReturn(container); + when(appAttempt.getDiagnostics()).thenReturn("test diagnostics info"); + when(appAttempt.getTrackingUrl()).thenReturn("test tracking url"); + when(appAttempt.getOriginalTrackingUrl()).thenReturn( + "test original tracking url"); + return appAttempt; + } + + private static RMContainer createRMContainer(ContainerId containerId) { + RMContainer container = mock(RMContainer.class); + when(container.getContainerId()).thenReturn(containerId); + when(container.getAllocatedNode()).thenReturn( + NodeId.newInstance("test host", -100)); + when(container.getAllocatedResource()).thenReturn( + Resource.newInstance(-1, -1)); + when(container.getAllocatedPriority()).thenReturn(Priority.UNDEFINED); + when(container.getCreationTime()).thenReturn(Integer.MAX_VALUE + 1L); + when(container.getFinishTime()).thenReturn(Integer.MAX_VALUE + 2L); + when(container.getDiagnosticsInfo()).thenReturn("test diagnostics info"); + when(container.getContainerExitStatus()).thenReturn(-1); + when(container.getContainerState()).thenReturn(ContainerState.COMPLETE); + Container mockContainer = mock(Container.class); + when(container.getContainer()).thenReturn(mockContainer); + when(mockContainer.getNodeHttpAddress()) + .thenReturn("http://localhost:1234"); + return container; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml index 79562f719b0..a878fa6b8e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml @@ -124,6 +124,12 @@ org.apache.phoenix phoenix-core + + + joda-time + joda-time + + org.apache.hbase diff 
--git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java index ee1515dfced..34a6b7c2f91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java @@ -55,7 +55,7 @@ public class FileSystemTimelineWriterImpl extends AbstractService public static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT = "/tmp/timeline_service_data"; - private static final String ENTITIES_DIR = "entities"; + public static final String ENTITIES_DIR = "entities"; /** Default extension for output files */ public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist"; @@ -76,7 +76,7 @@ public class FileSystemTimelineWriterImpl extends AbstractService return response; } - private void write(String clusterId, String userId, String flowName, + private synchronized void write(String clusterId, String userId, String flowName, String flowVersion, long flowRun, String appId, TimelineEntity entity, TimelineWriteResponse response) throws IOException { PrintWriter out = null;
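Illustrative note: the snippet below is a minimal sketch of the configuration the new v2 publishing path reacts to, mirroring what getTimelineV2Conf() in TestSystemMetricsPublisherForV2 sets. The class name TimelineV2PublisherConfSketch is hypothetical, the dispatcher pool size of 2 simply copies the test, and the storage root shown is the file-system writer's default; it assumes FileSystemTimelineWriterImpl is the v2 backend in use.

import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;

// Hypothetical helper, for illustration only: builds a configuration with the
// knobs read by SystemMetricsPublisher/TimelineServiceV2Publisher in this patch.
public class TimelineV2PublisherConfSketch {
  public static YarnConfiguration newConf() {
    YarnConfiguration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
    conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
    // RM-side container events are off by default; opt in explicitly.
    conf.setBoolean(
        YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED, true);
    conf.setInt(
        YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, 2);
    // Assumes the file-system writer backend; this is its default root.
    conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
        "/tmp/timeline_service_data");
    return conf;
  }
}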