YARN-5743. [Atsv2] Publish queue name and RMAppMetrics to ATS (Rohith Sharma K S via Varun Saxena)
This commit is contained in:
parent
d26a1bb9d6
commit
b154d3edce
|
@ -71,10 +71,22 @@ public class ApplicationMetricsConstants {
|
||||||
"YARN_APPLICATION_STATE";
|
"YARN_APPLICATION_STATE";
|
||||||
|
|
||||||
public static final String APP_CPU_METRICS =
|
public static final String APP_CPU_METRICS =
|
||||||
"YARN_APPLICATION_CPU_METRIC";
|
"YARN_APPLICATION_CPU";
|
||||||
|
|
||||||
public static final String APP_MEM_METRICS =
|
public static final String APP_MEM_METRICS =
|
||||||
"YARN_APPLICATION_MEM_METRIC";
|
"YARN_APPLICATION_MEMORY";
|
||||||
|
|
||||||
|
public static final String APP_RESOURCE_PREEMPTED_CPU =
|
||||||
|
"YARN_APPLICATION_RESOURCE_PREEMPTED_CPU";
|
||||||
|
|
||||||
|
public static final String APP_RESOURCE_PREEMPTED_MEM =
|
||||||
|
"YARN_APPLICATION_RESOURCE_PREEMPTED_MEMORY";
|
||||||
|
|
||||||
|
public static final String APP_NON_AM_CONTAINER_PREEMPTED =
|
||||||
|
"YARN_APPLICATION_NON_AM_CONTAINER_PREEMPTED";
|
||||||
|
|
||||||
|
public static final String APP_AM_CONTAINER_PREEMPTED =
|
||||||
|
"YARN_APPLICATION_AM_CONTAINER_PREEMPTED";
|
||||||
|
|
||||||
public static final String LATEST_APP_ATTEMPT_EVENT_INFO =
|
public static final String LATEST_APP_ATTEMPT_EVENT_INFO =
|
||||||
"YARN_APPLICATION_LATEST_APP_ATTEMPT";
|
"YARN_APPLICATION_LATEST_APP_ATTEMPT";
|
||||||
|
|
|
@ -19,7 +19,9 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
|
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -40,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
|
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
|
||||||
|
@ -104,6 +107,8 @@ public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
|
||||||
entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
|
entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
|
||||||
app.getApplicationType());
|
app.getApplicationType());
|
||||||
entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, app.getUser());
|
entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, app.getUser());
|
||||||
|
entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
|
||||||
|
app.getQueue());
|
||||||
entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
|
entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
|
||||||
app.getSubmitTime());
|
app.getSubmitTime());
|
||||||
entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO,
|
entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO,
|
||||||
|
@ -148,11 +153,6 @@ public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
|
||||||
@Override
|
@Override
|
||||||
public void appFinished(RMApp app, RMAppState state, long finishedTime) {
|
public void appFinished(RMApp app, RMAppState state, long finishedTime) {
|
||||||
ApplicationEntity entity = createApplicationEntity(app.getApplicationId());
|
ApplicationEntity entity = createApplicationEntity(app.getApplicationId());
|
||||||
RMAppMetrics appMetrics = app.getRMAppMetrics();
|
|
||||||
entity.addInfo(ApplicationMetricsConstants.APP_CPU_METRICS,
|
|
||||||
appMetrics.getVcoreSeconds());
|
|
||||||
entity.addInfo(ApplicationMetricsConstants.APP_MEM_METRICS,
|
|
||||||
appMetrics.getMemorySeconds());
|
|
||||||
|
|
||||||
TimelineEvent tEvent = new TimelineEvent();
|
TimelineEvent tEvent = new TimelineEvent();
|
||||||
tEvent.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
|
tEvent.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
|
||||||
|
@ -174,10 +174,49 @@ public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
|
||||||
}
|
}
|
||||||
entity.setInfo(entityInfo);
|
entity.setInfo(entityInfo);
|
||||||
|
|
||||||
|
RMAppMetrics appMetrics = app.getRMAppMetrics();
|
||||||
|
Set<TimelineMetric> entityMetrics =
|
||||||
|
getTimelinelineAppMetrics(appMetrics, finishedTime);
|
||||||
|
entity.setMetrics(entityMetrics);
|
||||||
|
|
||||||
getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
|
getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
|
||||||
SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
|
SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Set<TimelineMetric> getTimelinelineAppMetrics(
|
||||||
|
RMAppMetrics appMetrics, long timestamp) {
|
||||||
|
Set<TimelineMetric> entityMetrics = new HashSet<TimelineMetric>();
|
||||||
|
|
||||||
|
entityMetrics.add(getTimelineMetric(
|
||||||
|
ApplicationMetricsConstants.APP_CPU_METRICS, timestamp,
|
||||||
|
appMetrics.getVcoreSeconds()));
|
||||||
|
entityMetrics.add(getTimelineMetric(
|
||||||
|
ApplicationMetricsConstants.APP_MEM_METRICS, timestamp,
|
||||||
|
appMetrics.getMemorySeconds()));
|
||||||
|
entityMetrics.add(getTimelineMetric(
|
||||||
|
ApplicationMetricsConstants.APP_RESOURCE_PREEMPTED_CPU, timestamp,
|
||||||
|
appMetrics.getResourcePreempted().getVirtualCores()));
|
||||||
|
entityMetrics.add(getTimelineMetric(
|
||||||
|
ApplicationMetricsConstants.APP_RESOURCE_PREEMPTED_MEM, timestamp,
|
||||||
|
appMetrics.getResourcePreempted().getMemorySize()));
|
||||||
|
entityMetrics.add(getTimelineMetric(
|
||||||
|
ApplicationMetricsConstants.APP_NON_AM_CONTAINER_PREEMPTED, timestamp,
|
||||||
|
appMetrics.getNumNonAMContainersPreempted()));
|
||||||
|
entityMetrics.add(getTimelineMetric(
|
||||||
|
ApplicationMetricsConstants.APP_AM_CONTAINER_PREEMPTED, timestamp,
|
||||||
|
appMetrics.getNumAMContainersPreempted()));
|
||||||
|
|
||||||
|
return entityMetrics;
|
||||||
|
}
|
||||||
|
|
||||||
|
private TimelineMetric getTimelineMetric(String name, long timestamp,
|
||||||
|
Number value) {
|
||||||
|
TimelineMetric metric = new TimelineMetric();
|
||||||
|
metric.setId(name);
|
||||||
|
metric.addValue(timestamp, value);
|
||||||
|
return metric;
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
public void appStateUpdated(RMApp app, YarnApplicationState appState,
|
public void appStateUpdated(RMApp app, YarnApplicationState appState,
|
||||||
|
|
|
@ -211,7 +211,7 @@ public class TestSystemMetricsPublisherForV2 {
|
||||||
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
|
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
|
||||||
File appFile = new File(outputDirApp, timelineServiceFileName);
|
File appFile = new File(outputDirApp, timelineServiceFileName);
|
||||||
Assert.assertTrue(appFile.exists());
|
Assert.assertTrue(appFile.exists());
|
||||||
verifyEntity(appFile, 3, ApplicationMetricsConstants.CREATED_EVENT_TYPE);
|
verifyEntity(appFile, 3, ApplicationMetricsConstants.CREATED_EVENT_TYPE, 6);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 10000)
|
@Test(timeout = 10000)
|
||||||
|
@ -245,7 +245,8 @@ public class TestSystemMetricsPublisherForV2 {
|
||||||
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
|
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
|
||||||
File appFile = new File(outputDirApp, timelineServiceFileName);
|
File appFile = new File(outputDirApp, timelineServiceFileName);
|
||||||
Assert.assertTrue(appFile.exists());
|
Assert.assertTrue(appFile.exists());
|
||||||
verifyEntity(appFile, 2, AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
|
verifyEntity(appFile, 2, AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE,
|
||||||
|
0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 10000)
|
@Test(timeout = 10000)
|
||||||
|
@ -277,7 +278,7 @@ public class TestSystemMetricsPublisherForV2 {
|
||||||
File appFile = new File(outputDirApp, timelineServiceFileName);
|
File appFile = new File(outputDirApp, timelineServiceFileName);
|
||||||
Assert.assertTrue(appFile.exists());
|
Assert.assertTrue(appFile.exists());
|
||||||
verifyEntity(appFile, 2,
|
verifyEntity(appFile, 2,
|
||||||
ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE);
|
ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
private RMApp createAppAndRegister(ApplicationId appId) {
|
private RMApp createAppAndRegister(ApplicationId appId) {
|
||||||
|
@ -291,16 +292,18 @@ public class TestSystemMetricsPublisherForV2 {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void verifyEntity(File entityFile, long expectedEvents,
|
private static void verifyEntity(File entityFile, long expectedEvents,
|
||||||
String eventForCreatedTime) throws IOException {
|
String eventForCreatedTime, long expectedMetrics) throws IOException {
|
||||||
BufferedReader reader = null;
|
BufferedReader reader = null;
|
||||||
String strLine;
|
String strLine;
|
||||||
long count = 0;
|
long count = 0;
|
||||||
|
long metricsCount = 0;
|
||||||
try {
|
try {
|
||||||
reader = new BufferedReader(new FileReader(entityFile));
|
reader = new BufferedReader(new FileReader(entityFile));
|
||||||
while ((strLine = reader.readLine()) != null) {
|
while ((strLine = reader.readLine()) != null) {
|
||||||
if (strLine.trim().length() > 0) {
|
if (strLine.trim().length() > 0) {
|
||||||
TimelineEntity entity = FileSystemTimelineReaderImpl.
|
TimelineEntity entity = FileSystemTimelineReaderImpl.
|
||||||
getTimelineRecordFromJSON(strLine.trim(), TimelineEntity.class);
|
getTimelineRecordFromJSON(strLine.trim(), TimelineEntity.class);
|
||||||
|
metricsCount = entity.getMetrics().size();
|
||||||
for (TimelineEvent event : entity.getEvents()) {
|
for (TimelineEvent event : entity.getEvents()) {
|
||||||
if (event.getId().equals(eventForCreatedTime)) {
|
if (event.getId().equals(eventForCreatedTime)) {
|
||||||
assertTrue(entity.getCreatedTime() > 0);
|
assertTrue(entity.getCreatedTime() > 0);
|
||||||
|
@ -314,7 +317,9 @@ public class TestSystemMetricsPublisherForV2 {
|
||||||
reader.close();
|
reader.close();
|
||||||
}
|
}
|
||||||
assertEquals("Expected " + expectedEvents + " events to be published",
|
assertEquals("Expected " + expectedEvents + " events to be published",
|
||||||
count, expectedEvents);
|
expectedEvents, count);
|
||||||
|
assertEquals("Expected " + expectedMetrics + " metrics is incorrect",
|
||||||
|
expectedMetrics, metricsCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getTimelineEntityDir(RMApp app) {
|
private String getTimelineEntityDir(RMApp app) {
|
||||||
|
@ -349,7 +354,8 @@ public class TestSystemMetricsPublisherForV2 {
|
||||||
when(app.getFinalApplicationStatus()).thenReturn(
|
when(app.getFinalApplicationStatus()).thenReturn(
|
||||||
FinalApplicationStatus.UNDEFINED);
|
FinalApplicationStatus.UNDEFINED);
|
||||||
when(app.getRMAppMetrics()).thenReturn(
|
when(app.getRMAppMetrics()).thenReturn(
|
||||||
new RMAppMetrics(null, 0, 0, Integer.MAX_VALUE, Long.MAX_VALUE));
|
new RMAppMetrics(Resource.newInstance(0, 0), 0, 0, Integer.MAX_VALUE,
|
||||||
|
Long.MAX_VALUE));
|
||||||
when(app.getApplicationTags()).thenReturn(Collections.<String> emptySet());
|
when(app.getApplicationTags()).thenReturn(Collections.<String> emptySet());
|
||||||
ApplicationSubmissionContext appSubmissionContext =
|
ApplicationSubmissionContext appSubmissionContext =
|
||||||
mock(ApplicationSubmissionContext.class);
|
mock(ApplicationSubmissionContext.class);
|
||||||
|
|
Loading…
Reference in New Issue