YARN-5743. [Atsv2] Publish queue name and RMAppMetrics to ATS (Rohith Sharma K S via Varun Saxena)
This commit is contained in:
parent
853a5da041
commit
c3db98aa10
|
@ -71,10 +71,22 @@ public class ApplicationMetricsConstants {
|
|||
"YARN_APPLICATION_STATE";
|
||||
|
||||
public static final String APP_CPU_METRICS =
|
||||
"YARN_APPLICATION_CPU_METRIC";
|
||||
"YARN_APPLICATION_CPU";
|
||||
|
||||
public static final String APP_MEM_METRICS =
|
||||
"YARN_APPLICATION_MEM_METRIC";
|
||||
"YARN_APPLICATION_MEMORY";
|
||||
|
||||
public static final String APP_RESOURCE_PREEMPTED_CPU =
|
||||
"YARN_APPLICATION_RESOURCE_PREEMPTED_CPU";
|
||||
|
||||
public static final String APP_RESOURCE_PREEMPTED_MEM =
|
||||
"YARN_APPLICATION_RESOURCE_PREEMPTED_MEMORY";
|
||||
|
||||
public static final String APP_NON_AM_CONTAINER_PREEMPTED =
|
||||
"YARN_APPLICATION_NON_AM_CONTAINER_PREEMPTED";
|
||||
|
||||
public static final String APP_AM_CONTAINER_PREEMPTED =
|
||||
"YARN_APPLICATION_AM_CONTAINER_PREEMPTED";
|
||||
|
||||
public static final String APP_CPU_PREEMPT_METRICS =
|
||||
"YARN_APPLICATION_CPU_PREEMPT_METRIC";
|
||||
|
|
|
@ -19,7 +19,9 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -39,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
|||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
|
||||
|
@ -103,6 +106,8 @@ public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
|
|||
entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
|
||||
app.getApplicationType());
|
||||
entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, app.getUser());
|
||||
entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
|
||||
app.getQueue());
|
||||
entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
|
||||
app.getSubmitTime());
|
||||
entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO,
|
||||
|
@ -143,11 +148,6 @@ public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
|
|||
@Override
|
||||
public void appFinished(RMApp app, RMAppState state, long finishedTime) {
|
||||
ApplicationEntity entity = createApplicationEntity(app.getApplicationId());
|
||||
RMAppMetrics appMetrics = app.getRMAppMetrics();
|
||||
entity.addInfo(ApplicationMetricsConstants.APP_CPU_METRICS,
|
||||
appMetrics.getVcoreSeconds());
|
||||
entity.addInfo(ApplicationMetricsConstants.APP_MEM_METRICS,
|
||||
appMetrics.getMemorySeconds());
|
||||
|
||||
TimelineEvent tEvent = new TimelineEvent();
|
||||
tEvent.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
|
||||
|
@ -169,10 +169,49 @@ public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
|
|||
}
|
||||
entity.setInfo(entityInfo);
|
||||
|
||||
RMAppMetrics appMetrics = app.getRMAppMetrics();
|
||||
Set<TimelineMetric> entityMetrics =
|
||||
getTimelinelineAppMetrics(appMetrics, finishedTime);
|
||||
entity.setMetrics(entityMetrics);
|
||||
|
||||
getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
|
||||
SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
|
||||
}
|
||||
|
||||
private Set<TimelineMetric> getTimelinelineAppMetrics(
|
||||
RMAppMetrics appMetrics, long timestamp) {
|
||||
Set<TimelineMetric> entityMetrics = new HashSet<TimelineMetric>();
|
||||
|
||||
entityMetrics.add(getTimelineMetric(
|
||||
ApplicationMetricsConstants.APP_CPU_METRICS, timestamp,
|
||||
appMetrics.getVcoreSeconds()));
|
||||
entityMetrics.add(getTimelineMetric(
|
||||
ApplicationMetricsConstants.APP_MEM_METRICS, timestamp,
|
||||
appMetrics.getMemorySeconds()));
|
||||
entityMetrics.add(getTimelineMetric(
|
||||
ApplicationMetricsConstants.APP_RESOURCE_PREEMPTED_CPU, timestamp,
|
||||
appMetrics.getResourcePreempted().getVirtualCores()));
|
||||
entityMetrics.add(getTimelineMetric(
|
||||
ApplicationMetricsConstants.APP_RESOURCE_PREEMPTED_MEM, timestamp,
|
||||
appMetrics.getResourcePreempted().getMemorySize()));
|
||||
entityMetrics.add(getTimelineMetric(
|
||||
ApplicationMetricsConstants.APP_NON_AM_CONTAINER_PREEMPTED, timestamp,
|
||||
appMetrics.getNumNonAMContainersPreempted()));
|
||||
entityMetrics.add(getTimelineMetric(
|
||||
ApplicationMetricsConstants.APP_AM_CONTAINER_PREEMPTED, timestamp,
|
||||
appMetrics.getNumAMContainersPreempted()));
|
||||
|
||||
return entityMetrics;
|
||||
}
|
||||
|
||||
private TimelineMetric getTimelineMetric(String name, long timestamp,
|
||||
Number value) {
|
||||
TimelineMetric metric = new TimelineMetric();
|
||||
metric.setId(name);
|
||||
metric.addValue(timestamp, value);
|
||||
return metric;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void appStateUpdated(RMApp app, YarnApplicationState appState,
|
||||
|
|
|
@ -210,7 +210,7 @@ public class TestSystemMetricsPublisherForV2 {
|
|||
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
|
||||
File appFile = new File(outputDirApp, timelineServiceFileName);
|
||||
Assert.assertTrue(appFile.exists());
|
||||
verifyEntity(appFile, 3, ApplicationMetricsConstants.CREATED_EVENT_TYPE);
|
||||
verifyEntity(appFile, 3, ApplicationMetricsConstants.CREATED_EVENT_TYPE, 6);
|
||||
}
|
||||
|
||||
@Test(timeout = 10000)
|
||||
|
@ -244,7 +244,8 @@ public class TestSystemMetricsPublisherForV2 {
|
|||
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
|
||||
File appFile = new File(outputDirApp, timelineServiceFileName);
|
||||
Assert.assertTrue(appFile.exists());
|
||||
verifyEntity(appFile, 2, AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
|
||||
verifyEntity(appFile, 2, AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE,
|
||||
0);
|
||||
}
|
||||
|
||||
@Test(timeout = 10000)
|
||||
|
@ -276,7 +277,7 @@ public class TestSystemMetricsPublisherForV2 {
|
|||
File appFile = new File(outputDirApp, timelineServiceFileName);
|
||||
Assert.assertTrue(appFile.exists());
|
||||
verifyEntity(appFile, 2,
|
||||
ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE);
|
||||
ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE, 0);
|
||||
}
|
||||
|
||||
private RMApp createAppAndRegister(ApplicationId appId) {
|
||||
|
@ -290,16 +291,18 @@ public class TestSystemMetricsPublisherForV2 {
|
|||
}
|
||||
|
||||
private static void verifyEntity(File entityFile, long expectedEvents,
|
||||
String eventForCreatedTime) throws IOException {
|
||||
String eventForCreatedTime, long expectedMetrics) throws IOException {
|
||||
BufferedReader reader = null;
|
||||
String strLine;
|
||||
long count = 0;
|
||||
long metricsCount = 0;
|
||||
try {
|
||||
reader = new BufferedReader(new FileReader(entityFile));
|
||||
while ((strLine = reader.readLine()) != null) {
|
||||
if (strLine.trim().length() > 0) {
|
||||
TimelineEntity entity = FileSystemTimelineReaderImpl.
|
||||
getTimelineRecordFromJSON(strLine.trim(), TimelineEntity.class);
|
||||
metricsCount = entity.getMetrics().size();
|
||||
for (TimelineEvent event : entity.getEvents()) {
|
||||
if (event.getId().equals(eventForCreatedTime)) {
|
||||
assertTrue(entity.getCreatedTime() > 0);
|
||||
|
@ -313,7 +316,9 @@ public class TestSystemMetricsPublisherForV2 {
|
|||
reader.close();
|
||||
}
|
||||
assertEquals("Expected " + expectedEvents + " events to be published",
|
||||
count, expectedEvents);
|
||||
expectedEvents, count);
|
||||
assertEquals("Expected " + expectedMetrics + " metrics is incorrect",
|
||||
expectedMetrics, metricsCount);
|
||||
}
|
||||
|
||||
private String getTimelineEntityDir(RMApp app) {
|
||||
|
@ -348,8 +353,8 @@ public class TestSystemMetricsPublisherForV2 {
|
|||
when(app.getFinalApplicationStatus()).thenReturn(
|
||||
FinalApplicationStatus.UNDEFINED);
|
||||
when(app.getRMAppMetrics()).thenReturn(
|
||||
new RMAppMetrics(null, 0, 0, Integer.MAX_VALUE, Long.MAX_VALUE,
|
||||
Long.MAX_VALUE, Long.MAX_VALUE));
|
||||
new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, Integer.MAX_VALUE,
|
||||
Long.MAX_VALUE, Long.MAX_VALUE, Long.MAX_VALUE));
|
||||
when(app.getApplicationTags()).thenReturn(Collections.<String> emptySet());
|
||||
ApplicationSubmissionContext appSubmissionContext =
|
||||
mock(ApplicationSubmissionContext.class);
|
||||
|
|
Loading…
Reference in New Issue