YARN-6736. Consider writing to both ats v1 & v2 from RM for smoother upgrades. Contributed by Aaron Gresch.

This commit is contained in:
Rohith Sharma K S 2018-01-16 07:58:29 +05:30
parent a0c71dcc33
commit d09058b2fd
12 changed files with 913 additions and 69 deletions

View File

@ -19,7 +19,9 @@
package org.apache.hadoop.yarn.conf;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@ -2271,6 +2273,9 @@ public class YarnConfiguration extends Configuration {
+ "version";
public static final float DEFAULT_TIMELINE_SERVICE_VERSION = 1.0f;
public static final String TIMELINE_SERVICE_VERSIONS =
TIMELINE_SERVICE_PREFIX + "versions";
/**
* Comma separated list of names for UIs hosted in the timeline server
* (For pluggable UIs).
@ -3636,8 +3641,60 @@ public class YarnConfiguration extends Configuration {
* version greater than equal to 2 but smaller than 3.
*/
public static boolean timelineServiceV2Enabled(Configuration conf) {
return timelineServiceEnabled(conf) &&
(int)getTimelineServiceVersion(conf) == 2;
boolean enabled = false;
if (timelineServiceEnabled(conf)) {
Collection<Float> versions = getTimelineServiceVersions(conf);
for (Float version : versions) {
if (version.intValue() == 2) {
enabled = true;
break;
}
}
}
return enabled;
}
/**
* Returns whether the timeline service v.1 is enabled via configuration.
*
* @param conf the configuration
* @return whether the timeline service v.1 is enabled. V.1 refers to a
* version greater than equal to 1 but smaller than 2.
*/
public static boolean timelineServiceV1Enabled(Configuration conf) {
boolean enabled = false;
if (timelineServiceEnabled(conf)) {
Collection<Float> versions = getTimelineServiceVersions(conf);
for (Float version : versions) {
if (version.intValue() == 1) {
enabled = true;
break;
}
}
}
return enabled;
}
/**
* Returns all the active timeline service versions. It does not check
* whether the timeline service itself is enabled.
*
* @param conf the configuration
* @return the timeline service versions as a collection of floats.
*/
private static Collection<Float> getTimelineServiceVersions(
Configuration conf) {
String versions = conf.get(TIMELINE_SERVICE_VERSIONS);
if (versions == null) {
versions = Float.toString(getTimelineServiceVersion(conf));
}
List<String> stringList = Arrays.asList(versions.split(","));
List<Float> floatList = new ArrayList<Float>();
for (String s : stringList) {
Float f = Float.parseFloat(s);
floatList.add(f);
}
return floatList;
}
/**

View File

@ -323,7 +323,8 @@ public class ApplicationMaster {
TimelineClient timelineClient;
// Timeline v2 Client
private TimelineV2Client timelineV2Client;
@VisibleForTesting
TimelineV2Client timelineV2Client;
static final String CONTAINER_ENTITY_GROUP_ID = "CONTAINERS";
static final String APPID_TIMELINE_FILTER_NAME = "appId";
@ -632,11 +633,7 @@ public class ApplicationMaster {
containrRetryInterval = Integer.parseInt(cliParser.getOptionValue(
"container_retry_interval", "0"));
if (YarnConfiguration.timelineServiceEnabled(conf)) {
timelineServiceV2Enabled =
((int) YarnConfiguration.getTimelineServiceVersion(conf) == 2);
timelineServiceV1Enabled = !timelineServiceV2Enabled;
} else {
if (!YarnConfiguration.timelineServiceEnabled(conf)) {
timelineClient = null;
timelineV2Client = null;
LOG.warn("Timeline service is not enabled");
@ -704,12 +701,11 @@ public class ApplicationMaster {
if (timelineServiceV2Enabled) {
// need to bind timelineClient
amRMClient.registerTimelineV2Client(timelineV2Client);
}
if (timelineServiceV2Enabled) {
publishApplicationAttemptEventOnTimelineServiceV2(
DSEvent.DS_APP_ATTEMPT_START);
} else if (timelineServiceV1Enabled) {
}
if (timelineServiceV1Enabled) {
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
}
@ -784,18 +780,23 @@ public class ApplicationMaster {
@Override
public Void run() throws Exception {
if (YarnConfiguration.timelineServiceEnabled(conf)) {
timelineServiceV1Enabled =
YarnConfiguration.timelineServiceV1Enabled(conf);
timelineServiceV2Enabled =
YarnConfiguration.timelineServiceV2Enabled(conf);
// Creating the Timeline Client
if (timelineServiceV1Enabled) {
timelineClient = TimelineClient.createTimelineClient();
timelineClient.init(conf);
timelineClient.start();
LOG.info("Timeline service V1 client is enabled");
}
if (timelineServiceV2Enabled) {
timelineV2Client = TimelineV2Client.createTimelineClient(
appAttemptID.getApplicationId());
timelineV2Client.init(conf);
timelineV2Client.start();
LOG.info("Timeline service V2 client is enabled");
} else {
timelineClient = TimelineClient.createTimelineClient();
timelineClient.init(conf);
timelineClient.start();
LOG.info("Timeline service V1 client is enabled");
}
} else {
timelineClient = null;
@ -825,12 +826,14 @@ public class ApplicationMaster {
} catch (InterruptedException ex) {}
}
if (timelineServiceV1Enabled) {
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
}
if (timelineServiceV2Enabled) {
publishApplicationAttemptEventOnTimelineServiceV2(
DSEvent.DS_APP_ATTEMPT_END);
} else if (timelineServiceV1Enabled) {
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
}
// Join all launched threads
@ -881,7 +884,8 @@ public class ApplicationMaster {
// Stop Timeline Client
if(timelineServiceV1Enabled) {
timelineClient.stop();
} else if (timelineServiceV2Enabled) {
}
if (timelineServiceV2Enabled) {
timelineV2Client.stop();
}
@ -947,7 +951,8 @@ public class ApplicationMaster {
}
publishContainerEndEventOnTimelineServiceV2(containerStatus,
containerStartTime);
} else if (timelineServiceV1Enabled) {
}
if (timelineServiceV1Enabled) {
publishContainerEndEvent(timelineClient, containerStatus, domainId,
appSubmitterUgi);
}
@ -1113,7 +1118,8 @@ public class ApplicationMaster {
applicationMaster.getContainerStartTimes().put(containerId, startTime);
applicationMaster.publishContainerStartEventOnTimelineServiceV2(
container, startTime);
} else if (applicationMaster.timelineServiceV1Enabled) {
}
if (applicationMaster.timelineServiceV1Enabled) {
applicationMaster.publishContainerStartEvent(
applicationMaster.timelineClient, container,
applicationMaster.domainId, applicationMaster.appSubmitterUgi);

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.applications.distributedshell;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -33,6 +35,8 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.junit.Test;
@ -167,14 +171,82 @@ public class TestDSAppMaster {
}
@Test
public void testTimelineClientInDSAppMaster() throws Exception {
public void testTimelineClientInDSAppMasterV1() throws Exception {
runTimelineClientInDSAppMaster(true, false);
}
@Test
public void testTimelineClientInDSAppMasterV2() throws Exception {
runTimelineClientInDSAppMaster(false, true);
}
@Test
public void testTimelineClientInDSAppMasterV1V2() throws Exception {
runTimelineClientInDSAppMaster(true, true);
}
@Test
public void testTimelineClientInDSAppMasterDisabled() throws Exception {
runTimelineClientInDSAppMaster(false, false);
}
private void runTimelineClientInDSAppMaster(boolean v1Enabled,
boolean v2Enabled) throws Exception {
ApplicationMaster appMaster = createAppMasterWithStartedTimelineService(
v1Enabled, v2Enabled);
validateAppMasterTimelineService(v1Enabled, v2Enabled, appMaster);
}
private void validateAppMasterTimelineService(boolean v1Enabled,
boolean v2Enabled, ApplicationMaster appMaster) {
if (v1Enabled) {
Assert.assertEquals(appMaster.appSubmitterUgi,
((TimelineClientImpl)appMaster.timelineClient).getUgi());
} else {
Assert.assertNull(appMaster.timelineClient);
}
if (v2Enabled) {
Assert.assertNotNull(appMaster.timelineV2Client);
} else {
Assert.assertNull(appMaster.timelineV2Client);
}
}
private ApplicationMaster createAppMasterWithStartedTimelineService(
boolean v1Enabled, boolean v2Enabled) throws Exception {
ApplicationMaster appMaster = new ApplicationMaster();
appMaster.appSubmitterUgi =
UserGroupInformation.createUserForTesting("foo", new String[]{"bar"});
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
appMaster.appSubmitterUgi = UserGroupInformation
.createUserForTesting("foo", new String[] {"bar"});
Configuration conf = this.getTimelineServiceConf(v1Enabled, v2Enabled);
ApplicationId appId = ApplicationId.newInstance(1L, 1);
appMaster.appAttemptID = ApplicationAttemptId.newInstance(appId, 1);
appMaster.startTimelineClient(conf);
Assert.assertEquals(appMaster.appSubmitterUgi,
((TimelineClientImpl)appMaster.timelineClient).getUgi());
return appMaster;
}
private Configuration getTimelineServiceConf(boolean v1Enabled,
boolean v2Enabled) {
Configuration conf = new YarnConfiguration(new Configuration(false));
Assert.assertFalse(YarnConfiguration.timelineServiceEnabled(conf));
if (v1Enabled || v2Enabled) {
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
}
if (v1Enabled) {
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
}
if (v2Enabled) {
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
FileSystemTimelineWriterImpl.class, TimelineWriter.class);
}
if (v1Enabled && v2Enabled) {
conf.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "1.0");
conf.set(YarnConfiguration.TIMELINE_SERVICE_VERSIONS, "1.0,2.0f");
}
return conf;
}
}

View File

@ -177,13 +177,7 @@ public class YarnClientImpl extends YarnClient {
YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
}
float timelineServiceVersion =
conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
&& ((Float.compare(timelineServiceVersion, 1.0f) == 0)
|| (Float.compare(timelineServiceVersion, 1.5f) == 0))) {
if (YarnConfiguration.timelineServiceV1Enabled(conf)) {
timelineV1ServiceEnabled = true;
timelineDTRenewer = getTimelineDelegationTokenRenewer(conf);
timelineService = TimelineUtils.buildTimelineTokenService(conf);

View File

@ -99,14 +99,12 @@ public class TimelineClientImpl extends TimelineClient {
timelineServiceVersion =
conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
LOG.info("Timeline service address: " + getTimelineServiceAddress());
if (!YarnConfiguration.timelineServiceEnabled(conf)
|| !((Float.compare(this.timelineServiceVersion, 1.0f) == 0)
|| (Float.compare(this.timelineServiceVersion, 1.5f) == 0))) {
if (!YarnConfiguration.timelineServiceV1Enabled(conf)) {
throw new IOException("Timeline V1 client is not properly configured. "
+ "Either timeline service is not enabled or version is not set to"
+ " 1.x");
}
LOG.info("Timeline service address: " + getTimelineServiceAddress());
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
UserGroupInformation realUgi = ugi.getRealUser();
if (realUgi != null) {

View File

@ -94,8 +94,7 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
}
protected void serviceInit(Configuration conf) throws Exception {
if (!YarnConfiguration.timelineServiceEnabled(conf)
|| (int) YarnConfiguration.getTimelineServiceVersion(conf) != 2) {
if (!YarnConfiguration.timelineServiceV2Enabled(conf)) {
throw new IOException("Timeline V2 client is not properly configured. "
+ "Either timeline service is not enabled or version is not set to"
+ " 2");

View File

@ -118,7 +118,8 @@ public class TimelineUtils {
}
/**
* Returns whether the timeline service v.1.5 is enabled via configuration.
* Returns whether the timeline service v.1.5 is enabled by default via
* configuration.
*
* @param conf the configuration
* @return whether the timeline service v.1.5 is enabled. V.1.5 refers to a

View File

@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.metrics.NoOpSystemMetricPub
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV1Publisher;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Publisher;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.CombinedSystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
@ -513,26 +514,33 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
protected SystemMetricsPublisher createSystemMetricsPublisher() {
SystemMetricsPublisher publisher;
if (YarnConfiguration.timelineServiceEnabled(conf) &&
YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
// we're dealing with the v.2.x publisher
LOG.info("system metrics publisher with the timeline service V2 is " +
"configured");
publisher = new TimelineServiceV2Publisher(
rmContext.getRMTimelineCollectorManager());
} else {
// we're dealing with the v.1.x publisher
LOG.info("system metrics publisher with the timeline service V1 is " +
"configured");
publisher = new TimelineServiceV1Publisher();
}
} else {
LOG.info("TimelineServicePublisher is not configured");
publisher = new NoOpSystemMetricPublisher();
List<SystemMetricsPublisher> publishers =
new ArrayList<SystemMetricsPublisher>();
if (YarnConfiguration.timelineServiceV1Enabled(conf)) {
SystemMetricsPublisher publisherV1 = new TimelineServiceV1Publisher();
publishers.add(publisherV1);
}
return publisher;
if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
// we're dealing with the v.2.x publisher
LOG.info("system metrics publisher with the timeline service V2 is "
+ "configured");
SystemMetricsPublisher publisherV2 = new TimelineServiceV2Publisher(
rmContext.getRMTimelineCollectorManager());
publishers.add(publisherV2);
}
if (publishers.isEmpty()) {
LOG.info("TimelineServicePublisher is not configured");
SystemMetricsPublisher noopPublisher = new NoOpSystemMetricPublisher();
publishers.add(noopPublisher);
}
for (SystemMetricsPublisher publisher : publishers) {
addIfService(publisher);
}
SystemMetricsPublisher combinedPublisher =
new CombinedSystemMetricsPublisher(publishers);
return combinedPublisher;
}
// sanity check for configurations

View File

@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import java.util.Collection;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
/**
* A metrics publisher that can publish for a collection of publishers.
*/
public class CombinedSystemMetricsPublisher implements SystemMetricsPublisher {
private Collection<SystemMetricsPublisher> publishers;
public CombinedSystemMetricsPublisher(Collection<SystemMetricsPublisher>
publishers) {
this.publishers = publishers;
}
@Override
public void appCreated(RMApp app, long createdTime) {
for (SystemMetricsPublisher publisher : this.publishers) {
publisher.appCreated(app, createdTime);
}
}
@Override
public void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime) {
for (SystemMetricsPublisher publisher : this.publishers) {
publisher.appACLsUpdated(app, appViewACLs, updatedTime);
}
}
@Override
public void appUpdated(RMApp app, long updatedTime) {
for (SystemMetricsPublisher publisher : this.publishers) {
publisher.appUpdated(app, updatedTime);
}
}
@Override
public void appStateUpdated(RMApp app, YarnApplicationState appState,
long updatedTime) {
for (SystemMetricsPublisher publisher : this.publishers) {
publisher.appStateUpdated(app, appState, updatedTime);
}
}
@Override
public void appFinished(RMApp app, RMAppState state, long finishedTime) {
for (SystemMetricsPublisher publisher : this.publishers) {
publisher.appFinished(app, state, finishedTime);
}
}
@Override
public void appAttemptRegistered(RMAppAttempt appAttempt,
long registeredTime) {
for (SystemMetricsPublisher publisher : this.publishers) {
publisher.appAttemptRegistered(appAttempt, registeredTime);
}
}
@Override
public void appAttemptFinished(RMAppAttempt appAttempt,
RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
for (SystemMetricsPublisher publisher : this.publishers) {
publisher.appAttemptFinished(appAttempt, appAttemtpState, app,
finishedTime);
}
}
@Override
public void containerCreated(RMContainer container, long createdTime) {
for (SystemMetricsPublisher publisher : this.publishers) {
publisher.containerCreated(container, createdTime);
}
}
@Override
public void containerFinished(RMContainer container, long finishedTime) {
for (SystemMetricsPublisher publisher : this.publishers) {
publisher.containerFinished(container, finishedTime);
}
}
}

