YARN-2033. Merging generic-history into the Timeline Store (Contributed by Zhijie Shen)

(cherry picked from commit 6b8b1608e6)
This commit is contained in:
junping_du 2014-09-12 10:04:51 +08:00
parent d22cd065ed
commit efa9ab3638
57 changed files with 2848 additions and 142 deletions

View File

@ -50,6 +50,9 @@ Release 2.6.0 - UNRELEASED
YARN-2440. Enabled Nodemanagers to limit the aggregate cpu usage across all
containers to a preconfigured limit. (Varun Vasudev via vinodkv)
YARN-2033. YARN-2033. Merging generic-history into the Timeline Store
(Zhijie Shen via junping_du)
IMPROVEMENTS
YARN-2242. Improve exception information on AM launch crashes. (Li Lu

View File

@ -51,14 +51,15 @@ public abstract class ApplicationAttemptReport {
@Unstable
public static ApplicationAttemptReport newInstance(
ApplicationAttemptId applicationAttemptId, String host, int rpcPort,
String url, String diagnostics, YarnApplicationAttemptState state,
ContainerId amContainerId) {
String url, String oUrl, String diagnostics,
YarnApplicationAttemptState state, ContainerId amContainerId) {
ApplicationAttemptReport report =
Records.newRecord(ApplicationAttemptReport.class);
report.setApplicationAttemptId(applicationAttemptId);
report.setHost(host);
report.setRpcPort(rpcPort);
report.setTrackingUrl(url);
report.setOriginalTrackingUrl(oUrl);
report.setDiagnostics(diagnostics);
report.setYarnApplicationAttemptState(state);
report.setAMContainerId(amContainerId);
@ -135,6 +136,19 @@ public abstract class ApplicationAttemptReport {
@Unstable
public abstract void setTrackingUrl(String url);
/**
* Get the <em>original tracking url</em> for the application attempt.
*
* @return <em>original tracking url</em> for the application attempt
*/
@Public
@Unstable
public abstract String getOriginalTrackingUrl();
@Private
@Unstable
public abstract void setOriginalTrackingUrl(String oUrl);
/**
* Get the <code>ApplicationAttemptId</code> of this attempt of the
* application

View File

@ -316,6 +316,19 @@ public class YarnConfiguration extends Configuration {
public static final int DEFAULT_RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE =
10;
/**
* The setting that controls whether yarn system metrics is published on the
* timeline server or not by RM.
*/
public static final String RM_SYSTEM_METRICS_PUBLISHER_ENABLED =
RM_PREFIX + "system-metrics-publisher.enabled";
public static final boolean DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED = false;
public static final String RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE =
RM_PREFIX + "system-metrics-publisher.dispatcher.pool-size";
public static final int DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE =
10;
//Delegation token related keys
public static final String DELEGATION_KEY_UPDATE_INTERVAL_KEY =
RM_PREFIX + "delegation.key.update-interval";

View File

@ -202,6 +202,7 @@ message ApplicationAttemptReportProto {
optional string diagnostics = 5 [default = "N/A"];
optional YarnApplicationAttemptStateProto yarn_application_attempt_state = 6;
optional ContainerIdProto am_container_id = 7;
optional string original_tracking_url = 8;
}
enum NodeStateProto {

View File

@ -676,7 +676,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes{
public ApplicationAttemptReport createFakeApplicationAttemptReport() {
return ApplicationAttemptReport.newInstance(
createFakeApplicationAttemptId(), "localhost", 0, "", "",
createFakeApplicationAttemptId(), "localhost", 0, "", "", "",
YarnApplicationAttemptState.RUNNING, createFakeContainerId());
}

View File

@ -346,6 +346,7 @@ public class TestAHSClient {
"host",
124,
"url",
"oUrl",
"diagnostics",
YarnApplicationAttemptState.FINISHED,
ContainerId.newInstance(
@ -357,6 +358,7 @@ public class TestAHSClient {
"host",
124,
"url",
"oUrl",
"diagnostics",
YarnApplicationAttemptState.FINISHED,
ContainerId.newInstance(

View File

@ -457,6 +457,7 @@ public class TestYarnClient {
"host",
124,
"url",
"oUrl",
"diagnostics",
YarnApplicationAttemptState.FINISHED,
ContainerId.newInstance(
@ -467,6 +468,7 @@ public class TestYarnClient {
"host",
124,
"url",
"oUrl",
"diagnostics",
YarnApplicationAttemptState.FINISHED,
ContainerId.newInstance(

View File

@ -133,7 +133,7 @@ public class TestYarnCLI {
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
applicationId, 1);
ApplicationAttemptReport attemptReport = ApplicationAttemptReport
.newInstance(attemptId, "host", 124, "url", "diagnostics",
.newInstance(attemptId, "host", 124, "url", "oUrl", "diagnostics",
YarnApplicationAttemptState.FINISHED, ContainerId.newInstance(
attemptId, 1));
when(
@ -169,11 +169,11 @@ public class TestYarnCLI {
ApplicationAttemptId attemptId1 = ApplicationAttemptId.newInstance(
applicationId, 2);
ApplicationAttemptReport attemptReport = ApplicationAttemptReport
.newInstance(attemptId, "host", 124, "url", "diagnostics",
.newInstance(attemptId, "host", 124, "url", "oUrl", "diagnostics",
YarnApplicationAttemptState.FINISHED, ContainerId.newInstance(
attemptId, 1));
ApplicationAttemptReport attemptReport1 = ApplicationAttemptReport
.newInstance(attemptId1, "host", 124, "url", "diagnostics",
.newInstance(attemptId1, "host", 124, "url", "oUrl", "diagnostics",
YarnApplicationAttemptState.FINISHED, ContainerId.newInstance(
attemptId1, 1));
List<ApplicationAttemptReport> reports = new ArrayList<ApplicationAttemptReport>();

View File

@ -87,6 +87,15 @@ public class ApplicationAttemptReportPBImpl extends ApplicationAttemptReport {
return p.getTrackingUrl();
}
@Override
public String getOriginalTrackingUrl() {
ApplicationAttemptReportProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasOriginalTrackingUrl()) {
return null;
}
return p.getOriginalTrackingUrl();
}
@Override
public String getDiagnostics() {
ApplicationAttemptReportProtoOrBuilder p = viaProto ? proto : builder;
@ -160,6 +169,16 @@ public class ApplicationAttemptReportPBImpl extends ApplicationAttemptReport {
builder.setTrackingUrl(url);
}
@Override
public void setOriginalTrackingUrl(String oUrl) {
maybeInitBuilder();
if (oUrl == null) {
builder.clearOriginalTrackingUrl();
return;
}
builder.setOriginalTrackingUrl(oUrl);
}
@Override
public void setDiagnostics(String diagnostics) {
maybeInitBuilder();

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.webapp.view;
import java.io.PrintWriter;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.webapp.MimeType;
import org.apache.hadoop.yarn.webapp.SubView;
import org.apache.hadoop.yarn.webapp.WebAppException;
@ -81,4 +82,15 @@ public abstract class HtmlBlock extends TextView implements SubView {
* @param html the block to render
*/
protected abstract void render(Block html);
protected UserGroupInformation getCallerUGI() {
// Check for the authorization.
String remoteUser = request().getRemoteUser();
UserGroupInformation callerUGI = null;
if (remoteUser != null) {
callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
}
return callerUGI;
}
}

View File

@ -615,24 +615,32 @@
<value>org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy</value>
</property>
<property>
<description>Number of worker threads that write the history data.</description>
<name>yarn.resourcemanager.history-writer.multi-threaded-dispatcher.pool-size</name>
<value>10</value>
</property>
<property>
<description>The class to use as the configuration provider.
If org.apache.hadoop.yarn.LocalConfigurationProvider is used,
the local configuration will be loaded.
If org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider is used,
the configuration which will be loaded should be uploaded to remote File system first.
</description>>
</description>
<name>yarn.resourcemanager.configuration.provider-class</name>
<value>org.apache.hadoop.yarn.LocalConfigurationProvider</value>
<!-- <value>org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider</value> -->
</property>
<property>
<description>The setting that controls whether yarn system metrics is
published on the timeline server or not by RM.</description>
<name>yarn.resourcemanager.system-metrics-publisher.enabled</name>
<value>false</value>
</property>
<property>
<description>Number of worker threads that send the yarn system metrics
data.</description>
<name>yarn.resourcemanager.system-metrics-publisher.dispatcher.pool-size</name>
<value>10</value>
</property>
<!-- Node Manager Configs -->
<property>
<description>The hostname of the NM.</description>
@ -1318,38 +1326,6 @@
<value>/etc/krb5.keytab</value>
</property>
<property>
<description>Indicate to ResourceManager as well as clients whether
history-service is enabled or not. If enabled, ResourceManager starts
recording historical data that ApplicationHistory service can consume.
Similarly, clients can redirect to the history service when applications
finish if this is enabled.</description>
<name>yarn.timeline-service.generic-application-history.enabled</name>
<value>false</value>
</property>
<property>
<description>URI pointing to the location of the FileSystem path where
the history will be persisted. This must be supplied when using
org.apache.hadoop.yarn.server.applicationhistoryservice.FileSystemApplicationHistoryStore
as the value for yarn.timeline-service.generic-application-history.store-class</description>
<name>yarn.timeline-service.generic-application-history.fs-history-store.uri</name>
<value>${hadoop.tmp.dir}/yarn/timeline/generic-history</value>
</property>
<property>
<description>T-file compression types used to compress history data.</description>
<name>yarn.timeline-service.generic-application-history.fs-history-store.compression-type</name>
<value>none</value>
</property>
<property>
<description>Store class name for history store, defaulting to file
system store </description>
<name>yarn.timeline-service.generic-application-history.store-class</name>
<value>org.apache.hadoop.yarn.server.applicationhistoryservice.FileSystemApplicationHistoryStore</value>
</property>
<!-- Other configuration -->
<property>
<description>The interval that the yarn client library uses to poll the

View File

@ -163,7 +163,7 @@ public class ApplicationHistoryManagerImpl extends AbstractService implements
ApplicationAttemptHistoryData appAttemptHistory) {
return ApplicationAttemptReport.newInstance(
appAttemptHistory.getApplicationAttemptId(), appAttemptHistory.getHost(),
appAttemptHistory.getRPCPort(), appAttemptHistory.getTrackingURL(),
appAttemptHistory.getRPCPort(), appAttemptHistory.getTrackingURL(), null,
appAttemptHistory.getDiagnosticsInfo(),
appAttemptHistory.getYarnApplicationAttemptState(),
appAttemptHistory.getMasterContainerId());

View File

@ -0,0 +1,511 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice;
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
import org.apache.hadoop.yarn.server.timeline.NameValuePair;
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
implements
ApplicationHistoryManager {
private TimelineDataManager timelineDataManager;
private String serverHttpAddress;
public ApplicationHistoryManagerOnTimelineStore(
TimelineDataManager timelineDataManager) {
super(ApplicationHistoryManagerOnTimelineStore.class.getName());
this.timelineDataManager = timelineDataManager;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
serverHttpAddress = WebAppUtils.getHttpSchemePrefix(conf) +
WebAppUtils.getAHSWebAppURLWithoutScheme(conf);
super.serviceInit(conf);
}
@Override
public ApplicationReport getApplication(ApplicationId appId)
throws YarnException, IOException {
return getApplication(appId, ApplicationReportField.ALL);
}
@Override
public Map<ApplicationId, ApplicationReport> getAllApplications()
throws YarnException, IOException {
TimelineEntities entities = timelineDataManager.getEntities(
ApplicationMetricsConstants.ENTITY_TYPE, null, null, null, null,
null, null, Long.MAX_VALUE, EnumSet.allOf(Field.class),
UserGroupInformation.getLoginUser());
Map<ApplicationId, ApplicationReport> apps =
new HashMap<ApplicationId, ApplicationReport>();
if (entities != null && entities.getEntities() != null) {
for (TimelineEntity entity : entities.getEntities()) {
ApplicationReport app =
generateApplicationReport(entity, ApplicationReportField.ALL);
apps.put(app.getApplicationId(), app);
}
}
return apps;
}
@Override
public Map<ApplicationAttemptId, ApplicationAttemptReport>
getApplicationAttempts(ApplicationId appId)
throws YarnException, IOException {
TimelineEntities entities = timelineDataManager.getEntities(
AppAttemptMetricsConstants.ENTITY_TYPE,
new NameValuePair(
AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER, appId
.toString()), null, null, null, null, null,
Long.MAX_VALUE, EnumSet.allOf(Field.class),
UserGroupInformation.getLoginUser());
Map<ApplicationAttemptId, ApplicationAttemptReport> appAttempts =
new HashMap<ApplicationAttemptId, ApplicationAttemptReport>();
if (entities != null && entities.getEntities() != null) {
for (TimelineEntity entity : entities.getEntities()) {
ApplicationAttemptReport appAttempt =
convertToApplicationAttemptReport(entity);
appAttempts.put(appAttempt.getApplicationAttemptId(), appAttempt);
}
} else {
// It is likely that the attemtps are not found due to non-existing
// application. In this case, we need to throw ApplicationNotFoundException.
getApplication(appId, ApplicationReportField.NONE);
}
return appAttempts;
}
@Override
public ApplicationAttemptReport getApplicationAttempt(
ApplicationAttemptId appAttemptId) throws YarnException, IOException {
TimelineEntity entity = timelineDataManager.getEntity(
AppAttemptMetricsConstants.ENTITY_TYPE,
appAttemptId.toString(), EnumSet.allOf(Field.class),
UserGroupInformation.getLoginUser());
if (entity == null) {
// Will throw ApplicationNotFoundException first
getApplication(appAttemptId.getApplicationId(), ApplicationReportField.NONE);
throw new ApplicationAttemptNotFoundException(
"The entity for application attempt " + appAttemptId +
" doesn't exist in the timeline store");
} else {
return convertToApplicationAttemptReport(entity);
}
}
@Override
public ContainerReport getContainer(ContainerId containerId)
throws YarnException, IOException {
ApplicationReport app = getApplication(
containerId.getApplicationAttemptId().getApplicationId(),
ApplicationReportField.USER);
TimelineEntity entity = timelineDataManager.getEntity(
ContainerMetricsConstants.ENTITY_TYPE,
containerId.toString(), EnumSet.allOf(Field.class),
UserGroupInformation.getLoginUser());
if (entity == null) {
throw new ContainerNotFoundException(
"The entity for container " + containerId +
" doesn't exist in the timeline store");
} else {
return convertToContainerReport(entity, serverHttpAddress, app.getUser());
}
}
@Override
public ContainerReport getAMContainer(ApplicationAttemptId appAttemptId)
throws YarnException, IOException {
ApplicationAttemptReport appAttempt = getApplicationAttempt(appAttemptId);
return getContainer(appAttempt.getAMContainerId());
}
@Override
public Map<ContainerId, ContainerReport> getContainers(
ApplicationAttemptId appAttemptId) throws YarnException, IOException {
ApplicationReport app = getApplication(
appAttemptId.getApplicationId(), ApplicationReportField.USER);
TimelineEntities entities = timelineDataManager.getEntities(
ContainerMetricsConstants.ENTITY_TYPE,
new NameValuePair(
ContainerMetricsConstants.PARENT_PRIMARIY_FILTER,
appAttemptId.toString()), null, null, null,
null, null, Long.MAX_VALUE, EnumSet.allOf(Field.class),
UserGroupInformation.getLoginUser());
Map<ContainerId, ContainerReport> containers =
new HashMap<ContainerId, ContainerReport>();
if (entities != null && entities.getEntities() != null) {
for (TimelineEntity entity : entities.getEntities()) {
ContainerReport container =
convertToContainerReport(entity, serverHttpAddress, app.getUser());
containers.put(container.getContainerId(), container);
}
}
return containers;
}
private static ApplicationReport convertToApplicationReport(
TimelineEntity entity, ApplicationReportField field) {
String user = null;
String queue = null;
String name = null;
String type = null;
long createdTime = 0;
long finishedTime = 0;
ApplicationAttemptId latestApplicationAttemptId = null;
String diagnosticsInfo = null;
FinalApplicationStatus finalStatus = FinalApplicationStatus.UNDEFINED;
YarnApplicationState state = null;
if (field == ApplicationReportField.NONE) {
return ApplicationReport.newInstance(
ConverterUtils.toApplicationId(entity.getEntityId()),
latestApplicationAttemptId, user, queue, name, null, -1, null, state,
diagnosticsInfo, null, createdTime, finishedTime, finalStatus, null,
null, 1.0F, type, null);
}
Map<String, Object> entityInfo = entity.getOtherInfo();
if (entityInfo != null) {
if (entityInfo.containsKey(ApplicationMetricsConstants.USER_ENTITY_INFO)) {
user =
entityInfo.get(ApplicationMetricsConstants.USER_ENTITY_INFO)
.toString();
}
if (field == ApplicationReportField.USER) {
return ApplicationReport.newInstance(
ConverterUtils.toApplicationId(entity.getEntityId()),
latestApplicationAttemptId, user, queue, name, null, -1, null, state,
diagnosticsInfo, null, createdTime, finishedTime, finalStatus, null,
null, 1.0F, type, null);
}
if (entityInfo.containsKey(ApplicationMetricsConstants.QUEUE_ENTITY_INFO)) {
queue =
entityInfo.get(ApplicationMetricsConstants.QUEUE_ENTITY_INFO)
.toString();
}
if (entityInfo.containsKey(ApplicationMetricsConstants.NAME_ENTITY_INFO)) {
name =
entityInfo.get(ApplicationMetricsConstants.NAME_ENTITY_INFO)
.toString();
}
if (entityInfo.containsKey(ApplicationMetricsConstants.TYPE_ENTITY_INFO)) {
type =
entityInfo.get(ApplicationMetricsConstants.TYPE_ENTITY_INFO)
.toString();
}
}
List<TimelineEvent> events = entity.getEvents();
if (events != null) {
for (TimelineEvent event : events) {
if (event.getEventType().equals(
ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
createdTime = event.getTimestamp();
} else if (event.getEventType().equals(
ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
finishedTime = event.getTimestamp();
Map<String, Object> eventInfo = event.getEventInfo();
if (eventInfo == null) {
continue;
}
if (eventInfo
.containsKey(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO)) {
latestApplicationAttemptId =
ConverterUtils
.toApplicationAttemptId(
eventInfo
.get(
ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO)
.toString());
}
if (eventInfo
.containsKey(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO)) {
diagnosticsInfo =
eventInfo.get(
ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO)
.toString();
}
if (eventInfo
.containsKey(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO)) {
finalStatus =
FinalApplicationStatus.valueOf(eventInfo.get(
ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO)
.toString());
}
if (eventInfo
.containsKey(ApplicationMetricsConstants.STATE_EVENT_INFO)) {
state =
YarnApplicationState.valueOf(eventInfo.get(
ApplicationMetricsConstants.STATE_EVENT_INFO).toString());
}
}
}
}
return ApplicationReport.newInstance(
ConverterUtils.toApplicationId(entity.getEntityId()),
latestApplicationAttemptId, user, queue, name, null, -1, null, state,
diagnosticsInfo, null, createdTime, finishedTime, finalStatus, null,
null, 1.0F, type, null);
}
private static ApplicationAttemptReport convertToApplicationAttemptReport(
TimelineEntity entity) {
String host = null;
int rpcPort = -1;
ContainerId amContainerId = null;
String trackingUrl = null;
String originalTrackingUrl = null;
String diagnosticsInfo = null;
YarnApplicationAttemptState state = null;
List<TimelineEvent> events = entity.getEvents();
if (events != null) {
for (TimelineEvent event : events) {
if (event.getEventType().equals(
AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE)) {
Map<String, Object> eventInfo = event.getEventInfo();
if (eventInfo == null) {
continue;
}
if (eventInfo.containsKey(AppAttemptMetricsConstants.HOST_EVENT_INFO)) {
host =
eventInfo.get(AppAttemptMetricsConstants.HOST_EVENT_INFO)
.toString();
}
if (eventInfo
.containsKey(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO)) {
rpcPort = (Integer) eventInfo.get(
AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO);
}
if (eventInfo
.containsKey(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO)) {
amContainerId =
ConverterUtils.toContainerId(eventInfo.get(
AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO)
.toString());
}
} else if (event.getEventType().equals(
AppAttemptMetricsConstants.FINISHED_EVENT_TYPE)) {
Map<String, Object> eventInfo = event.getEventInfo();
if (eventInfo == null) {
continue;
}
if (eventInfo
.containsKey(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO)) {
trackingUrl =
eventInfo.get(
AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO)
.toString();
}
if (eventInfo
.containsKey(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO)) {
originalTrackingUrl =
eventInfo
.get(
AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO)
.toString();
}
if (eventInfo
.containsKey(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO)) {
diagnosticsInfo =
eventInfo.get(
AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO)
.toString();
}
if (eventInfo
.containsKey(AppAttemptMetricsConstants.STATE_EVENT_INFO)) {
state =
YarnApplicationAttemptState.valueOf(eventInfo.get(
AppAttemptMetricsConstants.STATE_EVENT_INFO)
.toString());
}
}
}
}
return ApplicationAttemptReport.newInstance(
ConverterUtils.toApplicationAttemptId(entity.getEntityId()),
host, rpcPort, trackingUrl, originalTrackingUrl, diagnosticsInfo,
state, amContainerId);
}
private static ContainerReport convertToContainerReport(
TimelineEntity entity, String serverHttpAddress, String user) {
int allocatedMem = 0;
int allocatedVcore = 0;
String allocatedHost = null;
int allocatedPort = -1;
int allocatedPriority = 0;
long createdTime = 0;
long finishedTime = 0;
String diagnosticsInfo = null;
int exitStatus = ContainerExitStatus.INVALID;
ContainerState state = null;
Map<String, Object> entityInfo = entity.getOtherInfo();
if (entityInfo != null) {
if (entityInfo
.containsKey(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO)) {
allocatedMem = (Integer) entityInfo.get(
ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO);
}
if (entityInfo
.containsKey(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO)) {
allocatedVcore = (Integer) entityInfo.get(
ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO);
}
if (entityInfo
.containsKey(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO)) {
allocatedHost =
entityInfo
.get(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO)
.toString();
}
if (entityInfo
.containsKey(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO)) {
allocatedPort = (Integer) entityInfo.get(
ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO);
}
if (entityInfo
.containsKey(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO)) {
allocatedPriority = (Integer) entityInfo.get(
ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO);
}
}
List<TimelineEvent> events = entity.getEvents();
if (events != null) {
for (TimelineEvent event : events) {
if (event.getEventType().equals(
ContainerMetricsConstants.CREATED_EVENT_TYPE)) {
createdTime = event.getTimestamp();
} else if (event.getEventType().equals(
ContainerMetricsConstants.FINISHED_EVENT_TYPE)) {
finishedTime = event.getTimestamp();
Map<String, Object> eventInfo = event.getEventInfo();
if (eventInfo == null) {
continue;
}
if (eventInfo
.containsKey(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO)) {
diagnosticsInfo =
eventInfo.get(
ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO)
.toString();
}
if (eventInfo
.containsKey(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO)) {
exitStatus = (Integer) eventInfo.get(
ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO);
}
if (eventInfo
.containsKey(ContainerMetricsConstants.STATE_EVENT_INFO)) {
state =
ContainerState.valueOf(eventInfo.get(
ContainerMetricsConstants.STATE_EVENT_INFO).toString());
}
}
}
}
NodeId allocatedNode = NodeId.newInstance(allocatedHost, allocatedPort);
ContainerId containerId =
ConverterUtils.toContainerId(entity.getEntityId());
String logUrl = WebAppUtils.getAggregatedLogURL(
serverHttpAddress,
allocatedNode.toString(),
containerId.toString(),
containerId.toString(),
user);
return ContainerReport.newInstance(
ConverterUtils.toContainerId(entity.getEntityId()),
Resource.newInstance(allocatedMem, allocatedVcore),
NodeId.newInstance(allocatedHost, allocatedPort),
Priority.newInstance(allocatedPriority),
createdTime, finishedTime, diagnosticsInfo, logUrl, exitStatus, state);
}
private ApplicationReport generateApplicationReport(TimelineEntity entity,
ApplicationReportField field) throws YarnException, IOException {
ApplicationReport app = convertToApplicationReport(entity, field);
if (field == ApplicationReportField.ALL &&
app != null && app.getCurrentApplicationAttemptId() != null) {
ApplicationAttemptReport appAttempt =
getApplicationAttempt(app.getCurrentApplicationAttemptId());
if (appAttempt != null) {
app.setHost(appAttempt.getHost());
app.setRpcPort(appAttempt.getRpcPort());
app.setTrackingUrl(appAttempt.getTrackingUrl());
app.setOriginalTrackingUrl(appAttempt.getOriginalTrackingUrl());
}
}
return app;
}
private ApplicationReport getApplication(ApplicationId appId,
ApplicationReportField field) throws YarnException, IOException {
TimelineEntity entity = timelineDataManager.getEntity(
ApplicationMetricsConstants.ENTITY_TYPE,
appId.toString(), EnumSet.allOf(Field.class),
UserGroupInformation.getLoginUser());
if (entity == null) {
throw new ApplicationNotFoundException("The entity for application " +
appId + " doesn't exist in the timeline store");
} else {
return generateApplicationReport(entity, field);
}
}
private static enum ApplicationReportField {
ALL, // retrieve all the fields
NONE, // retrieve no fields
USER // retrieve user info only
}
}

View File

@ -170,7 +170,16 @@ public class ApplicationHistoryServer extends CompositeService {
private ApplicationHistoryManager createApplicationHistoryManager(
Configuration conf) {
return new ApplicationHistoryManagerImpl();
// Backward compatibility:
// APPLICATION_HISTORY_STORE is neither null nor empty, it means that the
// user has enabled it explicitly.
if (conf.get(YarnConfiguration.APPLICATION_HISTORY_STORE) == null ||
conf.get(YarnConfiguration.APPLICATION_HISTORY_STORE).length() == 0) {
return new ApplicationHistoryManagerOnTimelineStore(timelineDataManager);
} else {
LOG.warn("The filesystem based application history store is deprecated.");
return new ApplicationHistoryManagerImpl();
}
}
private TimelineStore createTimelineStore(

View File

@ -117,7 +117,8 @@ public class FileSystemApplicationHistoryStore extends AbstractService
@Override
public void serviceInit(Configuration conf) throws Exception {
Path fsWorkingPath =
new Path(conf.get(YarnConfiguration.FS_APPLICATION_HISTORY_STORE_URI));
new Path(conf.get(YarnConfiguration.FS_APPLICATION_HISTORY_STORE_URI,
conf.get("hadoop.tmp.dir") + "/yarn/timeline/generic-history"));
rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
try {
fs = getFileSystem(fsWorkingPath, conf);

View File

@ -49,6 +49,8 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelineP
* they may encounter reading and writing history data in different memory
* store.
*
* The methods are synchronized to avoid concurrent modification on the memory.
*
*/
@Private
@Unstable
@ -65,7 +67,7 @@ public class MemoryTimelineStore
}
@Override
public TimelineEntities getEntities(String entityType, Long limit,
public synchronized TimelineEntities getEntities(String entityType, Long limit,
Long windowStart, Long windowEnd, String fromId, Long fromTs,
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
EnumSet<Field> fields) {
@ -148,7 +150,7 @@ public class MemoryTimelineStore
}
@Override
public TimelineEntity getEntity(String entityId, String entityType,
public synchronized TimelineEntity getEntity(String entityId, String entityType,
EnumSet<Field> fieldsToRetrieve) {
if (fieldsToRetrieve == null) {
fieldsToRetrieve = EnumSet.allOf(Field.class);
@ -162,7 +164,7 @@ public class MemoryTimelineStore
}
@Override
public TimelineEvents getEntityTimelines(String entityType,
public synchronized TimelineEvents getEntityTimelines(String entityType,
SortedSet<String> entityIds, Long limit, Long windowStart,
Long windowEnd,
Set<String> eventTypes) {
@ -209,7 +211,7 @@ public class MemoryTimelineStore
}
@Override
public TimelinePutResponse put(TimelineEntities data) {
public synchronized TimelinePutResponse put(TimelineEntities data) {
TimelinePutResponse response = new TimelinePutResponse();
for (TimelineEntity entity : data.getEntities()) {
EntityIdentifier entityId =

View File

@ -64,6 +64,7 @@ public class TestApplicationHistoryClientService extends
WebAppUtils.getAHSWebAppURLWithoutScheme(config) +
"/applicationhistory/logs/localhost:0/container_0_0001_01_000001/" +
"container_0_0001_01_000001/test user";
config.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, true);
config.setClass(YarnConfiguration.APPLICATION_HISTORY_STORE,
MemoryApplicationHistoryStore.class, ApplicationHistoryStore.class);
historyServer.init(config);

View File

@ -0,0 +1,317 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore;
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestApplicationHistoryManagerOnTimelineStore {
private static ApplicationHistoryManagerOnTimelineStore historyManager;
private static final int SCALE = 5;
@BeforeClass
public static void setup() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
TimelineStore store = new MemoryTimelineStore();
prepareTimelineStore(store);
TimelineACLsManager aclsManager = new TimelineACLsManager(conf);
TimelineDataManager dataManager =
new TimelineDataManager(store, aclsManager);
historyManager = new ApplicationHistoryManagerOnTimelineStore(dataManager);
historyManager.init(conf);
historyManager.start();
}
@AfterClass
public static void tearDown() {
if (historyManager != null) {
historyManager.stop();
}
}
private static void prepareTimelineStore(TimelineStore store)
throws Exception {
for (int i = 1; i <= SCALE; ++i) {
TimelineEntities entities = new TimelineEntities();
ApplicationId appId = ApplicationId.newInstance(0, i);
entities.addEntity(createApplicationTimelineEntity(appId));
store.put(entities);
for (int j = 1; j <= SCALE; ++j) {
entities = new TimelineEntities();
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, j);
entities.addEntity(createAppAttemptTimelineEntity(appAttemptId));
store.put(entities);
for (int k = 1; k <= SCALE; ++k) {
entities = new TimelineEntities();
ContainerId containerId = ContainerId.newInstance(appAttemptId, k);
entities.addEntity(createContainerEntity(containerId));
store.put(entities);
}
}
}
}
@Test
public void testGetApplicationReport() throws Exception {
ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationReport app = historyManager.getApplication(appId);
Assert.assertNotNull(app);
Assert.assertEquals(appId, app.getApplicationId());
Assert.assertEquals("test app", app.getName());
Assert.assertEquals("test app type", app.getApplicationType());
Assert.assertEquals("test user", app.getUser());
Assert.assertEquals("test queue", app.getQueue());
Assert.assertEquals(Integer.MAX_VALUE + 2L, app.getStartTime());
Assert.assertEquals(Integer.MAX_VALUE + 3L, app.getFinishTime());
Assert.assertTrue(Math.abs(app.getProgress() - 1.0F) < 0.0001);
Assert.assertEquals("test host", app.getHost());
Assert.assertEquals(-100, app.getRpcPort());
Assert.assertEquals("test tracking url", app.getTrackingUrl());
Assert.assertEquals("test original tracking url",
app.getOriginalTrackingUrl());
Assert.assertEquals("test diagnostics info", app.getDiagnostics());
Assert.assertEquals(FinalApplicationStatus.UNDEFINED,
app.getFinalApplicationStatus());
Assert.assertEquals(YarnApplicationState.FINISHED,
app.getYarnApplicationState());
}
@Test
public void testGetApplicationAttemptReport() throws Exception {
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1);
ApplicationAttemptReport appAttempt =
historyManager.getApplicationAttempt(appAttemptId);
Assert.assertNotNull(appAttempt);
Assert.assertEquals(appAttemptId, appAttempt.getApplicationAttemptId());
Assert.assertEquals(ContainerId.newInstance(appAttemptId, 1),
appAttempt.getAMContainerId());
Assert.assertEquals("test host", appAttempt.getHost());
Assert.assertEquals(-100, appAttempt.getRpcPort());
Assert.assertEquals("test tracking url", appAttempt.getTrackingUrl());
Assert.assertEquals("test original tracking url",
appAttempt.getOriginalTrackingUrl());
Assert.assertEquals("test diagnostics info", appAttempt.getDiagnostics());
Assert.assertEquals(YarnApplicationAttemptState.FINISHED,
appAttempt.getYarnApplicationAttemptState());
}
@Test
public void testGetContainerReport() throws Exception {
ContainerId containerId =
ContainerId.newInstance(ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0, 1), 1), 1);
ContainerReport container = historyManager.getContainer(containerId);
Assert.assertNotNull(container);
Assert.assertEquals(Integer.MAX_VALUE + 1L, container.getCreationTime());
Assert.assertEquals(Integer.MAX_VALUE + 2L, container.getFinishTime());
Assert.assertEquals(Resource.newInstance(-1, -1),
container.getAllocatedResource());
Assert.assertEquals(NodeId.newInstance("test host", -100),
container.getAssignedNode());
Assert.assertEquals(Priority.UNDEFINED, container.getPriority());
Assert
.assertEquals("test diagnostics info", container.getDiagnosticsInfo());
Assert.assertEquals(ContainerState.COMPLETE, container.getContainerState());
Assert.assertEquals(-1, container.getContainerExitStatus());
Assert.assertEquals("http://0.0.0.0:8188/applicationhistory/logs/" +
"test host:-100/container_0_0001_01_000001/"
+ "container_0_0001_01_000001/test user", container.getLogUrl());
}
@Test
public void testGetApplications() throws Exception {
Collection<ApplicationReport> apps =
historyManager.getAllApplications().values();
Assert.assertNotNull(apps);
Assert.assertEquals(SCALE, apps.size());
}
@Test
public void testGetApplicationAttempts() throws Exception {
Collection<ApplicationAttemptReport> appAttempts =
historyManager.getApplicationAttempts(ApplicationId.newInstance(0, 1))
.values();
Assert.assertNotNull(appAttempts);
Assert.assertEquals(SCALE, appAttempts.size());
}
@Test
public void testGetContainers() throws Exception {
Collection<ContainerReport> containers =
historyManager
.getContainers(
ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0, 1), 1)).values();
Assert.assertNotNull(containers);
Assert.assertEquals(SCALE, containers.size());
}
@Test
public void testGetAMContainer() throws Exception {
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1);
ContainerReport container = historyManager.getAMContainer(appAttemptId);
Assert.assertNotNull(container);
Assert.assertEquals(appAttemptId, container.getContainerId()
.getApplicationAttemptId());
}
private static TimelineEntity createApplicationTimelineEntity(
ApplicationId appId) {
TimelineEntity entity = new TimelineEntity();
entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE);
entity.setEntityId(appId.toString());
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, "test app");
entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
"test app type");
entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, "test user");
entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, "test queue");
entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
Integer.MAX_VALUE + 1L);
entity.setOtherInfo(entityInfo);
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
tEvent.setTimestamp(Integer.MAX_VALUE + 2L);
entity.addEvent(tEvent);
tEvent = new TimelineEvent();
tEvent.setEventType(
ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
tEvent.setTimestamp(Integer.MAX_VALUE + 3L);
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
"test diagnostics info");
eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO,
FinalApplicationStatus.UNDEFINED.toString());
eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
YarnApplicationState.FINISHED.toString());
eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO,
ApplicationAttemptId.newInstance(appId, 1));
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
return entity;
}
private static TimelineEntity createAppAttemptTimelineEntity(
ApplicationAttemptId appAttemptId) {
TimelineEntity entity = new TimelineEntity();
entity.setEntityType(AppAttemptMetricsConstants.ENTITY_TYPE);
entity.setEntityId(appAttemptId.toString());
entity.addPrimaryFilter(AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER,
appAttemptId.getApplicationId().toString());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
tEvent.setTimestamp(Integer.MAX_VALUE + 1L);
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
"test tracking url");
eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
"test original tracking url");
eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, "test host");
eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO, -100);
eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
ContainerId.newInstance(appAttemptId, 1));
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
tEvent = new TimelineEvent();
tEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE);
tEvent.setTimestamp(Integer.MAX_VALUE + 2L);
eventInfo = new HashMap<String, Object>();
eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
"test tracking url");
eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
"test original tracking url");
eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
"test diagnostics info");
eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO,
FinalApplicationStatus.UNDEFINED.toString());
eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO,
YarnApplicationAttemptState.FINISHED.toString());
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
return entity;
}
private static TimelineEntity createContainerEntity(ContainerId containerId) {
TimelineEntity entity = new TimelineEntity();
entity.setEntityType(ContainerMetricsConstants.ENTITY_TYPE);
entity.setEntityId(containerId.toString());
entity.addPrimaryFilter(ContainerMetricsConstants.PARENT_PRIMARIY_FILTER,
containerId.getApplicationAttemptId().toString());
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, -1);
entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, -1);
entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
"test host");
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, -100);
entityInfo
.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, -1);
entity.setOtherInfo(entityInfo);
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE);
tEvent.setTimestamp(Integer.MAX_VALUE + 1L);
entity.addEvent(tEvent);
;
tEvent = new TimelineEvent();
tEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
tEvent.setTimestamp(Integer.MAX_VALUE + 2L);
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
"test diagnostics info");
eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO, -1);
eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO,
ContainerState.COMPLETE.toString());
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
return entity;
}
}

