YARN-6735. Have a way to turn off container metrics from NMs. Contributed by Abhishek Modi.
This commit is contained in:
parent
5345508fa3
commit
108c569e3b
@ -1219,6 +1219,16 @@ public static boolean isAclEnabled(Configuration conf) {
|
|||||||
public static final String DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS =
|
public static final String DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS =
|
||||||
"0.0.0.0:" + DEFAULT_NM_COLLECTOR_SERVICE_PORT;
|
"0.0.0.0:" + DEFAULT_NM_COLLECTOR_SERVICE_PORT;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The setting that controls whether yarn container events are published to
|
||||||
|
* the timeline service or not by NM. This configuration setting is for ATS
|
||||||
|
* V2
|
||||||
|
*/
|
||||||
|
public static final String NM_PUBLISH_CONTAINER_EVENTS_ENABLED = NM_PREFIX
|
||||||
|
+ "emit-container-events";
|
||||||
|
public static final boolean DEFAULT_NM_PUBLISH_CONTAINER_EVENTS_ENABLED =
|
||||||
|
true;
|
||||||
|
|
||||||
/** Interval in between cache cleanups.*/
|
/** Interval in between cache cleanups.*/
|
||||||
public static final String NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS =
|
public static final String NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS =
|
||||||
NM_PREFIX + "localizer.cache.cleanup.interval-ms";
|
NM_PREFIX + "localizer.cache.cleanup.interval-ms";
|
||||||
|
@ -1185,6 +1185,14 @@
|
|||||||
<value>${yarn.nodemanager.hostname}:8048</value>
|
<value>${yarn.nodemanager.hostname}:8048</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>The setting that controls whether yarn container events are
|
||||||
|
published to the timeline service or not by NM. This configuration setting
|
||||||
|
is for ATS V2.</description>
|
||||||
|
<name>yarn.nodemanager.emit-container-events</name>
|
||||||
|
<value>true</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>Interval in between cache cleanups.</description>
|
<description>Interval in between cache cleanups.</description>
|
||||||
<name>yarn.nodemanager.localizer.cache.cleanup.interval-ms</name>
|
<name>yarn.nodemanager.localizer.cache.cleanup.interval-ms</name>
|
||||||
|
@ -24,6 +24,7 @@
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@ -88,6 +89,8 @@ public class NMTimelinePublisher extends CompositeService {
|
|||||||
|
|
||||||
private final Map<ApplicationId, TimelineV2Client> appToClientMap;
|
private final Map<ApplicationId, TimelineV2Client> appToClientMap;
|
||||||
|
|
||||||
|
private boolean publishNMContainerEvents = true;
|
||||||
|
|
||||||
public NMTimelinePublisher(Context context) {
|
public NMTimelinePublisher(Context context) {
|
||||||
super(NMTimelinePublisher.class.getName());
|
super(NMTimelinePublisher.class.getName());
|
||||||
this.context = context;
|
this.context = context;
|
||||||
@ -110,6 +113,10 @@ protected void serviceInit(Configuration conf) throws Exception {
|
|||||||
if (webAppURLWithoutScheme.contains(":")) {
|
if (webAppURLWithoutScheme.contains(":")) {
|
||||||
httpPort = webAppURLWithoutScheme.split(":")[1];
|
httpPort = webAppURLWithoutScheme.split(":")[1];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
publishNMContainerEvents = conf.getBoolean(
|
||||||
|
YarnConfiguration.NM_PUBLISH_CONTAINER_EVENTS_ENABLED,
|
||||||
|
YarnConfiguration.DEFAULT_NM_PUBLISH_CONTAINER_EVENTS_ENABLED);
|
||||||
super.serviceInit(conf);
|
super.serviceInit(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -155,31 +162,148 @@ protected void handleNMTimelineEvent(NMTimelineEvent event) {
|
|||||||
|
|
||||||
public void reportContainerResourceUsage(Container container, Long pmemUsage,
|
public void reportContainerResourceUsage(Container container, Long pmemUsage,
|
||||||
Float cpuUsagePercentPerCore) {
|
Float cpuUsagePercentPerCore) {
|
||||||
if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE ||
|
if (publishNMContainerEvents) {
|
||||||
cpuUsagePercentPerCore != ResourceCalculatorProcessTree.UNAVAILABLE) {
|
if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE
|
||||||
ContainerEntity entity =
|
|| cpuUsagePercentPerCore !=
|
||||||
createContainerEntity(container.getContainerId());
|
ResourceCalculatorProcessTree.UNAVAILABLE) {
|
||||||
long currentTimeMillis = System.currentTimeMillis();
|
ContainerEntity entity =
|
||||||
if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE) {
|
createContainerEntity(container.getContainerId());
|
||||||
TimelineMetric memoryMetric = new TimelineMetric();
|
long currentTimeMillis = System.currentTimeMillis();
|
||||||
memoryMetric.setId(ContainerMetric.MEMORY.toString());
|
if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE) {
|
||||||
memoryMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
|
TimelineMetric memoryMetric = new TimelineMetric();
|
||||||
memoryMetric.addValue(currentTimeMillis, pmemUsage);
|
memoryMetric.setId(ContainerMetric.MEMORY.toString());
|
||||||
entity.addMetric(memoryMetric);
|
memoryMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
|
||||||
}
|
memoryMetric.addValue(currentTimeMillis, pmemUsage);
|
||||||
if (cpuUsagePercentPerCore != ResourceCalculatorProcessTree.UNAVAILABLE) {
|
entity.addMetric(memoryMetric);
|
||||||
TimelineMetric cpuMetric = new TimelineMetric();
|
}
|
||||||
cpuMetric.setId(ContainerMetric.CPU.toString());
|
if (cpuUsagePercentPerCore !=
|
||||||
// TODO: support average
|
ResourceCalculatorProcessTree.UNAVAILABLE) {
|
||||||
cpuMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
|
TimelineMetric cpuMetric = new TimelineMetric();
|
||||||
cpuMetric.addValue(currentTimeMillis,
|
cpuMetric.setId(ContainerMetric.CPU.toString());
|
||||||
Math.round(cpuUsagePercentPerCore));
|
// TODO: support average
|
||||||
entity.addMetric(cpuMetric);
|
cpuMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
|
||||||
|
cpuMetric.addValue(currentTimeMillis,
|
||||||
|
Math.round(cpuUsagePercentPerCore));
|
||||||
|
entity.addMetric(cpuMetric);
|
||||||
|
}
|
||||||
|
entity.setIdPrefix(TimelineServiceHelper.
|
||||||
|
invertLong(container.getContainerStartTime()));
|
||||||
|
ApplicationId appId = container.getContainerId().
|
||||||
|
getApplicationAttemptId().getApplicationId();
|
||||||
|
try {
|
||||||
|
// no need to put it as part of publisher as timeline client
|
||||||
|
// already has Queuing concept
|
||||||
|
TimelineV2Client timelineClient = getTimelineClient(appId);
|
||||||
|
if (timelineClient != null) {
|
||||||
|
timelineClient.putEntitiesAsync(entity);
|
||||||
|
} else {
|
||||||
|
LOG.error("Seems like client has been removed before the container"
|
||||||
|
+ " metric could be published for " +
|
||||||
|
container.getContainerId());
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error(
|
||||||
|
"Failed to publish Container metrics for container " +
|
||||||
|
container.getContainerId());
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Failed to publish Container metrics for container " +
|
||||||
|
container.getContainerId(), e);
|
||||||
|
}
|
||||||
|
} catch (YarnException e) {
|
||||||
|
LOG.error(
|
||||||
|
"Failed to publish Container metrics for container " +
|
||||||
|
container.getContainerId(), e.getMessage());
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Failed to publish Container metrics for container " +
|
||||||
|
container.getContainerId(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private void publishContainerCreatedEvent(ContainerEvent event) {
|
||||||
|
if (publishNMContainerEvents) {
|
||||||
|
ContainerId containerId = event.getContainerID();
|
||||||
|
ContainerEntity entity = createContainerEntity(containerId);
|
||||||
|
Container container = context.getContainers().get(containerId);
|
||||||
|
Resource resource = container.getResource();
|
||||||
|
|
||||||
|
Map<String, Object> entityInfo = new HashMap<String, Object>();
|
||||||
|
entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_INFO,
|
||||||
|
resource.getMemorySize());
|
||||||
|
entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_INFO,
|
||||||
|
resource.getVirtualCores());
|
||||||
|
entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_INFO,
|
||||||
|
nodeId.getHost());
|
||||||
|
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_INFO,
|
||||||
|
nodeId.getPort());
|
||||||
|
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_INFO,
|
||||||
|
container.getPriority().toString());
|
||||||
|
entityInfo.put(
|
||||||
|
ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO,
|
||||||
|
httpAddress);
|
||||||
|
entity.setInfo(entityInfo);
|
||||||
|
|
||||||
|
TimelineEvent tEvent = new TimelineEvent();
|
||||||
|
tEvent.setId(ContainerMetricsConstants.CREATED_EVENT_TYPE);
|
||||||
|
tEvent.setTimestamp(event.getTimestamp());
|
||||||
|
|
||||||
|
long containerStartTime = container.getContainerStartTime();
|
||||||
|
entity.addEvent(tEvent);
|
||||||
|
entity.setCreatedTime(containerStartTime);
|
||||||
|
entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
|
||||||
|
dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
|
||||||
|
containerId.getApplicationAttemptId().getApplicationId()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private void publishContainerFinishedEvent(ContainerStatus containerStatus,
|
||||||
|
long containerFinishTime, long containerStartTime) {
|
||||||
|
if (publishNMContainerEvents) {
|
||||||
|
ContainerId containerId = containerStatus.getContainerId();
|
||||||
|
TimelineEntity entity = createContainerEntity(containerId);
|
||||||
|
|
||||||
|
Map<String, Object> entityInfo = new HashMap<String, Object>();
|
||||||
|
entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO,
|
||||||
|
containerStatus.getDiagnostics());
|
||||||
|
entityInfo.put(ContainerMetricsConstants.EXIT_STATUS_INFO,
|
||||||
|
containerStatus.getExitStatus());
|
||||||
|
entityInfo.put(ContainerMetricsConstants.STATE_INFO,
|
||||||
|
ContainerState.COMPLETE.toString());
|
||||||
|
entityInfo.put(ContainerMetricsConstants.CONTAINER_FINISHED_TIME,
|
||||||
|
containerFinishTime);
|
||||||
|
entity.setInfo(entityInfo);
|
||||||
|
|
||||||
|
TimelineEvent tEvent = new TimelineEvent();
|
||||||
|
tEvent.setId(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
|
||||||
|
tEvent.setTimestamp(containerFinishTime);
|
||||||
|
entity.addEvent(tEvent);
|
||||||
|
entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
|
||||||
|
|
||||||
|
dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
|
||||||
|
containerId.getApplicationAttemptId().getApplicationId()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void publishContainerLocalizationEvent(
|
||||||
|
ContainerLocalizationEvent event, String eventType) {
|
||||||
|
if (publishNMContainerEvents) {
|
||||||
|
Container container = event.getContainer();
|
||||||
|
ContainerId containerId = container.getContainerId();
|
||||||
|
TimelineEntity entity = createContainerEntity(containerId);
|
||||||
|
|
||||||
|
TimelineEvent tEvent = new TimelineEvent();
|
||||||
|
tEvent.setId(eventType);
|
||||||
|
tEvent.setTimestamp(event.getTimestamp());
|
||||||
|
entity.addEvent(tEvent);
|
||||||
entity.setIdPrefix(TimelineServiceHelper.
|
entity.setIdPrefix(TimelineServiceHelper.
|
||||||
invertLong(container.getContainerStartTime()));
|
invertLong(container.getContainerStartTime()));
|
||||||
ApplicationId appId = container.getContainerId().getApplicationAttemptId()
|
|
||||||
.getApplicationId();
|
ApplicationId appId = container.getContainerId().
|
||||||
|
getApplicationAttemptId().getApplicationId();
|
||||||
try {
|
try {
|
||||||
// no need to put it as part of publisher as timeline client already has
|
// no need to put it as part of publisher as timeline client already has
|
||||||
// Queuing concept
|
// Queuing concept
|
||||||
@ -187,8 +311,8 @@ public void reportContainerResourceUsage(Container container, Long pmemUsage,
|
|||||||
if (timelineClient != null) {
|
if (timelineClient != null) {
|
||||||
timelineClient.putEntitiesAsync(entity);
|
timelineClient.putEntitiesAsync(entity);
|
||||||
} else {
|
} else {
|
||||||
LOG.error("Seems like client has been removed before the container"
|
LOG.error("Seems like client has been removed before the event"
|
||||||
+ " metric could be published for " + container.getContainerId());
|
+ " could be published for " + container.getContainerId());
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error("Failed to publish Container metrics for container "
|
LOG.error("Failed to publish Container metrics for container "
|
||||||
@ -208,110 +332,6 @@ public void reportContainerResourceUsage(Container container, Long pmemUsage,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private void publishContainerCreatedEvent(ContainerEvent event) {
|
|
||||||
ContainerId containerId = event.getContainerID();
|
|
||||||
ContainerEntity entity = createContainerEntity(containerId);
|
|
||||||
Container container = context.getContainers().get(containerId);
|
|
||||||
Resource resource = container.getResource();
|
|
||||||
|
|
||||||
Map<String, Object> entityInfo = new HashMap<String, Object>();
|
|
||||||
entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_INFO,
|
|
||||||
resource.getMemorySize());
|
|
||||||
entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_INFO,
|
|
||||||
resource.getVirtualCores());
|
|
||||||
entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_INFO,
|
|
||||||
nodeId.getHost());
|
|
||||||
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_INFO,
|
|
||||||
nodeId.getPort());
|
|
||||||
entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_INFO,
|
|
||||||
container.getPriority().toString());
|
|
||||||
entityInfo.put(
|
|
||||||
ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO,
|
|
||||||
httpAddress);
|
|
||||||
entity.setInfo(entityInfo);
|
|
||||||
|
|
||||||
TimelineEvent tEvent = new TimelineEvent();
|
|
||||||
tEvent.setId(ContainerMetricsConstants.CREATED_EVENT_TYPE);
|
|
||||||
tEvent.setTimestamp(event.getTimestamp());
|
|
||||||
|
|
||||||
long containerStartTime = container.getContainerStartTime();
|
|
||||||
entity.addEvent(tEvent);
|
|
||||||
entity.setCreatedTime(containerStartTime);
|
|
||||||
entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
|
|
||||||
dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
|
|
||||||
containerId.getApplicationAttemptId().getApplicationId()));
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private void publishContainerFinishedEvent(ContainerStatus containerStatus,
|
|
||||||
long containerFinishTime, long containerStartTime) {
|
|
||||||
ContainerId containerId = containerStatus.getContainerId();
|
|
||||||
TimelineEntity entity = createContainerEntity(containerId);
|
|
||||||
|
|
||||||
Map<String, Object> entityInfo = new HashMap<String, Object>();
|
|
||||||
entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO,
|
|
||||||
containerStatus.getDiagnostics());
|
|
||||||
entityInfo.put(ContainerMetricsConstants.EXIT_STATUS_INFO,
|
|
||||||
containerStatus.getExitStatus());
|
|
||||||
entityInfo.put(ContainerMetricsConstants.STATE_INFO,
|
|
||||||
ContainerState.COMPLETE.toString());
|
|
||||||
entityInfo.put(ContainerMetricsConstants.CONTAINER_FINISHED_TIME,
|
|
||||||
containerFinishTime);
|
|
||||||
entity.setInfo(entityInfo);
|
|
||||||
|
|
||||||
TimelineEvent tEvent = new TimelineEvent();
|
|
||||||
tEvent.setId(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
|
|
||||||
tEvent.setTimestamp(containerFinishTime);
|
|
||||||
entity.addEvent(tEvent);
|
|
||||||
entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
|
|
||||||
|
|
||||||
dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
|
|
||||||
containerId.getApplicationAttemptId().getApplicationId()));
|
|
||||||
}
|
|
||||||
|
|
||||||
private void publishContainerLocalizationEvent(
|
|
||||||
ContainerLocalizationEvent event, String eventType) {
|
|
||||||
Container container = event.getContainer();
|
|
||||||
ContainerId containerId = container.getContainerId();
|
|
||||||
TimelineEntity entity = createContainerEntity(containerId);
|
|
||||||
|
|
||||||
TimelineEvent tEvent = new TimelineEvent();
|
|
||||||
tEvent.setId(eventType);
|
|
||||||
tEvent.setTimestamp(event.getTimestamp());
|
|
||||||
entity.addEvent(tEvent);
|
|
||||||
entity.setIdPrefix(TimelineServiceHelper.
|
|
||||||
invertLong(container.getContainerStartTime()));
|
|
||||||
|
|
||||||
ApplicationId appId =
|
|
||||||
container.getContainerId().getApplicationAttemptId().getApplicationId();
|
|
||||||
try {
|
|
||||||
// no need to put it as part of publisher as timeline client already has
|
|
||||||
// Queuing concept
|
|
||||||
TimelineV2Client timelineClient = getTimelineClient(appId);
|
|
||||||
if (timelineClient != null) {
|
|
||||||
timelineClient.putEntitiesAsync(entity);
|
|
||||||
} else {
|
|
||||||
LOG.error("Seems like client has been removed before the event could be"
|
|
||||||
+ " published for " + container.getContainerId());
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.error("Failed to publish Container metrics for container "
|
|
||||||
+ container.getContainerId());
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Failed to publish Container metrics for container "
|
|
||||||
+ container.getContainerId(), e);
|
|
||||||
}
|
|
||||||
} catch (YarnException e) {
|
|
||||||
LOG.error("Failed to publish Container metrics for container "
|
|
||||||
+ container.getContainerId(), e.getMessage());
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Failed to publish Container metrics for container "
|
|
||||||
+ container.getContainerId(), e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static ContainerEntity createContainerEntity(
|
private static ContainerEntity createContainerEntity(
|
||||||
ContainerId containerId) {
|
ContainerId containerId) {
|
||||||
ContainerEntity entity = new ContainerEntity();
|
ContainerEntity entity = new ContainerEntity();
|
||||||
|
@ -67,6 +67,8 @@ public class TestNMTimelinePublisher {
|
|||||||
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
|
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
|
||||||
conf.setLong(YarnConfiguration.ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS,
|
conf.setLong(YarnConfiguration.ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS,
|
||||||
3000L);
|
3000L);
|
||||||
|
conf.setBoolean(YarnConfiguration.NM_PUBLISH_CONTAINER_EVENTS_ENABLED,
|
||||||
|
true);
|
||||||
timelineClient = new DummyTimelineClient(null);
|
timelineClient = new DummyTimelineClient(null);
|
||||||
Context context = createMockContext();
|
Context context = createMockContext();
|
||||||
dispatcher = new DrainDispatcher();
|
dispatcher = new DrainDispatcher();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user