View File

@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV1Publisher;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Publisher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
import org.junit.Assert;
import org.junit.Test;
/**
* Tests that the RM creates timeline services (v1/v2) as specified by the
* configuration.
*/
public class TestRMTimelineService {
private static MockRM rm;
private void setup(boolean v1Enabled, boolean v2Enabled) {
Configuration conf = new YarnConfiguration(new Configuration(false));
Assert.assertFalse(YarnConfiguration.timelineServiceEnabled(conf));
if (v1Enabled || v2Enabled) {
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
}
if (v1Enabled) {
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
}
if (v2Enabled) {
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
FileSystemTimelineWriterImpl.class, TimelineWriter.class);
}
if (v1Enabled && v2Enabled) {
conf.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "1.0");
conf.set(YarnConfiguration.TIMELINE_SERVICE_VERSIONS, "1.0,2.0f");
}
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
rm = new MockRM(conf, memStore);
rm.start();
}
// validate RM services exist or not as we specified
private void validate(boolean v1Enabled, boolean v2Enabled) {
boolean v1PublisherServiceFound = false;
boolean v2PublisherServiceFound = false;
List<Service> services = rm.getServices();
for (Service service : services) {
if (service instanceof TimelineServiceV1Publisher) {
v1PublisherServiceFound = true;
} else if (service instanceof TimelineServiceV2Publisher) {
v2PublisherServiceFound = true;
}
}
Assert.assertEquals(v1Enabled, v1PublisherServiceFound);
Assert.assertEquals(v2Enabled, v2PublisherServiceFound);
}
private void cleanup() throws Exception {
rm.close();
rm.stop();
}
// runs test to validate RM creates a timeline service publisher if and
// only if the service is enabled for v1 and v2 (independently).
private void runTest(boolean v1Enabled, boolean v2Enabled) throws Exception {
setup(v1Enabled, v2Enabled);
validate(v1Enabled, v2Enabled);
cleanup();
}
@Test
public void testTimelineServiceV1V2Enabled() throws Exception {
runTest(true, true);
}
@Test
public void testTimelineServiceV1Enabled() throws Exception {
runTest(true, false);
}
@Test
public void testTimelineServiceV2Enabled() throws Exception {
runTest(false, true);
}
@Test
public void testTimelineServiceDisabled() throws Exception {
runTest(false, false);
}
}