View File

@ -21,17 +21,14 @@ package org.apache.hadoop.yarn.server.api;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.exceptions.YarnException;
@Public
@Unstable
public interface ApplicationContext {
/**
* This method returns Application {@link ApplicationReport} for the specified
@ -40,21 +37,21 @@ public interface ApplicationContext {
* @param appId
*
* @return {@link ApplicationReport} for the ApplicationId.
* @throws YarnException
* @throws IOException
*/
@Public
@Unstable
ApplicationReport getApplication(ApplicationId appId) throws IOException;
ApplicationReport getApplication(ApplicationId appId)
throws YarnException, IOException;
/**
* This method returns all Application {@link ApplicationReport}s
*
* @return map of {@link ApplicationId} to {@link ApplicationReport}s.
* @throws YarnException
* @throws IOException
*/
@Public
@Unstable
Map<ApplicationId, ApplicationReport> getAllApplications() throws IOException;
Map<ApplicationId, ApplicationReport> getAllApplications()
throws YarnException, IOException;
/**
* Application can have multiple application attempts
@ -64,12 +61,11 @@ public interface ApplicationContext {
* @param appId
*
* @return all {@link ApplicationAttemptReport}s for the Application.
* @throws YarnException
* @throws IOException
*/
@Public
@Unstable
Map<ApplicationAttemptId, ApplicationAttemptReport> getApplicationAttempts(
ApplicationId appId) throws IOException;
ApplicationId appId) throws YarnException, IOException;
/**
* This method returns {@link ApplicationAttemptReport} for specified
@ -78,12 +74,11 @@ public interface ApplicationContext {
* @param appAttemptId
* {@link ApplicationAttemptId}
* @return {@link ApplicationAttemptReport} for ApplicationAttemptId
* @throws YarnException
* @throws IOException
*/
@Public
@Unstable
ApplicationAttemptReport getApplicationAttempt(
ApplicationAttemptId appAttemptId) throws IOException;
ApplicationAttemptId appAttemptId) throws YarnException, IOException;
/**
* This method returns {@link ContainerReport} for specified
@ -92,11 +87,11 @@ public interface ApplicationContext {
* @param containerId
* {@link ContainerId}
* @return {@link ContainerReport} for ContainerId
* @throws YarnException
* @throws IOException
*/
@Public
@Unstable
ContainerReport getContainer(ContainerId containerId) throws IOException;
ContainerReport getContainer(ContainerId containerId)
throws YarnException, IOException;
/**
* This method returns {@link ContainerReport} for specified
@ -105,12 +100,11 @@ public interface ApplicationContext {
* @param appAttemptId
* {@link ApplicationAttemptId}
* @return {@link ContainerReport} for ApplicationAttemptId
* @throws YarnException
* @throws IOException
*/
@Public
@Unstable
ContainerReport getAMContainer(ApplicationAttemptId appAttemptId)
throws IOException;
throws YarnException, IOException;
/**
* This method returns Map of {@link ContainerId} to {@link ContainerReport}
@ -120,10 +114,9 @@ public interface ApplicationContext {
* {@link ApplicationAttemptId}
* @return Map of {@link ContainerId} to {@link ContainerReport} for
* ApplicationAttemptId
* @throws YarnException
* @throws IOException
*/
@Public
@Unstable
Map<ContainerId, ContainerReport> getContainers(
ApplicationAttemptId appAttemptId) throws IOException;
ApplicationAttemptId appAttemptId) throws YarnException, IOException;
}

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.metrics;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@Private
@Unstable
public class AppAttemptMetricsConstants {
public static final String ENTITY_TYPE =
"YARN_APPLICATION_ATTEMPT";
public static final String REGISTERED_EVENT_TYPE =
"YARN_APPLICATION_ATTEMPT_REGISTERED";
public static final String FINISHED_EVENT_TYPE =
"YARN_APPLICATION_ATTEMPT_FINISHED";
public static final String PARENT_PRIMARY_FILTER =
"YARN_APPLICATION_ATTEMPT_PARENT";
public static final String TRACKING_URL_EVENT_INFO =
"YARN_APPLICATION_ATTEMPT_TRACKING_URL";
public static final String ORIGINAL_TRACKING_URL_EVENT_INFO =
"YARN_APPLICATION_ATTEMPT_ORIGINAL_TRACKING_URL";
public static final String HOST_EVENT_INFO =
"YARN_APPLICATION_ATTEMPT_HOST";
public static final String RPC_PORT_EVENT_INFO =
"YARN_APPLICATION_ATTEMPT_RPC_PORT";
public static final String MASTER_CONTAINER_EVENT_INFO =
"YARN_APPLICATION_ATTEMPT_MASTER_CONTAINER";
public static final String DIAGNOSTICS_INFO_EVENT_INFO =
"YARN_APPLICATION_ATTEMPT_DIAGNOSTICS_INFO";
public static final String FINAL_STATUS_EVENT_INFO =
"YARN_APPLICATION_ATTEMPT_FINAL_STATUS";
public static final String STATE_EVENT_INFO =
"YARN_APPLICATION_ATTEMPT_STATE";
}

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.metrics;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@Private
@Unstable
public class ApplicationMetricsConstants {
public static final String ENTITY_TYPE =
"YARN_APPLICATION";
public static final String CREATED_EVENT_TYPE =
"YARN_APPLICATION_CREATED";
public static final String FINISHED_EVENT_TYPE =
"YARN_APPLICATION_FINISHED";
public static final String NAME_ENTITY_INFO =
"YARN_APPLICATION_NAME";
public static final String TYPE_ENTITY_INFO =
"YARN_APPLICATION_TYPE";
public static final String USER_ENTITY_INFO =
"YARN_APPLICATION_USER";
public static final String QUEUE_ENTITY_INFO =
"YARN_APPLICATION_QUEUE";
public static final String SUBMITTED_TIME_ENTITY_INFO =
"YARN_APPLICATION_SUBMITTED_TIME";
public static final String DIAGNOSTICS_INFO_EVENT_INFO =
"YARN_APPLICATION_DIAGNOSTICS_INFO";
public static final String FINAL_STATUS_EVENT_INFO =
"YARN_APPLICATION_FINAL_STATUS";
public static final String STATE_EVENT_INFO =
"YARN_APPLICATION_STATE";
public static final String LATEST_APP_ATTEMPT_EVENT_INFO =
"YARN_APPLICATION_LATEST_APP_ATTEMPT";
}

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.metrics;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@Private
@Unstable
public class ContainerMetricsConstants {
public static final String ENTITY_TYPE = "YARN_CONTAINER";
public static final String CREATED_EVENT_TYPE = "YARN_CONTAINER_CREATED";
public static final String FINISHED_EVENT_TYPE = "YARN_CONTAINER_FINISHED";
public static final String PARENT_PRIMARIY_FILTER = "YARN_CONTAINER_PARENT";
public static final String ALLOCATED_MEMORY_ENTITY_INFO =
"YARN_CONTAINER_ALLOCATED_MEMORY";
public static final String ALLOCATED_VCORE_ENTITY_INFO =
"YARN_CONTAINER_ALLOCATED_VCORE";
public static final String ALLOCATED_HOST_ENTITY_INFO =
"YARN_CONTAINER_ALLOCATED_HOST";
public static final String ALLOCATED_PORT_ENTITY_INFO =
"YARN_CONTAINER_ALLOCATED_PORT";
public static final String ALLOCATED_PRIORITY_ENTITY_INFO =
"YARN_CONTAINER_ALLOCATED_PRIORITY";
public static final String DIAGNOSTICS_INFO_EVENT_INFO =
"YARN_CONTAINER_DIAGNOSTICS_INFO";
public static final String EXIT_STATUS_EVENT_INFO =
"YARN_CONTAINER_EXIT_STATUS";
public static final String STATE_EVENT_INFO =
"YARN_CONTAINER_STATE";
}

