From 8985fccbbcc2a9769fdf890b06e083fc263e4cd8 Mon Sep 17 00:00:00 2001 From: Rohith Sharma K S Date: Tue, 16 Jan 2018 07:58:29 +0530 Subject: [PATCH] YARN-6736. Consider writing to both ats v1 & v2 from RM for smoother upgrades. Contributed by Aaron Gresch. (cherry picked from commit d09058b2fd18803d12f0835fdf78aef5e0b99c90) --- .../hadoop/yarn/conf/YarnConfiguration.java | 61 ++- .../distributedshell/ApplicationMaster.java | 48 +- .../distributedshell/TestDSAppMaster.java | 86 +++- .../yarn/client/api/impl/YarnClientImpl.java | 8 +- .../client/api/impl/TimelineClientImpl.java | 6 +- .../client/api/impl/TimelineV2ClientImpl.java | 3 +- .../yarn/util/timeline/TimelineUtils.java | 3 +- .../resourcemanager/ResourceManager.java | 46 +- .../CombinedSystemMetricsPublisher.java | 108 ++++ .../TestRMTimelineService.java | 122 +++++ .../TestCombinedSystemMetricsPublisher.java | 480 ++++++++++++++++++ .../hadoop/yarn/server/MiniYARNCluster.java | 15 +- 12 files changed, 917 insertions(+), 69 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/CombinedSystemMetricsPublisher.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMTimelineService.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestCombinedSystemMetricsPublisher.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 77fdf184da9..7c25be3db67 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -19,7 +19,9 @@ package org.apache.hadoop.yarn.conf; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; @@ -2068,6 +2070,9 @@ public class YarnConfiguration extends Configuration { + "version"; public static final float DEFAULT_TIMELINE_SERVICE_VERSION = 1.0f; + public static final String TIMELINE_SERVICE_VERSIONS = + TIMELINE_SERVICE_PREFIX + "versions"; + /** * Comma seperated list of names for UIs hosted in the timeline server * (For pluggable UIs). @@ -3444,8 +3449,60 @@ public class YarnConfiguration extends Configuration { * version greater than equal to 2 but smaller than 3. */ public static boolean timelineServiceV2Enabled(Configuration conf) { - return timelineServiceEnabled(conf) && - (int)getTimelineServiceVersion(conf) == 2; + boolean enabled = false; + if (timelineServiceEnabled(conf)) { + Collection versions = getTimelineServiceVersions(conf); + for (Float version : versions) { + if (version.intValue() == 2) { + enabled = true; + break; + } + } + } + return enabled; + } + + /** + * Returns whether the timeline service v.1 is enabled via configuration. + * + * @param conf the configuration + * @return whether the timeline service v.1 is enabled. V.1 refers to a + * version greater than equal to 1 but smaller than 2. + */ + public static boolean timelineServiceV1Enabled(Configuration conf) { + boolean enabled = false; + if (timelineServiceEnabled(conf)) { + Collection versions = getTimelineServiceVersions(conf); + for (Float version : versions) { + if (version.intValue() == 1) { + enabled = true; + break; + } + } + } + return enabled; + } + + /** + * Returns all the active timeline service versions. It does not check + * whether the timeline service itself is enabled. + * + * @param conf the configuration + * @return the timeline service versions as a collection of floats. + */ + private static Collection getTimelineServiceVersions( + Configuration conf) { + String versions = conf.get(TIMELINE_SERVICE_VERSIONS); + if (versions == null) { + versions = Float.toString(getTimelineServiceVersion(conf)); + } + List stringList = Arrays.asList(versions.split(",")); + List floatList = new ArrayList(); + for (String s : stringList) { + Float f = Float.parseFloat(s); + floatList.add(f); + } + return floatList; } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 3de87c96aca..2a5d89d6d90 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -300,7 +300,8 @@ public class ApplicationMaster { TimelineClient timelineClient; // Timeline v2 Client - private TimelineV2Client timelineV2Client; + @VisibleForTesting + TimelineV2Client timelineV2Client; static final String CONTAINER_ENTITY_GROUP_ID = "CONTAINERS"; static final String APPID_TIMELINE_FILTER_NAME = "appId"; @@ -575,11 +576,7 @@ public class ApplicationMaster { containrRetryInterval = Integer.parseInt(cliParser.getOptionValue( "container_retry_interval", "0")); - if (YarnConfiguration.timelineServiceEnabled(conf)) { - timelineServiceV2Enabled = - ((int) YarnConfiguration.getTimelineServiceVersion(conf) == 2); - timelineServiceV1Enabled = !timelineServiceV2Enabled; - } else { + if (!YarnConfiguration.timelineServiceEnabled(conf)) { timelineClient = null; timelineV2Client = null; LOG.warn("Timeline service is not enabled"); @@ -647,12 +644,11 @@ public class ApplicationMaster { if (timelineServiceV2Enabled) { // need to bind timelineClient amRMClient.registerTimelineV2Client(timelineV2Client); - } - - if (timelineServiceV2Enabled) { publishApplicationAttemptEventOnTimelineServiceV2( DSEvent.DS_APP_ATTEMPT_START); - } else if (timelineServiceV1Enabled) { + } + + if (timelineServiceV1Enabled) { publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); } @@ -725,18 +721,23 @@ public class ApplicationMaster { @Override public Void run() throws Exception { if (YarnConfiguration.timelineServiceEnabled(conf)) { + timelineServiceV1Enabled = + YarnConfiguration.timelineServiceV1Enabled(conf); + timelineServiceV2Enabled = + YarnConfiguration.timelineServiceV2Enabled(conf); // Creating the Timeline Client + if (timelineServiceV1Enabled) { + timelineClient = TimelineClient.createTimelineClient(); + timelineClient.init(conf); + timelineClient.start(); + LOG.info("Timeline service V1 client is enabled"); + } if (timelineServiceV2Enabled) { timelineV2Client = TimelineV2Client.createTimelineClient( appAttemptID.getApplicationId()); timelineV2Client.init(conf); timelineV2Client.start(); LOG.info("Timeline service V2 client is enabled"); - } else { - timelineClient = TimelineClient.createTimelineClient(); - timelineClient.init(conf); - timelineClient.start(); - LOG.info("Timeline service V1 client is enabled"); } } else { timelineClient = null; @@ -766,12 +767,14 @@ public class ApplicationMaster { } catch (InterruptedException ex) {} } + if (timelineServiceV1Enabled) { + publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), + DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi); + } + if (timelineServiceV2Enabled) { publishApplicationAttemptEventOnTimelineServiceV2( DSEvent.DS_APP_ATTEMPT_END); - } else if (timelineServiceV1Enabled) { - publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), - DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi); } // Join all launched threads @@ -822,7 +825,8 @@ public class ApplicationMaster { // Stop Timeline Client if(timelineServiceV1Enabled) { timelineClient.stop(); - } else if (timelineServiceV2Enabled) { + } + if (timelineServiceV2Enabled) { timelineV2Client.stop(); } @@ -888,7 +892,8 @@ public class ApplicationMaster { } publishContainerEndEventOnTimelineServiceV2(containerStatus, containerStartTime); - } else if (timelineServiceV1Enabled) { + } + if (timelineServiceV1Enabled) { publishContainerEndEvent(timelineClient, containerStatus, domainId, appSubmitterUgi); } @@ -1019,7 +1024,8 @@ public class ApplicationMaster { applicationMaster.getContainerStartTimes().put(containerId, startTime); applicationMaster.publishContainerStartEventOnTimelineServiceV2( container, startTime); - } else if (applicationMaster.timelineServiceV1Enabled) { + } + if (applicationMaster.timelineServiceV1Enabled) { applicationMaster.publishContainerStartEvent( applicationMaster.timelineClient, container, applicationMaster.domainId, applicationMaster.appSubmitterUgi); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java index 2789d047fb3..f11bdf8d2fa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSAppMaster.java @@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.applications.distributedshell; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -33,6 +35,8 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Assert; import org.junit.Test; @@ -167,14 +171,82 @@ public class TestDSAppMaster { } @Test - public void testTimelineClientInDSAppMaster() throws Exception { + public void testTimelineClientInDSAppMasterV1() throws Exception { + runTimelineClientInDSAppMaster(true, false); + } + + @Test + public void testTimelineClientInDSAppMasterV2() throws Exception { + runTimelineClientInDSAppMaster(false, true); + } + + @Test + public void testTimelineClientInDSAppMasterV1V2() throws Exception { + runTimelineClientInDSAppMaster(true, true); + } + + @Test + public void testTimelineClientInDSAppMasterDisabled() throws Exception { + runTimelineClientInDSAppMaster(false, false); + } + + private void runTimelineClientInDSAppMaster(boolean v1Enabled, + boolean v2Enabled) throws Exception { + ApplicationMaster appMaster = createAppMasterWithStartedTimelineService( + v1Enabled, v2Enabled); + validateAppMasterTimelineService(v1Enabled, v2Enabled, appMaster); + } + + private void validateAppMasterTimelineService(boolean v1Enabled, + boolean v2Enabled, ApplicationMaster appMaster) { + if (v1Enabled) { + Assert.assertEquals(appMaster.appSubmitterUgi, + ((TimelineClientImpl)appMaster.timelineClient).getUgi()); + } else { + Assert.assertNull(appMaster.timelineClient); + } + if (v2Enabled) { + Assert.assertNotNull(appMaster.timelineV2Client); + } else { + Assert.assertNull(appMaster.timelineV2Client); + } + } + + private ApplicationMaster createAppMasterWithStartedTimelineService( + boolean v1Enabled, boolean v2Enabled) throws Exception { ApplicationMaster appMaster = new ApplicationMaster(); - appMaster.appSubmitterUgi = - UserGroupInformation.createUserForTesting("foo", new String[]{"bar"}); - Configuration conf = new YarnConfiguration(); - conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + appMaster.appSubmitterUgi = UserGroupInformation + .createUserForTesting("foo", new String[] {"bar"}); + Configuration conf = this.getTimelineServiceConf(v1Enabled, v2Enabled); + ApplicationId appId = ApplicationId.newInstance(1L, 1); + appMaster.appAttemptID = ApplicationAttemptId.newInstance(appId, 1); appMaster.startTimelineClient(conf); - Assert.assertEquals(appMaster.appSubmitterUgi, - ((TimelineClientImpl)appMaster.timelineClient).getUgi()); + return appMaster; + } + + private Configuration getTimelineServiceConf(boolean v1Enabled, + boolean v2Enabled) { + Configuration conf = new YarnConfiguration(new Configuration(false)); + Assert.assertFalse(YarnConfiguration.timelineServiceEnabled(conf)); + + if (v1Enabled || v2Enabled) { + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + } + + if (v1Enabled) { + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f); + } + + if (v2Enabled) { + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS, + FileSystemTimelineWriterImpl.class, TimelineWriter.class); + } + + if (v1Enabled && v2Enabled) { + conf.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "1.0"); + conf.set(YarnConfiguration.TIMELINE_SERVICE_VERSIONS, "1.0,2.0f"); + } + return conf; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index 83210bd28a5..9133c1e4d60 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -167,13 +167,7 @@ public class YarnClientImpl extends YarnClient { YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS); } - float timelineServiceVersion = - conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION); - if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED) - && ((Float.compare(timelineServiceVersion, 1.0f) == 0) - || (Float.compare(timelineServiceVersion, 1.5f) == 0))) { + if (YarnConfiguration.timelineServiceV1Enabled(conf)) { timelineV1ServiceEnabled = true; timelineDTRenewer = getTimelineDelegationTokenRenewer(conf); timelineService = TimelineUtils.buildTimelineTokenService(conf); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index 48d720a28c9..3e60b716cf4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -99,14 +99,12 @@ public class TimelineClientImpl extends TimelineClient { timelineServiceVersion = conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION); - LOG.info("Timeline service address: " + getTimelineServiceAddress()); - if (!YarnConfiguration.timelineServiceEnabled(conf) - || !((Float.compare(this.timelineServiceVersion, 1.0f) == 0) - || (Float.compare(this.timelineServiceVersion, 1.5f) == 0))) { + if (!YarnConfiguration.timelineServiceV1Enabled(conf)) { throw new IOException("Timeline V1 client is not properly configured. " + "Either timeline service is not enabled or version is not set to" + " 1.x"); } + LOG.info("Timeline service address: " + getTimelineServiceAddress()); UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); UserGroupInformation realUgi = ugi.getRealUser(); if (realUgi != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java index bb505955a50..09bfd5832d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java @@ -94,8 +94,7 @@ public class TimelineV2ClientImpl extends TimelineV2Client { } protected void serviceInit(Configuration conf) throws Exception { - if (!YarnConfiguration.timelineServiceEnabled(conf) - || (int) YarnConfiguration.getTimelineServiceVersion(conf) != 2) { + if (!YarnConfiguration.timelineServiceV2Enabled(conf)) { throw new IOException("Timeline V2 client is not properly configured. " + "Either timeline service is not enabled or version is not set to" + " 2"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java index b618ac16c43..ac421d33d91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java @@ -118,7 +118,8 @@ public class TimelineUtils { } /** - * Returns whether the timeline service v.1.5 is enabled via configuration. + * Returns whether the timeline service v.1.5 is enabled by default via + * configuration. * * @param conf the configuration * @return whether the timeline service v.1.5 is enabled. V.1.5 refers to a diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index add1f91276a..58b70cc2ce9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.metrics.NoOpSystemMetricPub import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV1Publisher; import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Publisher; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.CombinedSystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; @@ -497,26 +498,33 @@ public class ResourceManager extends CompositeService implements Recoverable { } protected SystemMetricsPublisher createSystemMetricsPublisher() { - SystemMetricsPublisher publisher; - if (YarnConfiguration.timelineServiceEnabled(conf) && - YarnConfiguration.systemMetricsPublisherEnabled(conf)) { - if (YarnConfiguration.timelineServiceV2Enabled(conf)) { - // we're dealing with the v.2.x publisher - LOG.info("system metrics publisher with the timeline service V2 is " + - "configured"); - publisher = new TimelineServiceV2Publisher( - rmContext.getRMTimelineCollectorManager()); - } else { - // we're dealing with the v.1.x publisher - LOG.info("system metrics publisher with the timeline service V1 is " + - "configured"); - publisher = new TimelineServiceV1Publisher(); - } - } else { - LOG.info("TimelineServicePublisher is not configured"); - publisher = new NoOpSystemMetricPublisher(); + List publishers = + new ArrayList(); + if (YarnConfiguration.timelineServiceV1Enabled(conf)) { + SystemMetricsPublisher publisherV1 = new TimelineServiceV1Publisher(); + publishers.add(publisherV1); } - return publisher; + if (YarnConfiguration.timelineServiceV2Enabled(conf)) { + // we're dealing with the v.2.x publisher + LOG.info("system metrics publisher with the timeline service V2 is " + + "configured"); + SystemMetricsPublisher publisherV2 = new TimelineServiceV2Publisher( + rmContext.getRMTimelineCollectorManager()); + publishers.add(publisherV2); + } + if (publishers.isEmpty()) { + LOG.info("TimelineServicePublisher is not configured"); + SystemMetricsPublisher noopPublisher = new NoOpSystemMetricPublisher(); + publishers.add(noopPublisher); + } + + for (SystemMetricsPublisher publisher : publishers) { + addIfService(publisher); + } + + SystemMetricsPublisher combinedPublisher = + new CombinedSystemMetricsPublisher(publishers); + return combinedPublisher; } // sanity check for configurations diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/CombinedSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/CombinedSystemMetricsPublisher.java new file mode 100644 index 00000000000..96467477986 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/CombinedSystemMetricsPublisher.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.metrics; + +import java.util.Collection; + +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; + +/** + * A metrics publisher that can publish for a collection of publishers. + */ +public class CombinedSystemMetricsPublisher implements SystemMetricsPublisher { + private Collection publishers; + + public CombinedSystemMetricsPublisher(Collection + publishers) { + this.publishers = publishers; + } + + @Override + public void appCreated(RMApp app, long createdTime) { + for (SystemMetricsPublisher publisher : this.publishers) { + publisher.appCreated(app, createdTime); + } + } + + @Override + public void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime) { + for (SystemMetricsPublisher publisher : this.publishers) { + publisher.appACLsUpdated(app, appViewACLs, updatedTime); + } + } + + @Override + public void appUpdated(RMApp app, long updatedTime) { + for (SystemMetricsPublisher publisher : this.publishers) { + publisher.appUpdated(app, updatedTime); + } + } + + @Override + public void appStateUpdated(RMApp app, YarnApplicationState appState, + long updatedTime) { + for (SystemMetricsPublisher publisher : this.publishers) { + publisher.appStateUpdated(app, appState, updatedTime); + } + } + + @Override + public void appFinished(RMApp app, RMAppState state, long finishedTime) { + for (SystemMetricsPublisher publisher : this.publishers) { + publisher.appFinished(app, state, finishedTime); + } + } + + @Override + public void appAttemptRegistered(RMAppAttempt appAttempt, + long registeredTime) { + for (SystemMetricsPublisher publisher : this.publishers) { + publisher.appAttemptRegistered(appAttempt, registeredTime); + } + } + + @Override + public void appAttemptFinished(RMAppAttempt appAttempt, + RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) { + for (SystemMetricsPublisher publisher : this.publishers) { + publisher.appAttemptFinished(appAttempt, appAttemtpState, app, + finishedTime); + } + } + + @Override + public void containerCreated(RMContainer container, long createdTime) { + for (SystemMetricsPublisher publisher : this.publishers) { + publisher.containerCreated(container, createdTime); + } + } + + @Override + public void containerFinished(RMContainer container, long finishedTime) { + for (SystemMetricsPublisher publisher : this.publishers) { + publisher.containerFinished(container, finishedTime); + } + } +} + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMTimelineService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMTimelineService.java new file mode 100644 index 00000000000..f824fa1b93a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMTimelineService.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV1Publisher; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Publisher; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests that the RM creates timeline services (v1/v2) as specified by the + * configuration. + */ +public class TestRMTimelineService { + private static MockRM rm; + + private void setup(boolean v1Enabled, boolean v2Enabled) { + Configuration conf = new YarnConfiguration(new Configuration(false)); + Assert.assertFalse(YarnConfiguration.timelineServiceEnabled(conf)); + + if (v1Enabled || v2Enabled) { + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + } + + if (v1Enabled) { + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f); + } + + if (v2Enabled) { + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS, + FileSystemTimelineWriterImpl.class, TimelineWriter.class); + } + + if (v1Enabled && v2Enabled) { + conf.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "1.0"); + conf.set(YarnConfiguration.TIMELINE_SERVICE_VERSIONS, "1.0,2.0f"); + } + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + rm = new MockRM(conf, memStore); + rm.start(); + } + + // validate RM services exist or not as we specified + private void validate(boolean v1Enabled, boolean v2Enabled) { + boolean v1PublisherServiceFound = false; + boolean v2PublisherServiceFound = false; + List services = rm.getServices(); + for (Service service : services) { + if (service instanceof TimelineServiceV1Publisher) { + v1PublisherServiceFound = true; + } else if (service instanceof TimelineServiceV2Publisher) { + v2PublisherServiceFound = true; + } + } + + Assert.assertEquals(v1Enabled, v1PublisherServiceFound); + Assert.assertEquals(v2Enabled, v2PublisherServiceFound); + } + + private void cleanup() throws Exception { + rm.close(); + rm.stop(); + } + + // runs test to validate RM creates a timeline service publisher if and + // only if the service is enabled for v1 and v2 (independently). + private void runTest(boolean v1Enabled, boolean v2Enabled) throws Exception { + setup(v1Enabled, v2Enabled); + validate(v1Enabled, v2Enabled); + cleanup(); + } + + @Test + public void testTimelineServiceV1V2Enabled() throws Exception { + runTest(true, true); + } + + @Test + public void testTimelineServiceV1Enabled() throws Exception { + runTest(true, false); + } + + @Test + public void testTimelineServiceV2Enabled() throws Exception { + runTest(false, true); + } + + @Test + public void testTimelineServiceDisabled() throws Exception { + runTest(false, false); + } +} + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestCombinedSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestCombinedSystemMetricsPublisher.java new file mode 100644 index 00000000000..cefa6e84276 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestCombinedSystemMetricsPublisher.java @@ -0,0 +1,480 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.metrics; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer; +import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; +import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore; +import org.apache.hadoop.yarn.server.timeline.TimelineStore; +import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore; +import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore; +import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector; +import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; +import org.apache.hadoop.yarn.util.TimelineServiceHelper; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests that a CombinedSystemMetricsPublisher publishes metrics for timeline + * services (v1/v2) as specified by the configuration. + */ +public class TestCombinedSystemMetricsPublisher { + /** + * The folder where the FileSystemTimelineWriterImpl writes the entities. + */ + private static File testRootDir = new File("target", + TestCombinedSystemMetricsPublisher.class.getName() + "-localDir") + .getAbsoluteFile(); + + private static ApplicationHistoryServer timelineServer; + private static CombinedSystemMetricsPublisher metricsPublisher; + private static TimelineStore store; + private static ConcurrentMap rmAppsMapInContext; + private static RMTimelineCollectorManager rmTimelineCollectorManager; + private static DrainDispatcher dispatcher; + private static YarnConfiguration conf; + private static TimelineServiceV1Publisher publisherV1; + private static TimelineServiceV2Publisher publisherV2; + private static ApplicationAttemptId appAttemptId; + private static RMApp app; + + private void testSetup(boolean enableV1, boolean enableV2) throws Exception { + + if (testRootDir.exists()) { + //cleanup before hand + FileContext.getLocalFSFileContext().delete( + new Path(testRootDir.getAbsolutePath()), true); + } + + conf = getConf(enableV1, enableV2); + + RMContext rmContext = mock(RMContext.class); + rmAppsMapInContext = new ConcurrentHashMap(); + when(rmContext.getRMApps()).thenReturn(rmAppsMapInContext); + ResourceManager rm = mock(ResourceManager.class); + when(rm.getRMContext()).thenReturn(rmContext); + + if (enableV2) { + dispatcher = new DrainDispatcher(); + rmTimelineCollectorManager = new RMTimelineCollectorManager(rm); + when(rmContext.getRMTimelineCollectorManager()).thenReturn( + rmTimelineCollectorManager); + + rmTimelineCollectorManager.init(conf); + rmTimelineCollectorManager.start(); + } else { + dispatcher = null; + rmTimelineCollectorManager = null; + } + + timelineServer = new ApplicationHistoryServer(); + timelineServer.init(conf); + timelineServer.start(); + store = timelineServer.getTimelineStore(); + + if (enableV2) { + dispatcher.init(conf); + dispatcher.start(); + } + + List publishers = + new ArrayList(); + + if (YarnConfiguration.timelineServiceV1Enabled(conf)) { + Assert.assertTrue(enableV1); + publisherV1 = new TimelineServiceV1Publisher(); + publishers.add(publisherV1); + publisherV1.init(conf); + publisherV1.start(); + } else { + Assert.assertFalse(enableV1); + publisherV1 = null; + } + + if (YarnConfiguration.timelineServiceV2Enabled(conf)) { + Assert.assertTrue(enableV2); + publisherV2 = new TimelineServiceV2Publisher( + rmTimelineCollectorManager) { + @Override + protected Dispatcher getDispatcher() { + return dispatcher; + } + }; + publishers.add(publisherV2); + publisherV2.init(conf); + publisherV2.start(); + } else { + Assert.assertFalse(enableV2); + publisherV2 = null; + } + + if (publishers.isEmpty()) { + NoOpSystemMetricPublisher noopPublisher = + new NoOpSystemMetricPublisher(); + publishers.add(noopPublisher); + } + + metricsPublisher = new CombinedSystemMetricsPublisher(publishers); + } + + private void testCleanup() throws Exception { + if (publisherV1 != null) { + publisherV1.stop(); + } + if (publisherV2 != null) { + publisherV2.stop(); + } + if (timelineServer != null) { + timelineServer.stop(); + } + if (testRootDir.exists()) { + FileContext.getLocalFSFileContext().delete( + new Path(testRootDir.getAbsolutePath()), true); + } + if (rmTimelineCollectorManager != null) { + rmTimelineCollectorManager.stop(); + } + } + + private static YarnConfiguration getConf(boolean v1Enabled, + boolean v2Enabled) { + YarnConfiguration yarnConf = new YarnConfiguration(); + + if (v1Enabled || v2Enabled) { + yarnConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + } else { + yarnConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false); + } + + if (v1Enabled) { + yarnConf.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "1.0"); + yarnConf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE, + MemoryTimelineStore.class, TimelineStore.class); + yarnConf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS, + MemoryTimelineStateStore.class, TimelineStateStore.class); + } + + if (v2Enabled) { + yarnConf.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "2.0"); + yarnConf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, + true); + yarnConf.setBoolean( + YarnConfiguration.RM_PUBLISH_CONTAINER_EVENTS_ENABLED, true); + yarnConf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS, + FileSystemTimelineWriterImpl.class, TimelineWriter.class); + + try { + yarnConf.set( + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT, + testRootDir.getCanonicalPath()); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail("Exception while setting the " + + "TIMELINE_SERVICE_STORAGE_DIR_ROOT "); + } + } + + if (v1Enabled && v2Enabled) { + yarnConf.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "1.0"); + yarnConf.set(YarnConfiguration.TIMELINE_SERVICE_VERSIONS, "1.0,2.0f"); + } + + yarnConf.setInt( + YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, 2); + + return yarnConf; + } + + // runs test to validate timeline events are published if and only if the + // service is enabled for v1 and v2 (independently). + private void runTest(boolean v1Enabled, boolean v2Enabled) throws Exception { + testSetup(v1Enabled, v2Enabled); + publishEvents(v1Enabled, v2Enabled); + validateV1(v1Enabled); + validateV2(v2Enabled); + testCleanup(); + } + + @Test(timeout = 10000) + public void testTimelineServiceEventPublishingV1V2Enabled() + throws Exception { + runTest(true, true); + } + + @Test(timeout = 10000) + public void testTimelineServiceEventPublishingV1Enabled() throws Exception { + runTest(true, false); + } + + @Test(timeout = 10000) + public void testTimelineServiceEventPublishingV2Enabled() throws Exception { + runTest(false, true); + } + + @Test(timeout = 10000) + public void testTimelineServiceEventPublishingNoService() throws Exception { + runTest(false, false); + } + + private void publishEvents(boolean v1Enabled, boolean v2Enabled) { + long timestamp = (v1Enabled) ? 1 : 2; + int id = (v2Enabled) ? 3 : 4; + ApplicationId appId = ApplicationId.newInstance(timestamp, id); + + app = createRMApp(appId); + rmAppsMapInContext.putIfAbsent(appId, app); + + if (v2Enabled) { + AppLevelTimelineCollector collector = + new AppLevelTimelineCollector(appId); + rmTimelineCollectorManager.putIfAbsent(appId, collector); + } + appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + RMAppAttempt appAttempt = createRMAppAttempt(true); + + metricsPublisher.appAttemptRegistered(appAttempt, Integer.MAX_VALUE + 1L); + metricsPublisher.appAttemptFinished(appAttempt, RMAppAttemptState.FINISHED, + app, Integer.MAX_VALUE + 2L); + if (v2Enabled) { + dispatcher.await(); + } + } + + private void validateV1(boolean v1Enabled) throws Exception { + TimelineEntity entity = null; + + if (!v1Enabled) { + Thread.sleep(1000); + entity = + store.getEntity(appAttemptId.toString(), + AppAttemptMetricsConstants.ENTITY_TYPE, + EnumSet.allOf(Field.class)); + Assert.assertNull(entity); + return; + } + + do { + entity = + store.getEntity(appAttemptId.toString(), + AppAttemptMetricsConstants.ENTITY_TYPE, + EnumSet.allOf(Field.class)); + Thread.sleep(100); + // ensure two events are both published before leaving the loop + } while (entity == null || entity.getEvents().size() < 2); + + boolean hasRegisteredEvent = false; + boolean hasFinishedEvent = false; + for (org.apache.hadoop.yarn.api.records.timeline.TimelineEvent event : + entity.getEvents()) { + if (event.getEventType().equals( + AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE)) { + hasRegisteredEvent = true; + } else if (event.getEventType().equals( + AppAttemptMetricsConstants.FINISHED_EVENT_TYPE)) { + hasFinishedEvent = true; + Assert.assertEquals( + FinalApplicationStatus.UNDEFINED.toString(), + event.getEventInfo().get( + AppAttemptMetricsConstants.FINAL_STATUS_INFO)); + Assert.assertEquals( + YarnApplicationAttemptState.FINISHED.toString(), + event.getEventInfo().get( + AppAttemptMetricsConstants.STATE_INFO)); + } + Assert + .assertEquals(appAttemptId.toString(), entity.getEntityId()); + } + Assert.assertTrue(hasRegisteredEvent && hasFinishedEvent); + } + + private void validateV2(boolean v2Enabled) throws Exception { + String outputDirApp = + getTimelineEntityDir() + "/" + + TimelineEntityType.YARN_APPLICATION_ATTEMPT + "/"; + + File entityFolder = new File(outputDirApp); + Assert.assertEquals(v2Enabled, entityFolder.isDirectory()); + + if (v2Enabled) { + String timelineServiceFileName = appAttemptId.toString() + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + File entityFile = new File(outputDirApp, timelineServiceFileName); + Assert.assertTrue(entityFile.exists()); + long idPrefix = TimelineServiceHelper + .invertLong(appAttemptId.getAttemptId()); + verifyEntity(entityFile, 2, + AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE, 0, idPrefix); + } + } + + private void verifyEntity(File entityFile, long expectedEvents, + String eventForCreatedTime, long expectedMetrics, long idPrefix) + throws IOException { + + BufferedReader reader = null; + String strLine; + long count = 0; + long metricsCount = 0; + try { + reader = new BufferedReader(new FileReader(entityFile)); + while ((strLine = reader.readLine()) != null) { + if (strLine.trim().length() > 0) { + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity + entity = FileSystemTimelineReaderImpl + .getTimelineRecordFromJSON(strLine.trim(), + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.class); + metricsCount = entity.getMetrics().size(); + assertEquals(idPrefix, entity.getIdPrefix()); + for (TimelineEvent event : entity.getEvents()) { + if (event.getId().equals(eventForCreatedTime)) { + assertTrue(entity.getCreatedTime() > 0); + break; + } + } + count++; + } + } + } finally { + reader.close(); + } + assertEquals("Expected " + expectedEvents + " events to be published", + expectedEvents, count); + assertEquals("Expected " + expectedMetrics + " metrics is incorrect", + expectedMetrics, metricsCount); + } + + private String getTimelineEntityDir() { + String outputDirApp = + testRootDir.getAbsolutePath() + "/" + + FileSystemTimelineWriterImpl.ENTITIES_DIR + "/" + + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" + + app.getUser() + "/" + + app.getName() + "/" + + TimelineUtils.DEFAULT_FLOW_VERSION + "/" + + app.getStartTime() + "/" + + app.getApplicationId(); + return outputDirApp; + } + + private static RMAppAttempt createRMAppAttempt(boolean unmanagedAMAttempt) { + RMAppAttempt appAttempt = mock(RMAppAttempt.class); + when(appAttempt.getAppAttemptId()).thenReturn(appAttemptId); + when(appAttempt.getHost()).thenReturn("test host"); + when(appAttempt.getRpcPort()).thenReturn(-100); + if (!unmanagedAMAttempt) { + Container container = mock(Container.class); + when(container.getId()) + .thenReturn(ContainerId.newContainerId(appAttemptId, 1)); + when(appAttempt.getMasterContainer()).thenReturn(container); + } + when(appAttempt.getDiagnostics()).thenReturn("test diagnostics info"); + when(appAttempt.getTrackingUrl()).thenReturn("test tracking url"); + when(appAttempt.getOriginalTrackingUrl()).thenReturn( + "test original tracking url"); + return appAttempt; + } + + private static RMApp createRMApp(ApplicationId appId) { + RMApp rmApp = mock(RMAppImpl.class); + when(rmApp.getApplicationId()).thenReturn(appId); + when(rmApp.getName()).thenReturn("test app"); + when(rmApp.getApplicationType()).thenReturn("test app type"); + when(rmApp.getUser()).thenReturn("testUser"); + when(rmApp.getQueue()).thenReturn("test queue"); + when(rmApp.getSubmitTime()).thenReturn(Integer.MAX_VALUE + 1L); + when(rmApp.getStartTime()).thenReturn(Integer.MAX_VALUE + 2L); + when(rmApp.getFinishTime()).thenReturn(Integer.MAX_VALUE + 3L); + when(rmApp.getDiagnostics()).thenReturn( + new StringBuilder("test diagnostics info")); + RMAppAttempt appAttempt = mock(RMAppAttempt.class); + when(appAttempt.getAppAttemptId()).thenReturn( + ApplicationAttemptId.newInstance(appId, 1)); + when(rmApp.getCurrentAppAttempt()).thenReturn(appAttempt); + when(rmApp.getFinalApplicationStatus()).thenReturn( + FinalApplicationStatus.UNDEFINED); + when(rmApp.getRMAppMetrics()).thenReturn( + new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, Integer.MAX_VALUE, + Long.MAX_VALUE, Integer.MAX_VALUE, Long.MAX_VALUE)); + when(rmApp.getApplicationTags()).thenReturn( + Collections. emptySet()); + ApplicationSubmissionContext appSubmissionContext = + mock(ApplicationSubmissionContext.class); + when(appSubmissionContext.getPriority()) + .thenReturn(Priority.newInstance(0)); + + ContainerLaunchContext containerLaunchContext = + mock(ContainerLaunchContext.class); + when(containerLaunchContext.getCommands()) + .thenReturn(Collections.singletonList("java -Xmx1024m")); + when(appSubmissionContext.getAMContainerSpec()) + .thenReturn(containerLaunchContext); + when(rmApp.getApplicationPriority()).thenReturn(Priority.newInstance(10)); + when(rmApp.getApplicationSubmissionContext()) + .thenReturn(appSubmissionContext); + return rmApp; + } +} + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 9834b3a6c25..117bb7156d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -247,6 +247,15 @@ public class MiniYARNCluster extends CompositeService { useFixedPorts = conf.getBoolean( YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS); + + if (!useFixedPorts) { + String hostname = MiniYARNCluster.getHostname(); + conf.set(YarnConfiguration.TIMELINE_SERVICE_ADDRESS, hostname + ":0"); + + conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, + hostname + ":" + ServerSocketUtil.getPort(9188, 10)); + } + useRpc = conf.getBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, YarnConfiguration.DEFAULT_YARN_MINICLUSTER_USE_RPC); failoverTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, @@ -800,12 +809,6 @@ public class MiniYARNCluster extends CompositeService { } conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS, MemoryTimelineStateStore.class, TimelineStateStore.class); - if (!useFixedPorts) { - String hostname = MiniYARNCluster.getHostname(); - conf.set(YarnConfiguration.TIMELINE_SERVICE_ADDRESS, hostname + ":0"); - conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, - hostname + ":" + ServerSocketUtil.getPort(9188, 10)); - } appHistoryServer.init(conf); super.serviceInit(conf); }