View File

@ -0,0 +1,476 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer;
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore;
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore;
import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore;
import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
import org.apache.hadoop.yarn.util.TimelineServiceHelper;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.Assert;
import org.junit.Test;
/**
* Tests that a CombinedSystemMetricsPublisher publishes metrics for timeline
* services (v1/v2) as specified by the configuration.
*/
public class TestCombinedSystemMetricsPublisher {
/**
* The folder where the FileSystemTimelineWriterImpl writes the entities.
*/
private static File testRootDir = new File("target",
TestCombinedSystemMetricsPublisher.class.getName() + "-localDir")
.getAbsoluteFile();
private static ApplicationHistoryServer timelineServer;
private static CombinedSystemMetricsPublisher metricsPublisher;
private static TimelineStore store;
private static ConcurrentMap<ApplicationId, RMApp> rmAppsMapInContext;
private static RMTimelineCollectorManager rmTimelineCollectorManager;
private static DrainDispatcher dispatcher;
private static YarnConfiguration conf;
private static TimelineServiceV1Publisher publisherV1;
private static TimelineServiceV2Publisher publisherV2;
private static ApplicationAttemptId appAttemptId;
private static RMApp app;
private void testSetup(boolean enableV1, boolean enableV2) throws Exception {
if (testRootDir.exists()) {
//cleanup before hand
FileContext.getLocalFSFileContext().delete(
new Path(testRootDir.getAbsolutePath()), true);
}
conf = getConf(enableV1, enableV2);
RMContext rmContext = mock(RMContext.class);
rmAppsMapInContext = new ConcurrentHashMap<ApplicationId, RMApp>();
when(rmContext.getRMApps()).thenReturn(rmAppsMapInContext);
ResourceManager rm = mock(ResourceManager.class);
when(rm.getRMContext()).thenReturn(rmContext);
if (enableV2) {
dispatcher = new DrainDispatcher();
rmTimelineCollectorManager = new RMTimelineCollectorManager(rm);
when(rmContext.getRMTimelineCollectorManager()).thenReturn(
rmTimelineCollectorManager);
rmTimelineCollectorManager.init(conf);
rmTimelineCollectorManager.start();
} else {
dispatcher = null;
rmTimelineCollectorManager = null;
}
timelineServer = new ApplicationHistoryServer();
timelineServer.init(conf);
timelineServer.start();
store = timelineServer.getTimelineStore();
if (enableV2) {
dispatcher.init(conf);
dispatcher.start();
}
List<SystemMetricsPublisher> publishers =
new ArrayList<SystemMetricsPublisher>();
if (YarnConfiguration.timelineServiceV1Enabled(conf)) {
Assert.assertTrue(enableV1);
publisherV1 = new TimelineServiceV1Publisher();
publishers.add(publisherV1);
publisherV1.init(conf);
publisherV1.start();
} else {
Assert.assertFalse(enableV1);
publisherV1 = null;
}
if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
Assert.assertTrue(enableV2);
publisherV2 = new TimelineServiceV2Publisher(
rmTimelineCollectorManager) {
@Override
protected Dispatcher getDispatcher() {
return dispatcher;
}
};
publishers.add(publisherV2);
publisherV2.init(conf);
publisherV2.start();
} else {
Assert.assertFalse(enableV2);
publisherV2 = null;
}
if (publishers.isEmpty()) {
NoOpSystemMetricPublisher noopPublisher =
new NoOpSystemMetricPublisher();
publishers.add(noopPublisher);
}
metricsPublisher = new CombinedSystemMetricsPublisher(publishers);
}
private void testCleanup() throws Exception {
if (publisherV1 != null) {
publisherV1.stop();
}
if (publisherV2 != null) {
publisherV2.stop();
}
if (timelineServer != null) {
timelineServer.stop();
}
if (testRootDir.exists()) {
FileContext.getLocalFSFileContext().delete(
new Path(testRootDir.getAbsolutePath()), true);
}
if (rmTimelineCollectorManager != null) {
rmTimelineCollectorManager.stop();
}
}
private static YarnConfiguration getConf(boolean v1Enabled,
boolean v2Enabled) {
YarnConfiguration yarnConf = new YarnConfiguration();
if (v1Enabled || v2Enabled) {
yarnConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
} else {
yarnConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false);
}
if (v1Enabled) {
yarnConf.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "1.0");
yarnConf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
MemoryTimelineStore.class, TimelineStore.class);
yarnConf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS,
MemoryTimelineStateStore.class, TimelineStateStore.class);
}
if (v2Enabled) {
yarnConf.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "2.0");
yarnConf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
true);
yarnConf.setBoolean(
YarnConfiguration.RM_PUBLISH_CONTAINER_EVENTS_ENABLED, true);
yarnConf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
FileSystemTimelineWriterImpl.class, TimelineWriter.class);
try {
yarnConf.set(
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
testRootDir.getCanonicalPath());
} catch (IOException e) {
e.printStackTrace();
Assert.fail("Exception while setting the " +
"TIMELINE_SERVICE_STORAGE_DIR_ROOT ");
}
}
if (v1Enabled && v2Enabled) {
yarnConf.set(YarnConfiguration.TIMELINE_SERVICE_VERSION, "1.0");
yarnConf.set(YarnConfiguration.TIMELINE_SERVICE_VERSIONS, "1.0,2.0f");
}
yarnConf.setInt(
YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, 2);
return yarnConf;
}
// runs test to validate timeline events are published if and only if the
// service is enabled for v1 and v2 (independently).
private void runTest(boolean v1Enabled, boolean v2Enabled) throws Exception {
testSetup(v1Enabled, v2Enabled);
publishEvents(v1Enabled, v2Enabled);
validateV1(v1Enabled);
validateV2(v2Enabled);
testCleanup();
}
@Test(timeout = 10000)
public void testTimelineServiceEventPublishingV1V2Enabled()
throws Exception {
runTest(true, true);
}
@Test(timeout = 10000)
public void testTimelineServiceEventPublishingV1Enabled() throws Exception {
runTest(true, false);
}
@Test(timeout = 10000)
public void testTimelineServiceEventPublishingV2Enabled() throws Exception {
runTest(false, true);
}
@Test(timeout = 10000)
public void testTimelineServiceEventPublishingNoService() throws Exception {
runTest(false, false);
}
private void publishEvents(boolean v1Enabled, boolean v2Enabled) {
long timestamp = (v1Enabled) ? 1 : 2;
int id = (v2Enabled) ? 3 : 4;
ApplicationId appId = ApplicationId.newInstance(timestamp, id);
app = createRMApp(appId);
rmAppsMapInContext.putIfAbsent(appId, app);
if (v2Enabled) {
AppLevelTimelineCollector collector =
new AppLevelTimelineCollector(appId);
rmTimelineCollectorManager.putIfAbsent(appId, collector);
}
appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
RMAppAttempt appAttempt = createRMAppAttempt(true);
metricsPublisher.appAttemptRegistered(appAttempt, Integer.MAX_VALUE + 1L);
metricsPublisher.appAttemptFinished(appAttempt, RMAppAttemptState.FINISHED,
app, Integer.MAX_VALUE + 2L);
if (v2Enabled) {
dispatcher.await();
}
}
private void validateV1(boolean v1Enabled) throws Exception {
TimelineEntity entity = null;
if (!v1Enabled) {
Thread.sleep(1000);
entity =
store.getEntity(appAttemptId.toString(),
AppAttemptMetricsConstants.ENTITY_TYPE,
EnumSet.allOf(Field.class));
Assert.assertNull(entity);
return;
}
do {
entity =
store.getEntity(appAttemptId.toString(),
AppAttemptMetricsConstants.ENTITY_TYPE,
EnumSet.allOf(Field.class));
Thread.sleep(100);
// ensure two events are both published before leaving the loop
} while (entity == null || entity.getEvents().size() < 2);
boolean hasRegisteredEvent = false;
boolean hasFinishedEvent = false;
for (org.apache.hadoop.yarn.api.records.timeline.TimelineEvent event :
entity.getEvents()) {
if (event.getEventType().equals(
AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE)) {
hasRegisteredEvent = true;
} else if (event.getEventType().equals(
AppAttemptMetricsConstants.FINISHED_EVENT_TYPE)) {
hasFinishedEvent = true;
Assert.assertEquals(
FinalApplicationStatus.UNDEFINED.toString(),
event.getEventInfo().get(
AppAttemptMetricsConstants.FINAL_STATUS_INFO));
Assert.assertEquals(
YarnApplicationAttemptState.FINISHED.toString(),
event.getEventInfo().get(
AppAttemptMetricsConstants.STATE_INFO));
}
Assert
.assertEquals(appAttemptId.toString(), entity.getEntityId());
}
Assert.assertTrue(hasRegisteredEvent && hasFinishedEvent);
}
private void validateV2(boolean v2Enabled) throws Exception {
String outputDirApp =
getTimelineEntityDir() + "/"
+ TimelineEntityType.YARN_APPLICATION_ATTEMPT + "/";
File entityFolder = new File(outputDirApp);
Assert.assertEquals(v2Enabled, entityFolder.isDirectory());
if (v2Enabled) {
String timelineServiceFileName = appAttemptId.toString()
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
File entityFile = new File(outputDirApp, timelineServiceFileName);
Assert.assertTrue(entityFile.exists());
long idPrefix = TimelineServiceHelper
.invertLong(appAttemptId.getAttemptId());
verifyEntity(entityFile, 2,
AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE, 0, idPrefix);
}
}
private void verifyEntity(File entityFile, long expectedEvents,
String eventForCreatedTime, long expectedMetrics, long idPrefix)
throws IOException {
BufferedReader reader = null;
String strLine;
long count = 0;
long metricsCount = 0;
try {
reader = new BufferedReader(new FileReader(entityFile));
while ((strLine = reader.readLine()) != null) {
if (strLine.trim().length() > 0) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
entity = FileSystemTimelineReaderImpl
.getTimelineRecordFromJSON(strLine.trim(),
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.class);
metricsCount = entity.getMetrics().size();
assertEquals(idPrefix, entity.getIdPrefix());
for (TimelineEvent event : entity.getEvents()) {
if (event.getId().equals(eventForCreatedTime)) {
assertTrue(entity.getCreatedTime() > 0);
break;
}
}
count++;
}
}
} finally {
reader.close();
}
assertEquals("Expected " + expectedEvents + " events to be published",
expectedEvents, count);
assertEquals("Expected " + expectedMetrics + " metrics is incorrect",
expectedMetrics, metricsCount);
}
private String getTimelineEntityDir() {
String outputDirApp =
testRootDir.getAbsolutePath() + "/"
+ FileSystemTimelineWriterImpl.ENTITIES_DIR + "/"
+ YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/"
+ app.getUser() + "/"
+ app.getName() + "/"
+ TimelineUtils.DEFAULT_FLOW_VERSION + "/"
+ app.getStartTime() + "/"
+ app.getApplicationId();
return outputDirApp;
}
private static RMAppAttempt createRMAppAttempt(boolean unmanagedAMAttempt) {
RMAppAttempt appAttempt = mock(RMAppAttempt.class);
when(appAttempt.getAppAttemptId()).thenReturn(appAttemptId);
when(appAttempt.getHost()).thenReturn("test host");
when(appAttempt.getRpcPort()).thenReturn(-100);
if (!unmanagedAMAttempt) {
Container container = mock(Container.class);
when(container.getId())
.thenReturn(ContainerId.newContainerId(appAttemptId, 1));
when(appAttempt.getMasterContainer()).thenReturn(container);
}
when(appAttempt.getDiagnostics()).thenReturn("test diagnostics info");
when(appAttempt.getTrackingUrl()).thenReturn("test tracking url");
when(appAttempt.getOriginalTrackingUrl()).thenReturn(
"test original tracking url");
return appAttempt;
}
private static RMApp createRMApp(ApplicationId appId) {
RMApp rmApp = mock(RMAppImpl.class);
when(rmApp.getApplicationId()).thenReturn(appId);
when(rmApp.getName()).thenReturn("test app");
when(rmApp.getApplicationType()).thenReturn("test app type");
when(rmApp.getUser()).thenReturn("testUser");
when(rmApp.getQueue()).thenReturn("test queue");
when(rmApp.getSubmitTime()).thenReturn(Integer.MAX_VALUE + 1L);
when(rmApp.getStartTime()).thenReturn(Integer.MAX_VALUE + 2L);
when(rmApp.getFinishTime()).thenReturn(Integer.MAX_VALUE + 3L);
when(rmApp.getDiagnostics()).thenReturn(
new StringBuilder("test diagnostics info"));
RMAppAttempt appAttempt = mock(RMAppAttempt.class);
when(appAttempt.getAppAttemptId()).thenReturn(
ApplicationAttemptId.newInstance(appId, 1));
when(rmApp.getCurrentAppAttempt()).thenReturn(appAttempt);
when(rmApp.getFinalApplicationStatus()).thenReturn(
FinalApplicationStatus.UNDEFINED);
Map<String, Long> resourceMap = new HashMap<>();
resourceMap
.put(ResourceInformation.MEMORY_MB.getName(), (long) Integer.MAX_VALUE);
resourceMap.put(ResourceInformation.VCORES.getName(), Long.MAX_VALUE);
Map<String, Long> preemptedMap = new HashMap<>();
preemptedMap
.put(ResourceInformation.MEMORY_MB.getName(), (long) Integer.MAX_VALUE);
preemptedMap.put(ResourceInformation.VCORES.getName(), Long.MAX_VALUE);
when(rmApp.getRMAppMetrics()).thenReturn(
new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, resourceMap,
preemptedMap));
when(rmApp.getApplicationTags()).thenReturn(
Collections.<String> emptySet());
ApplicationSubmissionContext appSubmissionContext =
mock(ApplicationSubmissionContext.class);
when(appSubmissionContext.getPriority())
.thenReturn(Priority.newInstance(0));
ContainerLaunchContext containerLaunchContext =
mock(ContainerLaunchContext.class);
when(containerLaunchContext.getCommands())
.thenReturn(Collections.singletonList("java -Xmx1024m"));
when(appSubmissionContext.getAMContainerSpec())
.thenReturn(containerLaunchContext);
when(rmApp.getApplicationPriority()).thenReturn(Priority.newInstance(10));
when(rmApp.getApplicationSubmissionContext())
.thenReturn(appSubmissionContext);
return rmApp;
}
}

View File

@ -251,6 +251,15 @@ public class MiniYARNCluster extends CompositeService {
useFixedPorts = conf.getBoolean(
YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS,
YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS);
if (!useFixedPorts) {
String hostname = MiniYARNCluster.getHostname();
conf.set(YarnConfiguration.TIMELINE_SERVICE_ADDRESS, hostname + ":0");
conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
hostname + ":" + ServerSocketUtil.getPort(9188, 10));
}
useRpc = conf.getBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC,
YarnConfiguration.DEFAULT_YARN_MINICLUSTER_USE_RPC);
failoverTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
@ -808,12 +817,6 @@ public class MiniYARNCluster extends CompositeService {
}
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS,
MemoryTimelineStateStore.class, TimelineStateStore.class);
if (!useFixedPorts) {
String hostname = MiniYARNCluster.getHostname();
conf.set(YarnConfiguration.TIMELINE_SERVICE_ADDRESS, hostname + ":0");
conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
hostname + ":" + ServerSocketUtil.getPort(9188, 10));
}
appHistoryServer.init(conf);
super.serviceInit(conf);
}