View File

@ -20,12 +20,13 @@ package org.apache.hadoop.yarn.server.webapp;
import static org.apache.hadoop.yarn.util.StringHelper.join;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.APPLICATION_ATTEMPT_ID;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ContainerReport;
@ -67,10 +68,22 @@ public class AppAttemptBlock extends HtmlBlock {
return;
}
final ApplicationAttemptId appAttemptIdFinal = appAttemptId;
UserGroupInformation callerUGI = getCallerUGI();
ApplicationAttemptReport appAttemptReport;
try {
appAttemptReport = appContext.getApplicationAttempt(appAttemptId);
} catch (IOException e) {
if (callerUGI == null) {
appAttemptReport = appContext.getApplicationAttempt(appAttemptId);
} else {
appAttemptReport = callerUGI.doAs(
new PrivilegedExceptionAction<ApplicationAttemptReport> () {
@Override
public ApplicationAttemptReport run() throws Exception {
return appContext.getApplicationAttempt(appAttemptIdFinal);
}
});
}
} catch (Exception e) {
String message =
"Failed to read the application attempt " + appAttemptId + ".";
LOG.error(message, e);
@ -108,8 +121,26 @@ public class AppAttemptBlock extends HtmlBlock {
Collection<ContainerReport> containers;
try {
containers = appContext.getContainers(appAttemptId).values();
} catch (IOException e) {
if (callerUGI == null) {
containers = appContext.getContainers(appAttemptId).values();
} else {
containers = callerUGI.doAs(
new PrivilegedExceptionAction<Collection<ContainerReport>> () {
@Override
public Collection<ContainerReport> run() throws Exception {
return appContext.getContainers(appAttemptIdFinal).values();
}
});
}
} catch (RuntimeException e) {
// have this block to suppress the findbugs warning
html
.p()
._(
"Sorry, Failed to get containers for application attempt" + attemptid
+ ".")._();
return;
} catch (Exception e) {
html
.p()
._(

View File

@ -21,10 +21,11 @@ package org.apache.hadoop.yarn.server.webapp;
import static org.apache.hadoop.yarn.util.StringHelper.join;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.APPLICATION_ID;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -70,10 +71,22 @@ public class AppBlock extends HtmlBlock {
return;
}
final ApplicationId appIDFinal = appID;
UserGroupInformation callerUGI = getCallerUGI();
ApplicationReport appReport;
try {
appReport = appContext.getApplication(appID);
} catch (IOException e) {
if (callerUGI == null) {
appReport = appContext.getApplication(appID);
} else {
appReport = callerUGI.doAs(
new PrivilegedExceptionAction<ApplicationReport> () {
@Override
public ApplicationReport run() throws Exception {
return appContext.getApplication(appIDFinal);
}
});
}
} catch (Exception e) {
String message = "Failed to read the application " + appID + ".";
LOG.error(message, e);
html.p()._(message)._();
@ -106,8 +119,18 @@ public class AppBlock extends HtmlBlock {
Collection<ApplicationAttemptReport> attempts;
try {
attempts = appContext.getApplicationAttempts(appID).values();
} catch (IOException e) {
if (callerUGI == null) {
attempts = appContext.getApplicationAttempts(appID).values();
} else {
attempts = callerUGI.doAs(
new PrivilegedExceptionAction<Collection<ApplicationAttemptReport>> () {
@Override
public Collection<ApplicationAttemptReport> run() throws Exception {
return appContext.getApplicationAttempts(appIDFinal).values();
}
});
}
} catch (Exception e) {
String message =
"Failed to read the attempts of the application " + appID + ".";
LOG.error(message, e);
@ -122,14 +145,24 @@ public class AppBlock extends HtmlBlock {
._()._().tbody();
StringBuilder attemptsTableData = new StringBuilder("[\n");
for (ApplicationAttemptReport appAttemptReport : attempts) {
for (final ApplicationAttemptReport appAttemptReport : attempts) {
AppAttemptInfo appAttempt = new AppAttemptInfo(appAttemptReport);
ContainerReport containerReport;
try {
containerReport =
appContext.getAMContainer(appAttemptReport
if (callerUGI == null) {
containerReport = appContext.getAMContainer(appAttemptReport
.getApplicationAttemptId());
} catch (IOException e) {
} else {
containerReport = callerUGI.doAs(
new PrivilegedExceptionAction<ContainerReport> () {
@Override
public ContainerReport run() throws Exception {
return appContext.getAMContainer(appAttemptReport
.getApplicationAttemptId());
}
});
}
} catch (Exception e) {
String message =
"Failed to read the AM container of the application attempt "
+ appAttemptReport.getApplicationAttemptId() + ".";

View File

@ -23,11 +23,12 @@ import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_STATE;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.C_PROGRESSBAR;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.C_PROGRESSBAR_VALUE;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import java.util.HashSet;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.server.api.ApplicationContext;
@ -70,10 +71,21 @@ public class AppsBlock extends HtmlBlock {
}
}
UserGroupInformation callerUGI = getCallerUGI();
Collection<ApplicationReport> appReports;
try {
appReports = appContext.getAllApplications().values();
} catch (IOException e) {
if (callerUGI == null) {
appReports = appContext.getAllApplications().values();
} else {
appReports = callerUGI.doAs(
new PrivilegedExceptionAction<Collection<ApplicationReport>> () {
@Override
public Collection<ApplicationReport> run() throws Exception {
return appContext.getAllApplications().values();
}
});
}
} catch (Exception e) {
String message = "Failed to read the applications.";
LOG.error(message, e);
html.p()._(message)._();
@ -86,7 +98,7 @@ public class AppsBlock extends HtmlBlock {
continue;
}
AppInfo app = new AppInfo(appReport);
String percent = String.format("%.1f", app.getProgress());
String percent = String.format("%.1f", app.getProgress() * 100.0F);
// AppID numerical value parsed by parseHadoopID in yarn.dt.plugins.js
appsTableData
.append("[\"<a href='")

View File

@ -20,10 +20,11 @@ package org.apache.hadoop.yarn.server.webapp;
import static org.apache.hadoop.yarn.util.StringHelper.join;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
@ -63,10 +64,22 @@ public class ContainerBlock extends HtmlBlock {
return;
}
final ContainerId containerIdFinal = containerId;
UserGroupInformation callerUGI = getCallerUGI();
ContainerReport containerReport;
try {
containerReport = appContext.getContainer(containerId);
} catch (IOException e) {
if (callerUGI == null) {
containerReport = appContext.getContainer(containerId);
} else {
containerReport = callerUGI.doAs(
new PrivilegedExceptionAction<ContainerReport> () {
@Override
public ContainerReport run() throws Exception {
return appContext.getContainer(containerIdFinal);
}
});
}
} catch (Exception e) {
String message = "Failed to read the container " + containerid + ".";
LOG.error(message, e);
html.p()._(message)._();

View File

@ -18,7 +18,7 @@
package org.apache.hadoop.yarn.server.webapp;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
@ -28,6 +28,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.WebApplicationException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -60,6 +61,7 @@ public class WebServices {
String userQuery, String queueQuery, String count, String startedBegin,
String startedEnd, String finishBegin, String finishEnd,
Set<String> applicationTypes) {
UserGroupInformation callerUGI = getUser(req);
long num = 0;
boolean checkCount = false;
boolean checkStart = false;
@ -137,8 +139,18 @@ public class WebServices {
AppsInfo allApps = new AppsInfo();
Collection<ApplicationReport> appReports = null;
try {
appReports = appContext.getAllApplications().values();
} catch (IOException e) {
if (callerUGI == null) {
appReports = appContext.getAllApplications().values();
} else {
appReports = callerUGI.doAs(
new PrivilegedExceptionAction<Collection<ApplicationReport>> () {
@Override
public Collection<ApplicationReport> run() throws Exception {
return appContext.getAllApplications().values();
}
});
}
} catch (Exception e) {
throw new WebApplicationException(e);
}
for (ApplicationReport appReport : appReports) {
@ -193,11 +205,22 @@ public class WebServices {
public AppInfo getApp(HttpServletRequest req, HttpServletResponse res,
String appId) {
ApplicationId id = parseApplicationId(appId);
UserGroupInformation callerUGI = getUser(req);
final ApplicationId id = parseApplicationId(appId);
ApplicationReport app = null;
try {
app = appContext.getApplication(id);
} catch (IOException e) {
if (callerUGI == null) {
app = appContext.getApplication(id);
} else {
app = callerUGI.doAs(
new PrivilegedExceptionAction<ApplicationReport> () {
@Override
public ApplicationReport run() throws Exception {
return appContext.getApplication(id);
}
});
}
} catch (Exception e) {
throw new WebApplicationException(e);
}
if (app == null) {
@ -208,11 +231,22 @@ public class WebServices {
public AppAttemptsInfo getAppAttempts(HttpServletRequest req,
HttpServletResponse res, String appId) {
ApplicationId id = parseApplicationId(appId);
UserGroupInformation callerUGI = getUser(req);
final ApplicationId id = parseApplicationId(appId);
Collection<ApplicationAttemptReport> appAttemptReports = null;
try {
appAttemptReports = appContext.getApplicationAttempts(id).values();
} catch (IOException e) {
if (callerUGI == null) {
appAttemptReports = appContext.getApplicationAttempts(id).values();
} else {
appAttemptReports = callerUGI.doAs(
new PrivilegedExceptionAction<Collection<ApplicationAttemptReport>> () {
@Override
public Collection<ApplicationAttemptReport> run() throws Exception {
return appContext.getApplicationAttempts(id).values();
}
});
}
} catch (Exception e) {
throw new WebApplicationException(e);
}
AppAttemptsInfo appAttemptsInfo = new AppAttemptsInfo();
@ -226,13 +260,24 @@ public class WebServices {
public AppAttemptInfo getAppAttempt(HttpServletRequest req,
HttpServletResponse res, String appId, String appAttemptId) {
UserGroupInformation callerUGI = getUser(req);
ApplicationId aid = parseApplicationId(appId);
ApplicationAttemptId aaid = parseApplicationAttemptId(appAttemptId);
final ApplicationAttemptId aaid = parseApplicationAttemptId(appAttemptId);
validateIds(aid, aaid, null);
ApplicationAttemptReport appAttempt = null;
try {
appAttempt = appContext.getApplicationAttempt(aaid);
} catch (IOException e) {
if (callerUGI == null) {
appAttempt = appContext.getApplicationAttempt(aaid);
} else {
appAttempt = callerUGI.doAs(
new PrivilegedExceptionAction<ApplicationAttemptReport> () {
@Override
public ApplicationAttemptReport run() throws Exception {
return appContext.getApplicationAttempt(aaid);
}
});
}
} catch (Exception e) {
throw new WebApplicationException(e);
}
if (appAttempt == null) {
@ -244,13 +289,24 @@ public class WebServices {
public ContainersInfo getContainers(HttpServletRequest req,
HttpServletResponse res, String appId, String appAttemptId) {
UserGroupInformation callerUGI = getUser(req);
ApplicationId aid = parseApplicationId(appId);
ApplicationAttemptId aaid = parseApplicationAttemptId(appAttemptId);
final ApplicationAttemptId aaid = parseApplicationAttemptId(appAttemptId);
validateIds(aid, aaid, null);
Collection<ContainerReport> containerReports = null;
try {
containerReports = appContext.getContainers(aaid).values();
} catch (IOException e) {
if (callerUGI == null) {
containerReports = appContext.getContainers(aaid).values();
} else {
containerReports = callerUGI.doAs(
new PrivilegedExceptionAction<Collection<ContainerReport>> () {
@Override
public Collection<ContainerReport> run() throws Exception {
return appContext.getContainers(aaid).values();
}
});
}
} catch (Exception e) {
throw new WebApplicationException(e);
}
ContainersInfo containersInfo = new ContainersInfo();
@ -264,14 +320,25 @@ public class WebServices {
public ContainerInfo getContainer(HttpServletRequest req,
HttpServletResponse res, String appId, String appAttemptId,
String containerId) {
UserGroupInformation callerUGI = getUser(req);
ApplicationId aid = parseApplicationId(appId);
ApplicationAttemptId aaid = parseApplicationAttemptId(appAttemptId);
ContainerId cid = parseContainerId(containerId);
final ContainerId cid = parseContainerId(containerId);
validateIds(aid, aaid, cid);
ContainerReport container = null;
try {
container = appContext.getContainer(cid);
} catch (IOException e) {
if (callerUGI == null) {
container = appContext.getContainer(cid);
} else {
container = callerUGI.doAs(
new PrivilegedExceptionAction<ContainerReport> () {
@Override
public ContainerReport run() throws Exception {
return appContext.getContainer(cid);
}
});
}
} catch (Exception e) {
throw new WebApplicationException(e);
}
if (container == null) {
@ -364,4 +431,14 @@ public class WebServices {
throw new NotFoundException("appAttemptId and containerId don't match");
}
}
protected static UserGroupInformation getUser(HttpServletRequest req) {
String remoteUser = req.getRemoteUser();
UserGroupInformation callerUGI = null;
if (remoteUser != null) {
callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
}
return callerUGI;
}
}

View File

@ -33,6 +33,7 @@ public class AppAttemptInfo {
protected String host;
protected int rpcPort;
protected String trackingUrl;
protected String originalTrackingUrl;
protected String diagnosticsInfo;
protected YarnApplicationAttemptState appAttemptState;
protected String amContainerId;
@ -46,6 +47,7 @@ public class AppAttemptInfo {
host = appAttempt.getHost();
rpcPort = appAttempt.getRpcPort();
trackingUrl = appAttempt.getTrackingUrl();
originalTrackingUrl = appAttempt.getOriginalTrackingUrl();
diagnosticsInfo = appAttempt.getDiagnostics();
appAttemptState = appAttempt.getYarnApplicationAttemptState();
if (appAttempt.getAMContainerId() != null) {
@ -69,6 +71,10 @@ public class AppAttemptInfo {
return trackingUrl;
}
public String getOriginalTrackingUrl() {
return originalTrackingUrl;
}
public String getDiagnosticsInfo() {
return diagnosticsInfo;
}

View File

@ -23,7 +23,6 @@ import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.util.Times;
@ -50,8 +49,6 @@ public class AppInfo {
protected long startedTime;
protected long finishedTime;
protected long elapsedTime;
protected int allocatedMB;
protected int allocatedVCores;
public AppInfo() {
// JAXB needs this
@ -77,12 +74,6 @@ public class AppInfo {
finishedTime = app.getFinishTime();
elapsedTime = Times.elapsed(startedTime, finishedTime);
finalAppStatus = app.getFinalApplicationStatus();
ApplicationResourceUsageReport usage =
app.getApplicationResourceUsageReport();
if (usage != null) {
allocatedMB = usage.getUsedResources().getMemory();
allocatedVCores = usage.getUsedResources().getVirtualCores();
}
progress = app.getProgress();
}
@ -158,12 +149,4 @@ public class AppInfo {
return elapsedTime;
}
public int getAllocatedMB() {
return allocatedMB;
}
public int getAllocatedVCores() {
return allocatedVCores;
}
}

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
@ -98,6 +99,10 @@ public interface RMContext {
void setRMApplicationHistoryWriter(
RMApplicationHistoryWriter rmApplicationHistoryWriter);
void setSystemMetricsPublisher(SystemMetricsPublisher systemMetricsPublisher);
SystemMetricsPublisher getSystemMetricsPublisher();
ConfigurationProvider getConfigurationProvider();
boolean isWorkPreservingRecoveryEnabled();

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.conf.ConfigurationProvider;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -81,6 +82,7 @@ public class RMContextImpl implements RMContext {
private ResourceTrackerService resourceTrackerService;
private ApplicationMasterService applicationMasterService;
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
private SystemMetricsPublisher systemMetricsPublisher;
private ConfigurationProvider configurationProvider;
private int epoch;
@ -345,6 +347,17 @@ public class RMContextImpl implements RMContext {
return rmApplicationHistoryWriter;
}
@Override
public void setSystemMetricsPublisher(
SystemMetricsPublisher systemMetricsPublisher) {
this.systemMetricsPublisher = systemMetricsPublisher;
}
@Override
public SystemMetricsPublisher getSystemMetricsPublisher() {
return systemMetricsPublisher;
}
@Override
public void setRMApplicationHistoryWriter(
RMApplicationHistoryWriter rmApplicationHistoryWriter) {

View File

@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
@ -306,6 +307,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
return new RMApplicationHistoryWriter();
}
protected SystemMetricsPublisher createSystemMetricsPublisher() {
return new SystemMetricsPublisher();
}
// sanity check for configurations
protected static void validateConfigs(Configuration conf) {
// validate max-attempts
@ -409,6 +414,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
addService(rmApplicationHistoryWriter);
rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
SystemMetricsPublisher systemMetricsPublisher = createSystemMetricsPublisher();
addService(systemMetricsPublisher);
rmContext.setSystemMetricsPublisher(systemMetricsPublisher);
// Register event handler for NodesListManager
nodesListManager = new NodesListManager(rmContext);
rmDispatcher.register(NodesListManagerEventType.class, nodesListManager);

View File

@ -85,13 +85,17 @@ public class RMApplicationHistoryWriter extends CompositeService {
conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED);
writer = createApplicationHistoryStore(conf);
addIfService(writer);
dispatcher = createDispatcher(conf);
dispatcher.register(WritingHistoryEventType.class,
new ForwardingEventHandler());
addIfService(dispatcher);
// Only create the services when the history service is enabled, preventing
// wasting the system resources.
if (historyServiceEnabled) {
writer = createApplicationHistoryStore(conf);
addIfService(writer);
dispatcher = createDispatcher(conf);
dispatcher.register(WritingHistoryEventType.class,
new ForwardingEventHandler());
addIfService(dispatcher);
}
super.serviceInit(conf);
}

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
public class AppAttemptFinishedEvent extends
SystemMetricsEvent {
private ApplicationAttemptId appAttemptId;
private String trackingUrl;
private String originalTrackingUrl;
private String diagnosticsInfo;
private FinalApplicationStatus appStatus;
private YarnApplicationAttemptState state;
public AppAttemptFinishedEvent(
ApplicationAttemptId appAttemptId,
String trackingUrl,
String originalTrackingUrl,
String diagnosticsInfo,
FinalApplicationStatus appStatus,
YarnApplicationAttemptState state,
long finishedTime) {
super(SystemMetricsEventType.APP_ATTEMPT_FINISHED, finishedTime);
this.appAttemptId = appAttemptId;
// This is the tracking URL after the application attempt is finished
this.trackingUrl = trackingUrl;
this.originalTrackingUrl = originalTrackingUrl;
this.diagnosticsInfo = diagnosticsInfo;
this.appStatus = appStatus;
this.state = state;
}
@Override
public int hashCode() {
return appAttemptId.getApplicationId().hashCode();
}
public ApplicationAttemptId getApplicationAttemptId() {
return appAttemptId;
}
public String getTrackingUrl() {
return trackingUrl;
}
public String getOriginalTrackingURL() {
return originalTrackingUrl;
}
public String getDiagnosticsInfo() {
return diagnosticsInfo;
}
public FinalApplicationStatus getFinalApplicationStatus() {
return appStatus;
}
public YarnApplicationAttemptState getYarnApplicationAttemptState() {
return state;
}
}

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
public class AppAttemptRegisteredEvent extends
SystemMetricsEvent {
private ApplicationAttemptId appAttemptId;
private String host;
private int rpcPort;
private String trackingUrl;
private String originalTrackingUrl;
private ContainerId masterContainerId;
public AppAttemptRegisteredEvent(
ApplicationAttemptId appAttemptId,
String host,
int rpcPort,
String trackingUrl,
String originalTrackingUrl,
ContainerId masterContainerId,
long registeredTime) {
super(SystemMetricsEventType.APP_ATTEMPT_REGISTERED, registeredTime);
this.appAttemptId = appAttemptId;
this.host = host;
this.rpcPort = rpcPort;
// This is the tracking URL after the application attempt is registered
this.trackingUrl = trackingUrl;
this.originalTrackingUrl = originalTrackingUrl;
this.masterContainerId = masterContainerId;
}
@Override
public int hashCode() {
return appAttemptId.getApplicationId().hashCode();
}
public ApplicationAttemptId getApplicationAttemptId() {
return appAttemptId;
}
public String getHost() {
return host;
}
public int getRpcPort() {
return rpcPort;
}
public String getTrackingUrl() {
return trackingUrl;
}
public String getOriginalTrackingURL() {
return originalTrackingUrl;
}
public ContainerId getMasterContainerId() {
return masterContainerId;
}
}

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import org.apache.hadoop.yarn.api.records.ApplicationId;
public class ApplicationCreatedEvent extends
SystemMetricsEvent {
private ApplicationId appId;
private String name;
private String type;
private String user;
private String queue;
private long submittedTime;
public ApplicationCreatedEvent(ApplicationId appId,
String name,
String type,
String user,
String queue,
long submittedTime,
long createdTime) {
super(SystemMetricsEventType.APP_CREATED, createdTime);
this.appId = appId;
this.name = name;
this.type = type;
this.user = user;
this.queue = queue;
this.submittedTime = submittedTime;
}
@Override
public int hashCode() {
return appId.hashCode();
}
public ApplicationId getApplicationId() {
return appId;
}
public String getApplicationName() {
return name;
}
public String getApplicationType() {
return type;
}
public String getUser() {
return user;
}
public String getQueue() {
return queue;
}
public long getSubmittedTime() {
return submittedTime;
}
}

View File

@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
public class ApplicationFinishedEvent extends
SystemMetricsEvent {
private ApplicationId appId;;
private String diagnosticsInfo;
private FinalApplicationStatus appStatus;
private YarnApplicationState state;
private ApplicationAttemptId latestAppAttemptId;
public ApplicationFinishedEvent(
ApplicationId appId,
String diagnosticsInfo,
FinalApplicationStatus appStatus,
YarnApplicationState state,
ApplicationAttemptId latestAppAttemptId,
long finishedTime) {
super(SystemMetricsEventType.APP_FINISHED, finishedTime);
this.appId = appId;
this.diagnosticsInfo = diagnosticsInfo;
this.appStatus = appStatus;
this.latestAppAttemptId = latestAppAttemptId;
this.state = state;
}
@Override
public int hashCode() {
return appId.hashCode();
}
public ApplicationId getApplicationId() {
return appId;
}
public String getDiagnosticsInfo() {
return diagnosticsInfo;
}
public FinalApplicationStatus getFinalApplicationStatus() {
return appStatus;
}
public YarnApplicationState getYarnApplicationState() {
return state;
}
public ApplicationAttemptId getLatestApplicationAttemptId() {
return latestAppAttemptId;
}
}

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
public class ContainerCreatedEvent extends SystemMetricsEvent {
private ContainerId containerId;
private Resource allocatedResource;
private NodeId allocatedNode;
private Priority allocatedPriority;
public ContainerCreatedEvent(
ContainerId containerId,
Resource allocatedResource,
NodeId allocatedNode,
Priority allocatedPriority,
long createdTime) {
super(SystemMetricsEventType.CONTAINER_CREATED, createdTime);
this.containerId = containerId;
this.allocatedResource = allocatedResource;
this.allocatedNode = allocatedNode;
this.allocatedPriority = allocatedPriority;
}
@Override
public int hashCode() {
return containerId.getApplicationAttemptId().getApplicationId().hashCode();
}
public ContainerId getContainerId() {
return containerId;
}
public Resource getAllocatedResource() {
return allocatedResource;
}
public NodeId getAllocatedNode() {
return allocatedNode;
}
public Priority getAllocatedPriority() {
return allocatedPriority;
}
}

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
public class ContainerFinishedEvent extends SystemMetricsEvent {
private ContainerId containerId;
private String diagnosticsInfo;
private int containerExitStatus;
private ContainerState state;
public ContainerFinishedEvent(
ContainerId containerId,
String diagnosticsInfo,
int containerExitStatus,
ContainerState state,
long finishedTime) {
super(SystemMetricsEventType.CONTAINER_FINISHED, finishedTime);
this.containerId = containerId;
this.diagnosticsInfo = diagnosticsInfo;
this.containerExitStatus = containerExitStatus;
this.state = state;
}
@Override
public int hashCode() {
return containerId.getApplicationAttemptId().getApplicationId().hashCode();
}
public ContainerId getContainerId() {
return containerId;
}
public String getDiagnosticsInfo() {
return diagnosticsInfo;
}
public int getContainerExitStatus() {
return containerExitStatus;
}
public ContainerState getContainerState() {
return state;
}
}

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import org.apache.hadoop.yarn.event.AbstractEvent;
public class SystemMetricsEvent extends AbstractEvent<SystemMetricsEventType> {
public SystemMetricsEvent(SystemMetricsEventType type) {
super(type);
}
public SystemMetricsEvent(SystemMetricsEventType type, long timestamp) {
super(type, timestamp);
}
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
public enum SystemMetricsEventType {
// app events
APP_CREATED,
APP_FINISHED,
// app attempt events
APP_ATTEMPT_REGISTERED,
APP_ATTEMPT_FINISHED,
// container events
CONTAINER_CREATED,
CONTAINER_FINISHED
}

View File

@ -0,0 +1,490 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
@Private
@Unstable
public class SystemMetricsPublisher extends CompositeService {
private static final Log LOG = LogFactory
.getLog(SystemMetricsPublisher.class);
private static final int MAX_GET_TIMELINE_DELEGATION_TOKEN_ATTEMPTS = 10;
private Dispatcher dispatcher;
private TimelineClient client;
private boolean publishSystemMetrics;
private int getTimelineDelegtionTokenAttempts = 0;
private boolean hasReceivedTimelineDelegtionToken = false;
public SystemMetricsPublisher() {
super(SystemMetricsPublisher.class.getName());
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
publishSystemMetrics =
conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED) &&
conf.getBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED);
if (publishSystemMetrics) {
client = TimelineClient.createTimelineClient();
addIfService(client);
dispatcher = createDispatcher(conf);
dispatcher.register(SystemMetricsEventType.class,
new ForwardingEventHandler());
addIfService(dispatcher);
LOG.info("YARN system metrics publishing service is enabled");
} else {
LOG.info("YARN system metrics publishing service is not enabled");
}
super.serviceInit(conf);
}
@SuppressWarnings("unchecked")
public void appCreated(RMApp app, long createdTime) {
if (publishSystemMetrics) {
dispatcher.getEventHandler().handle(
new ApplicationCreatedEvent(
app.getApplicationId(),
app.getName(),
app.getApplicationType(),
app.getUser(),
app.getQueue(),
app.getSubmitTime(),
createdTime));
}
}
@SuppressWarnings("unchecked")
public void appFinished(RMApp app, RMAppState state, long finishedTime) {
if (publishSystemMetrics) {
dispatcher.getEventHandler().handle(
new ApplicationFinishedEvent(
app.getApplicationId(),
app.getDiagnostics().toString(),
app.getFinalApplicationStatus(),
RMServerUtils.createApplicationState(state),
app.getCurrentAppAttempt() == null ?
null : app.getCurrentAppAttempt().getAppAttemptId(),
finishedTime));
}
}
@SuppressWarnings("unchecked")
public void appAttemptRegistered(RMAppAttempt appAttempt,
long registeredTime) {
if (publishSystemMetrics) {
dispatcher.getEventHandler().handle(
new AppAttemptRegisteredEvent(
appAttempt.getAppAttemptId(),
appAttempt.getHost(),
appAttempt.getRpcPort(),
appAttempt.getTrackingUrl(),
appAttempt.getOriginalTrackingUrl(),
appAttempt.getMasterContainer().getId(),
registeredTime));
}
}
@SuppressWarnings("unchecked")
public void appAttemptFinished(RMAppAttempt appAttempt,
RMAppAttemptState state, long finishedTime) {
if (publishSystemMetrics) {
dispatcher.getEventHandler().handle(
new AppAttemptFinishedEvent(
appAttempt.getAppAttemptId(),
appAttempt.getTrackingUrl(),
appAttempt.getOriginalTrackingUrl(),
appAttempt.getDiagnostics(),
appAttempt.getFinalApplicationStatus(),
RMServerUtils.createApplicationAttemptState(state),
finishedTime));
}
}
@SuppressWarnings("unchecked")
public void containerCreated(RMContainer container, long createdTime) {
if (publishSystemMetrics) {
dispatcher.getEventHandler().handle(
new ContainerCreatedEvent(
container.getContainerId(),
container.getAllocatedResource(),
container.getAllocatedNode(),
container.getAllocatedPriority(),
createdTime));
}
}
@SuppressWarnings("unchecked")
public void containerFinished(RMContainer container, long finishedTime) {
if (publishSystemMetrics) {
dispatcher.getEventHandler().handle(
new ContainerFinishedEvent(
container.getContainerId(),
container.getDiagnosticsInfo(),
container.getContainerExitStatus(),
container.getContainerState(),
finishedTime));
}
}
protected Dispatcher createDispatcher(Configuration conf) {
MultiThreadedDispatcher dispatcher =
new MultiThreadedDispatcher(
conf.getInt(
YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE));
dispatcher.setDrainEventsOnStop();
return dispatcher;
}
protected void handleSystemMetricsEvent(
SystemMetricsEvent event) {
switch (event.getType()) {
case APP_CREATED:
publishApplicationCreatedEvent((ApplicationCreatedEvent) event);
break;
case APP_FINISHED:
publishApplicationFinishedEvent((ApplicationFinishedEvent) event);
break;
case APP_ATTEMPT_REGISTERED:
publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event);
break;
case APP_ATTEMPT_FINISHED:
publishAppAttemptFinishedEvent((AppAttemptFinishedEvent) event);
break;
case CONTAINER_CREATED:
publishContainerCreatedEvent((ContainerCreatedEvent) event);
break;
case CONTAINER_FINISHED:
publishContainerFinishedEvent((ContainerFinishedEvent) event);
break;
default:
LOG.error("Unknown SystemMetricsEvent type: " + event.getType());
}
}
private void publishApplicationCreatedEvent(ApplicationCreatedEvent event) {
TimelineEntity entity =
createApplicationEntity(event.getApplicationId());
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO,
event.getApplicationName());
entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
event.getApplicationType());
entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO,
event.getUser());
entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
event.getQueue());
entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
event.getSubmittedTime());
entity.setOtherInfo(entityInfo);
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(
ApplicationMetricsConstants.CREATED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
entity.addEvent(tEvent);
putEntity(entity);
}
private void publishApplicationFinishedEvent(ApplicationFinishedEvent event) {
TimelineEntity entity =
createApplicationEntity(event.getApplicationId());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(
ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
event.getDiagnosticsInfo());
eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO,
event.getFinalApplicationStatus().toString());
eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
event.getYarnApplicationState().toString());
if (event.getLatestApplicationAttemptId() != null) {
eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO,
event.getLatestApplicationAttemptId().toString());
}
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity);
}
private static TimelineEntity createApplicationEntity(
ApplicationId applicationId) {
TimelineEntity entity = new TimelineEntity();
entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE);
entity.setEntityId(applicationId.toString());
return entity;
}
private void
publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) {
TimelineEntity entity =
createAppAttemptEntity(event.getApplicationAttemptId());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(
AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(
AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
event.getTrackingUrl());
eventInfo.put(
AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
event.getOriginalTrackingURL());
eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO,
event.getHost());
eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO,
event.getRpcPort());
eventInfo.put(
AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
event.getMasterContainerId().toString());
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity);
}
private void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) {
TimelineEntity entity =
createAppAttemptEntity(event.getApplicationAttemptId());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(
AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
event.getTrackingUrl());
eventInfo.put(
AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
event.getOriginalTrackingURL());
eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
event.getDiagnosticsInfo());
eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO,
event.getFinalApplicationStatus().toString());
eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO,
event.getYarnApplicationAttemptState().toString());
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity);
}
private static TimelineEntity createAppAttemptEntity(
ApplicationAttemptId appAttemptId) {
TimelineEntity entity = new TimelineEntity();
entity.setEntityType(
AppAttemptMetricsConstants.ENTITY_TYPE);
entity.setEntityId(appAttemptId.toString());
entity.addPrimaryFilter(AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER,
appAttemptId.getApplicationId().toString());
return entity;
}
private void publishContainerCreatedEvent(ContainerCreatedEvent event) {
TimelineEntity entity = createContainerEntity(event.getContainerId());
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
event.getAllocatedResource().getMemory());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO,
event.getAllocatedResource().getVirtualCores());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
event.getAllocatedNode().getHost());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
event.getAllocatedNode().getPort());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
event.getAllocatedPriority().getPriority());
entity.setOtherInfo(entityInfo);
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
entity.addEvent(tEvent);
putEntity(entity);
}
private void publishContainerFinishedEvent(ContainerFinishedEvent event) {
TimelineEntity entity = createContainerEntity(event.getContainerId());
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
Map<String, Object> eventInfo = new HashMap<String, Object>();
eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
event.getDiagnosticsInfo());
eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
event.getContainerExitStatus());
eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO,
event.getContainerState().toString());
tEvent.setEventInfo(eventInfo);
entity.addEvent(tEvent);
putEntity(entity);
}
private static TimelineEntity createContainerEntity(
ContainerId containerId) {
TimelineEntity entity = new TimelineEntity();
entity.setEntityType(
ContainerMetricsConstants.ENTITY_TYPE);
entity.setEntityId(containerId.toString());
entity.addPrimaryFilter(ContainerMetricsConstants.PARENT_PRIMARIY_FILTER,
containerId.getApplicationAttemptId().toString());
return entity;
}
private void putEntity(TimelineEntity entity) {
if (UserGroupInformation.isSecurityEnabled()) {
if (!hasReceivedTimelineDelegtionToken
&& getTimelineDelegtionTokenAttempts < MAX_GET_TIMELINE_DELEGATION_TOKEN_ATTEMPTS) {
try {
Token<TimelineDelegationTokenIdentifier> token =
client.getDelegationToken(
UserGroupInformation.getCurrentUser().getUserName());
UserGroupInformation.getCurrentUser().addToken(token);
hasReceivedTimelineDelegtionToken = true;
} catch (Exception e) {
LOG.error("Error happens when getting timeline delegation token", e);
} finally {
++getTimelineDelegtionTokenAttempts;
if (!hasReceivedTimelineDelegtionToken
&& getTimelineDelegtionTokenAttempts == MAX_GET_TIMELINE_DELEGATION_TOKEN_ATTEMPTS) {
LOG.error("Run out of the attempts to get timeline delegation token. " +
"Use kerberos authentication only.");
}
}
}
}
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Publishing the entity " + entity.getEntityId() +
", JSON-style content: " + TimelineUtils.dumpTimelineRecordtoJSON(entity));
}
client.putEntities(entity);
} catch (Exception e) {
LOG.error("Error when publishing entity [" + entity.getEntityType() + ","
+ entity.getEntityId() + "]", e);
}
}
/**
* EventHandler implementation which forward events to SystemMetricsPublisher.
* Making use of it, SystemMetricsPublisher can avoid to have a public handle
* method.
*/
private final class ForwardingEventHandler implements
EventHandler<SystemMetricsEvent> {
@Override
public void handle(SystemMetricsEvent event) {
handleSystemMetricsEvent(event);
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
protected static class MultiThreadedDispatcher extends CompositeService
implements Dispatcher {
private List<AsyncDispatcher> dispatchers =
new ArrayList<AsyncDispatcher>();
public MultiThreadedDispatcher(int num) {
super(MultiThreadedDispatcher.class.getName());
for (int i = 0; i < num; ++i) {
AsyncDispatcher dispatcher = createDispatcher();
dispatchers.add(dispatcher);
addIfService(dispatcher);
}
}
@Override
public EventHandler getEventHandler() {
return new CompositEventHandler();
}
@Override
public void register(Class<? extends Enum> eventType, EventHandler handler) {
for (AsyncDispatcher dispatcher : dispatchers) {
dispatcher.register(eventType, handler);
}
}
public void setDrainEventsOnStop() {
for (AsyncDispatcher dispatcher : dispatchers) {
dispatcher.setDrainEventsOnStop();
}
}
private class CompositEventHandler implements EventHandler<Event> {
@Override
public void handle(Event event) {
// Use hashCode (of ApplicationId) to dispatch the event to the child
// dispatcher, such that all the writing events of one application will
// be handled by one thread, the scheduled order of the these events
// will be preserved
int index = (event.hashCode() & Integer.MAX_VALUE) % dispatchers.size();
dispatchers.get(index).getEventHandler().handle(event);
}
}
protected AsyncDispatcher createDispatcher() {
return new AsyncDispatcher();
}
}
}

View File

@ -171,6 +171,12 @@ public interface RMApp extends EventHandler<RMAppEvent> {
*/
String getTrackingUrl();
/**
* The original tracking url for the application master.
* @return the original tracking url for the application master.
*/
String getOriginalTrackingUrl();
/**
* the diagnostics information for the application master.
* @return the diagnostics information for the application master.

View File

@ -365,6 +365,7 @@ public class RMAppImpl implements RMApp, Recoverable {
this.stateMachine = stateMachineFactory.make(this);
rmContext.getRMApplicationHistoryWriter().applicationStarted(this);
rmContext.getSystemMetricsPublisher().appCreated(this, startTime);
}
@Override
@ -627,6 +628,20 @@ public class RMAppImpl implements RMApp, Recoverable {
}
}
@Override
public String getOriginalTrackingUrl() {
this.readLock.lock();
try {
if (this.currentAttempt != null) {
return this.currentAttempt.getOriginalTrackingUrl();
}
return null;
} finally {
this.readLock.unlock();
}
}
@Override
public StringBuilder getDiagnostics() {
this.readLock.lock();
@ -1096,6 +1111,8 @@ public class RMAppImpl implements RMApp, Recoverable {
app.rmContext.getRMApplicationHistoryWriter()
.applicationFinished(app, finalState);
app.rmContext.getSystemMetricsPublisher()
.appFinished(app, finalState, app.finishTime);
};
}

View File

@ -1156,6 +1156,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
appAttempt.rmContext.getRMApplicationHistoryWriter()
.applicationAttemptFinished(appAttempt, finalAttemptState);
appAttempt.rmContext.getSystemMetricsPublisher()
.appAttemptFinished(
appAttempt, finalAttemptState, System.currentTimeMillis());
}
}
@ -1269,6 +1272,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
appAttempt.rmContext.getRMApplicationHistoryWriter()
.applicationAttemptStarted(appAttempt);
appAttempt.rmContext.getSystemMetricsPublisher()
.appAttemptRegistered(appAttempt, System.currentTimeMillis());
}
}
@ -1723,8 +1728,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
masterContainer == null ? null : masterContainer.getId();
attemptReport = ApplicationAttemptReport.newInstance(this
.getAppAttemptId(), this.getHost(), this.getRpcPort(), this
.getTrackingUrl(), this.getDiagnostics(), YarnApplicationAttemptState
.valueOf(this.getState().toString()), amId);
.getTrackingUrl(), this.getOriginalTrackingUrl(), this.getDiagnostics(),
YarnApplicationAttemptState .valueOf(this.getState().toString()), amId);
} finally {
this.readLock.unlock();
}

View File

@ -192,6 +192,8 @@ public class RMContainerImpl implements RMContainer {
this.writeLock = lock.writeLock();
rmContext.getRMApplicationHistoryWriter().containerStarted(this);
rmContext.getSystemMetricsPublisher().containerCreated(
this, this.creationTime);
}
@Override
@ -497,6 +499,8 @@ public class RMContainerImpl implements RMContainer {
container.rmContext.getRMApplicationHistoryWriter().containerFinished(
container);
container.rmContext.getSystemMetricsPublisher().containerFinished(
container, container.finishTime);
}
private static void updateAttemptMetrics(RMContainerImpl container) {

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
@ -119,6 +120,8 @@ public class TestAppManager{
}
};
((RMContextImpl)context).setStateStore(mock(RMStateStore.class));
((RMContextImpl)context).setSystemMetricsPublisher(
mock(SystemMetricsPublisher.class));
return context;
}

View File

@ -104,6 +104,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -1081,6 +1082,8 @@ public class TestClientRMService {
.thenThrow(new IOException("queue does not exist"));
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);
when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
ConcurrentHashMap<ApplicationId, RMApp> apps = getRMApps(rmContext,
yarnScheduler);
when(rmContext.getRMApps()).thenReturn(apps);

View File

@ -117,6 +117,10 @@ public abstract class MockAsm extends MockApps {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public String getOriginalTrackingUrl() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public int getMaxAppAttempts() {
throw new UnsupportedOperationException("Not supported yet.");
}

View File

@ -0,0 +1,355 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.EnumSet;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer;
import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp;
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore;
import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestSystemMetricsPublisher {
private static ApplicationHistoryServer timelineServer;
private static SystemMetricsPublisher metricsPublisher;
private static TimelineStore store;
@BeforeClass
public static void setup() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, true);
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
MemoryTimelineStore.class, TimelineStore.class);
conf.setInt(
YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
2);
timelineServer = new ApplicationHistoryServer();
timelineServer.init(conf);
timelineServer.start();
store = timelineServer.getTimelineStore();
metricsPublisher = new SystemMetricsPublisher();
metricsPublisher.init(conf);
metricsPublisher.start();
}
@AfterClass
public static void tearDown() throws Exception {
if (metricsPublisher != null) {
metricsPublisher.stop();
}
if (timelineServer != null) {
timelineServer.stop();
}
AHSWebApp.resetInstance();
}
@Test(timeout = 10000)
public void testPublishApplicationMetrics() throws Exception {
ApplicationId appId = ApplicationId.newInstance(0, 1);
RMApp app = createRMApp(appId);
metricsPublisher.appCreated(app, app.getStartTime());
metricsPublisher.appFinished(app, RMAppState.FINISHED, app.getFinishTime());
TimelineEntity entity = null;
do {
entity =
store.getEntity(appId.toString(),
ApplicationMetricsConstants.ENTITY_TYPE,
EnumSet.allOf(Field.class));
// ensure two events are both published before leaving the loop
} while (entity == null || entity.getEvents().size() < 2);
// verify all the fields
Assert.assertEquals(ApplicationMetricsConstants.ENTITY_TYPE,
entity.getEntityType());
Assert
.assertEquals(app.getApplicationId().toString(), entity.getEntityId());
Assert
.assertEquals(
app.getName(),
entity.getOtherInfo().get(
ApplicationMetricsConstants.NAME_ENTITY_INFO));
Assert.assertEquals(app.getQueue(),
entity.getOtherInfo()
.get(ApplicationMetricsConstants.QUEUE_ENTITY_INFO));
Assert
.assertEquals(
app.getUser(),
entity.getOtherInfo().get(
ApplicationMetricsConstants.USER_ENTITY_INFO));
Assert
.assertEquals(
app.getApplicationType(),
entity.getOtherInfo().get(
ApplicationMetricsConstants.TYPE_ENTITY_INFO));
Assert.assertEquals(app.getSubmitTime(),
entity.getOtherInfo().get(
ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO));
boolean hasCreatedEvent = false;
boolean hasFinishedEvent = false;
for (TimelineEvent event : entity.getEvents()) {
if (event.getEventType().equals(
ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
hasCreatedEvent = true;
Assert.assertEquals(app.getStartTime(), event.getTimestamp());
} else if (event.getEventType().equals(
ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
hasFinishedEvent = true;
Assert.assertEquals(app.getFinishTime(), event.getTimestamp());
Assert.assertEquals(
app.getDiagnostics().toString(),
event.getEventInfo().get(
ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO));
Assert.assertEquals(
app.getFinalApplicationStatus().toString(),
event.getEventInfo().get(
ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO));
Assert.assertEquals(YarnApplicationState.FINISHED.toString(), event
.getEventInfo().get(ApplicationMetricsConstants.STATE_EVENT_INFO));
}
}
Assert.assertTrue(hasCreatedEvent && hasFinishedEvent);
}
@Test(timeout = 10000)
public void testPublishAppAttemptMetrics() throws Exception {
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1);
RMAppAttempt appAttempt = createRMAppAttempt(appAttemptId);
metricsPublisher.appAttemptRegistered(appAttempt, Integer.MAX_VALUE + 1L);
metricsPublisher.appAttemptFinished(appAttempt, RMAppAttemptState.FINISHED,
Integer.MAX_VALUE + 2L);
TimelineEntity entity = null;
do {
entity =
store.getEntity(appAttemptId.toString(),
AppAttemptMetricsConstants.ENTITY_TYPE,
EnumSet.allOf(Field.class));
// ensure two events are both published before leaving the loop
} while (entity == null || entity.getEvents().size() < 2);
// verify all the fields
Assert.assertEquals(AppAttemptMetricsConstants.ENTITY_TYPE,
entity.getEntityType());
Assert.assertEquals(appAttemptId.toString(), entity.getEntityId());
Assert.assertEquals(
appAttemptId.getApplicationId().toString(),
entity.getPrimaryFilters()
.get(AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER).iterator()
.next());
boolean hasRegisteredEvent = false;
boolean hasFinishedEvent = false;
for (TimelineEvent event : entity.getEvents()) {
if (event.getEventType().equals(
AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE)) {
hasRegisteredEvent = true;
Assert.assertEquals(appAttempt.getHost(),
event.getEventInfo()
.get(AppAttemptMetricsConstants.HOST_EVENT_INFO));
Assert
.assertEquals(appAttempt.getRpcPort(),
event.getEventInfo().get(
AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO));
Assert.assertEquals(
appAttempt.getMasterContainer().getId().toString(),
event.getEventInfo().get(
AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO));
} else if (event.getEventType().equals(
AppAttemptMetricsConstants.FINISHED_EVENT_TYPE)) {
hasFinishedEvent = true;
Assert.assertEquals(appAttempt.getDiagnostics(), event.getEventInfo()
.get(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO));
Assert.assertEquals(appAttempt.getTrackingUrl(), event.getEventInfo()
.get(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO));
Assert.assertEquals(
appAttempt.getOriginalTrackingUrl(),
event.getEventInfo().get(
AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO));
Assert.assertEquals(
appAttempt.getFinalApplicationStatus().toString(),
event.getEventInfo().get(
AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO));
Assert.assertEquals(
YarnApplicationAttemptState.FINISHED.toString(),
event.getEventInfo().get(
AppAttemptMetricsConstants.STATE_EVENT_INFO));
}
}
Assert.assertTrue(hasRegisteredEvent && hasFinishedEvent);
}
@Test(timeout = 10000)
public void testPublishContainerMetrics() throws Exception {
ContainerId containerId =
ContainerId.newInstance(ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0, 1), 1), 1);
RMContainer container = createRMContainer(containerId);
metricsPublisher.containerCreated(container, container.getCreationTime());
metricsPublisher.containerFinished(container, container.getFinishTime());
TimelineEntity entity = null;
do {
entity =
store.getEntity(containerId.toString(),
ContainerMetricsConstants.ENTITY_TYPE,
EnumSet.allOf(Field.class));
// ensure two events are both published before leaving the loop
} while (entity == null || entity.getEvents().size() < 2);
// verify all the fields
Assert.assertEquals(ContainerMetricsConstants.ENTITY_TYPE,
entity.getEntityType());
Assert.assertEquals(containerId.toString(), entity.getEntityId());
Assert.assertEquals(
containerId.getApplicationAttemptId().toString(),
entity.getPrimaryFilters()
.get(ContainerMetricsConstants.PARENT_PRIMARIY_FILTER).iterator()
.next());
Assert.assertEquals(
container.getAllocatedNode().getHost(),
entity.getOtherInfo().get(
ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO));
Assert.assertEquals(
container.getAllocatedNode().getPort(),
entity.getOtherInfo().get(
ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO));
Assert.assertEquals(
container.getAllocatedResource().getMemory(),
entity.getOtherInfo().get(
ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO));
Assert.assertEquals(
container.getAllocatedResource().getVirtualCores(),
entity.getOtherInfo().get(
ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO));
Assert.assertEquals(
container.getAllocatedPriority().getPriority(),
entity.getOtherInfo().get(
ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO));
boolean hasCreatedEvent = false;
boolean hasFinishedEvent = false;
for (TimelineEvent event : entity.getEvents()) {
if (event.getEventType().equals(
ContainerMetricsConstants.CREATED_EVENT_TYPE)) {
hasCreatedEvent = true;
Assert.assertEquals(container.getCreationTime(), event.getTimestamp());
} else if (event.getEventType().equals(
ContainerMetricsConstants.FINISHED_EVENT_TYPE)) {
hasFinishedEvent = true;
Assert.assertEquals(container.getFinishTime(), event.getTimestamp());
Assert.assertEquals(
container.getDiagnosticsInfo(),
event.getEventInfo().get(
ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO));
Assert.assertEquals(
container.getContainerExitStatus(),
event.getEventInfo().get(
ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO));
Assert.assertEquals(container.getContainerState().toString(), event
.getEventInfo().get(ContainerMetricsConstants.STATE_EVENT_INFO));
}
}
Assert.assertTrue(hasCreatedEvent && hasFinishedEvent);
}
private static RMApp createRMApp(ApplicationId appId) {
RMApp app = mock(RMApp.class);
when(app.getApplicationId()).thenReturn(appId);
when(app.getName()).thenReturn("test app");
when(app.getApplicationType()).thenReturn("test app type");
when(app.getUser()).thenReturn("test user");
when(app.getQueue()).thenReturn("test queue");
when(app.getSubmitTime()).thenReturn(Integer.MAX_VALUE + 1L);
when(app.getStartTime()).thenReturn(Integer.MAX_VALUE + 2L);
when(app.getFinishTime()).thenReturn(Integer.MAX_VALUE + 3L);
when(app.getDiagnostics()).thenReturn(
new StringBuilder("test diagnostics info"));
RMAppAttempt appAttempt = mock(RMAppAttempt.class);
when(appAttempt.getAppAttemptId()).thenReturn(
ApplicationAttemptId.newInstance(appId, 1));
when(app.getCurrentAppAttempt()).thenReturn(appAttempt);
when(app.getFinalApplicationStatus()).thenReturn(
FinalApplicationStatus.UNDEFINED);
return app;
}
private static RMAppAttempt createRMAppAttempt(
ApplicationAttemptId appAttemptId) {
RMAppAttempt appAttempt = mock(RMAppAttempt.class);
when(appAttempt.getAppAttemptId()).thenReturn(appAttemptId);
when(appAttempt.getHost()).thenReturn("test host");
when(appAttempt.getRpcPort()).thenReturn(-100);
Container container = mock(Container.class);
when(container.getId())
.thenReturn(ContainerId.newInstance(appAttemptId, 1));
when(appAttempt.getMasterContainer()).thenReturn(container);
when(appAttempt.getDiagnostics()).thenReturn("test diagnostics info");
when(appAttempt.getTrackingUrl()).thenReturn("test tracking url");
when(appAttempt.getOriginalTrackingUrl()).thenReturn(
"test original tracking url");
when(appAttempt.getFinalApplicationStatus()).thenReturn(
FinalApplicationStatus.UNDEFINED);
return appAttempt;
}
private static RMContainer createRMContainer(ContainerId containerId) {
RMContainer container = mock(RMContainer.class);
when(container.getContainerId()).thenReturn(containerId);
when(container.getAllocatedNode()).thenReturn(
NodeId.newInstance("test host", -100));
when(container.getAllocatedResource()).thenReturn(
Resource.newInstance(-1, -1));
when(container.getAllocatedPriority()).thenReturn(Priority.UNDEFINED);
when(container.getCreationTime()).thenReturn(Integer.MAX_VALUE + 1L);
when(container.getFinishTime()).thenReturn(Integer.MAX_VALUE + 2L);
when(container.getDiagnosticsInfo()).thenReturn("test diagnostics info");
when(container.getContainerExitStatus()).thenReturn(-1);
when(container.getContainerState()).thenReturn(ContainerState.COMPLETE);
return container;
}
}

View File

@ -50,6 +50,7 @@ public class MockRMApp implements RMApp {
int failCount = 0;
ApplicationId id;
String url = null;
String oUrl = null;
StringBuilder diagnostics = new StringBuilder();
RMAppAttempt attempt;
int maxAppAttempts = 1;
@ -183,6 +184,15 @@ public class MockRMApp implements RMApp {
this.url = url;
}
@Override
public String getOriginalTrackingUrl() {
return oUrl;
}
public void setOriginalTrackingUrl(String oUrl) {
this.oUrl = oUrl;
}
@Override
public StringBuilder getDiagnostics() {
return diagnostics;

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
@ -94,6 +96,7 @@ public class TestRMAppTransitions {
private DrainDispatcher rmDispatcher;
private RMStateStore store;
private RMApplicationHistoryWriter writer;
private SystemMetricsPublisher publisher;
private YarnScheduler scheduler;
private TestSchedulerEventDispatcher schedulerDispatcher;
@ -203,6 +206,8 @@ public class TestRMAppTransitions {
new ClientToAMTokenSecretManagerInRM(),
writer);
((RMContextImpl)realRMContext).setStateStore(store);
publisher = mock(SystemMetricsPublisher.class);
((RMContextImpl)realRMContext).setSystemMetricsPublisher(publisher);
this.rmContext = spy(realRMContext);
@ -354,6 +359,7 @@ public class TestRMAppTransitions {
ApplicationSubmissionContext submissionContext) throws IOException {
RMApp application = createNewTestApp(submissionContext);
verify(writer).applicationStarted(any(RMApp.class));
verify(publisher).appCreated(any(RMApp.class), anyLong());
// NEW => NEW_SAVING event RMAppEventType.START
RMAppEvent event =
new RMAppEvent(application.getApplicationId(), RMAppEventType.START);
@ -477,6 +483,7 @@ public class TestRMAppTransitions {
// reset the counter of Mockito.verify
reset(writer);
reset(publisher);
// test app fails after 1 app attempt failure
LOG.info("--- START: testUnmanagedAppFailPath ---");
@ -961,6 +968,10 @@ public class TestRMAppTransitions {
ArgumentCaptor.forClass(RMAppState.class);
verify(writer).applicationFinished(any(RMApp.class), finalState.capture());
Assert.assertEquals(state, finalState.getValue());
finalState = ArgumentCaptor.forClass(RMAppState.class);
verify(publisher).appFinished(any(RMApp.class), finalState.capture(),
anyLong());
Assert.assertEquals(state, finalState.getValue());
}
private void verifyAppRemovedSchedulerEvent(RMAppState finalState) {

View File

@ -25,6 +25,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@ -72,6 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWri
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@ -134,6 +136,7 @@ public class TestRMAppAttemptTransitions {
private AMLivelinessMonitor amLivelinessMonitor;
private AMLivelinessMonitor amFinishingMonitor;
private RMApplicationHistoryWriter writer;
private SystemMetricsPublisher publisher;
private RMStateStore store;
@ -246,6 +249,8 @@ public class TestRMAppAttemptTransitions {
store = mock(RMStateStore.class);
((RMContextImpl) rmContext).setStateStore(store);
publisher = mock(SystemMetricsPublisher.class);
((RMContextImpl) rmContext).setSystemMetricsPublisher(publisher);
scheduler = mock(YarnScheduler.class);
masterService = mock(ApplicationMasterService.class);
@ -1377,6 +1382,11 @@ public class TestRMAppAttemptTransitions {
verify(writer).applicationAttemptFinished(
any(RMAppAttempt.class), finalState.capture());
Assert.assertEquals(state, finalState.getValue());
finalState =
ArgumentCaptor.forClass(RMAppAttemptState.class);
verify(publisher).appAttemptFinished(any(RMAppAttempt.class), finalState.capture(),
anyLong());
Assert.assertEquals(state, finalState.getValue());
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
@ -98,11 +100,13 @@ public class TestRMContainerImpl {
Mockito.doReturn(rmApp).when(rmApps).get((ApplicationId)Matchers.any());
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);
RMContext rmContext = mock(RMContext.class);
when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
when(rmContext.getRMApps()).thenReturn(rmApps);
when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
nodeId, "user", rmContext);
@ -111,6 +115,7 @@ public class TestRMContainerImpl {
assertEquals(nodeId, rmContainer.getAllocatedNode());
assertEquals(priority, rmContainer.getAllocatedPriority());
verify(writer).containerStarted(any(RMContainer.class));
verify(publisher).containerCreated(any(RMContainer.class), anyLong());
rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.START));
@ -143,6 +148,7 @@ public class TestRMContainerImpl {
rmContainer.getContainerExitStatus());
assertEquals(ContainerState.COMPLETE, rmContainer.getContainerState());
verify(writer).containerFinished(any(RMContainer.class));
verify(publisher).containerFinished(any(RMContainer.class), anyLong());
ArgumentCaptor<RMAppAttemptContainerFinishedEvent> captor = ArgumentCaptor
.forClass(RMAppAttemptContainerFinishedEvent.class);
@ -184,10 +190,12 @@ public class TestRMContainerImpl {
"host:3465", resource, priority, null);
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);
RMContext rmContext = mock(RMContext.class);
when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
nodeId, "user", rmContext);
@ -196,6 +204,7 @@ public class TestRMContainerImpl {
assertEquals(nodeId, rmContainer.getAllocatedNode());
assertEquals(priority, rmContainer.getAllocatedPriority());
verify(writer).containerStarted(any(RMContainer.class));
verify(publisher).containerCreated(any(RMContainer.class), anyLong());
rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.START));
@ -224,6 +233,8 @@ public class TestRMContainerImpl {
drainDispatcher.await();
assertEquals(RMContainerState.RUNNING, rmContainer.getState());
verify(writer, never()).containerFinished(any(RMContainer.class));
verify(publisher, never()).containerFinished(any(RMContainer.class),
anyLong());
}
@Test

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@ -249,10 +250,12 @@ public class TestChildQueueOrder {
mock(ContainerAllocationExpirer.class);
DrainDispatcher drainDispatcher = new DrainDispatcher();
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);
RMContext rmContext = mock(RMContext.class);
when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
app_0.getApplicationId(), 1);
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@ -92,6 +93,7 @@ public class TestUtils {
new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(), writer);
rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class));
return rmContext;
}

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@ -145,6 +146,8 @@ public class TestFifoScheduler {
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
RMContext rmContext = new RMContextImpl(dispatcher, null,
null, null, null, null, null, null, null, writer);
((RMContextImpl) rmContext).setSystemMetricsPublisher(
mock(SystemMetricsPublisher.class));
FifoScheduler scheduler = new FifoScheduler();
Configuration conf = new Configuration();
@ -188,6 +191,8 @@ public class TestFifoScheduler {
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
null, containerTokenSecretManager, nmTokenSecretManager, null, writer);
((RMContextImpl) rmContext).setSystemMetricsPublisher(
mock(SystemMetricsPublisher.class));
FifoScheduler scheduler = new FifoScheduler();
scheduler.setRMContext(rmContext);
@ -257,6 +262,8 @@ public class TestFifoScheduler {
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
null, containerTokenSecretManager, nmTokenSecretManager, null, writer);
((RMContextImpl) rmContext).setSystemMetricsPublisher(
mock(SystemMetricsPublisher.class));
FifoScheduler scheduler = new FifoScheduler(){
@SuppressWarnings("unused")