YARN-9382 Publish container killed, paused and resumed events to ATSv2. Contributed by Abhishek Modi.
This commit is contained in:
parent
f52a88fdc8
commit
6acc1a2bd0
|
@ -34,6 +34,15 @@ public class ContainerMetricsConstants {
|
|||
public static final String CREATED_IN_RM_EVENT_TYPE =
|
||||
"YARN_RM_CONTAINER_CREATED";
|
||||
|
||||
// Event of this type will be emitted by NM.
|
||||
public static final String PAUSED_EVENT_TYPE = "YARN_CONTAINER_PAUSED";
|
||||
|
||||
// Event of this type will be emitted by NM.
|
||||
public static final String RESUMED_EVENT_TYPE = "YARN_CONTAINER_RESUMED";
|
||||
|
||||
// Event of this type will be emitted by NM.
|
||||
public static final String KILLED_EVENT_TYPE = "YARN_CONTAINER_KILLED";
|
||||
|
||||
// Event of this type will be emitted by NM.
|
||||
public static final String FINISHED_EVENT_TYPE = "YARN_CONTAINER_FINISHED";
|
||||
|
||||
|
|
|
@ -25,6 +25,9 @@ import java.util.Map;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerPauseEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResumeEvent;
|
||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -256,6 +259,95 @@ public class NMTimelinePublisher extends CompositeService {
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void publishContainerResumedEvent(
|
||||
ContainerEvent event) {
|
||||
if (publishNMContainerEvents) {
|
||||
ContainerResumeEvent resumeEvent = (ContainerResumeEvent) event;
|
||||
ContainerId containerId = resumeEvent.getContainerID();
|
||||
ContainerEntity entity = createContainerEntity(containerId);
|
||||
|
||||
Map<String, Object> entityInfo = new HashMap<String, Object>();
|
||||
entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO,
|
||||
resumeEvent.getDiagnostic());
|
||||
entity.setInfo(entityInfo);
|
||||
|
||||
Container container = context.getContainers().get(containerId);
|
||||
if (container != null) {
|
||||
TimelineEvent tEvent = new TimelineEvent();
|
||||
tEvent.setId(ContainerMetricsConstants.RESUMED_EVENT_TYPE);
|
||||
tEvent.setTimestamp(event.getTimestamp());
|
||||
|
||||
long containerStartTime = container.getContainerStartTime();
|
||||
entity.addEvent(tEvent);
|
||||
entity
|
||||
.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
|
||||
dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
|
||||
containerId.getApplicationAttemptId().getApplicationId()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void publishContainerPausedEvent(
|
||||
ContainerEvent event) {
|
||||
if (publishNMContainerEvents) {
|
||||
ContainerPauseEvent pauseEvent = (ContainerPauseEvent) event;
|
||||
ContainerId containerId = pauseEvent.getContainerID();
|
||||
ContainerEntity entity = createContainerEntity(containerId);
|
||||
|
||||
Map<String, Object> entityInfo = new HashMap<String, Object>();
|
||||
entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO,
|
||||
pauseEvent.getDiagnostic());
|
||||
entity.setInfo(entityInfo);
|
||||
|
||||
Container container = context.getContainers().get(containerId);
|
||||
if (container != null) {
|
||||
TimelineEvent tEvent = new TimelineEvent();
|
||||
tEvent.setId(ContainerMetricsConstants.PAUSED_EVENT_TYPE);
|
||||
tEvent.setTimestamp(event.getTimestamp());
|
||||
|
||||
long containerStartTime = container.getContainerStartTime();
|
||||
entity.addEvent(tEvent);
|
||||
entity
|
||||
.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
|
||||
dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
|
||||
containerId.getApplicationAttemptId().getApplicationId()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void publishContainerKilledEvent(
|
||||
ContainerEvent event) {
|
||||
if (publishNMContainerEvents) {
|
||||
ContainerKillEvent killEvent = (ContainerKillEvent) event;
|
||||
ContainerId containerId = killEvent.getContainerID();
|
||||
ContainerEntity entity = createContainerEntity(containerId);
|
||||
|
||||
Map<String, Object> entityInfo = new HashMap<String, Object>();
|
||||
entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO,
|
||||
killEvent.getDiagnostic());
|
||||
entityInfo.put(ContainerMetricsConstants.EXIT_STATUS_INFO,
|
||||
killEvent.getContainerExitStatus());
|
||||
entity.setInfo(entityInfo);
|
||||
|
||||
Container container = context.getContainers().get(containerId);
|
||||
if (container != null) {
|
||||
TimelineEvent tEvent = new TimelineEvent();
|
||||
tEvent.setId(ContainerMetricsConstants.KILLED_EVENT_TYPE);
|
||||
tEvent.setTimestamp(event.getTimestamp());
|
||||
|
||||
long containerStartTime = container.getContainerStartTime();
|
||||
entity.addEvent(tEvent);
|
||||
entity
|
||||
.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
|
||||
dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
|
||||
containerId.getApplicationAttemptId().getApplicationId()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void publishContainerFinishedEvent(ContainerStatus containerStatus,
|
||||
long containerFinishTime, long containerStartTime) {
|
||||
|
@ -398,7 +490,15 @@ public class NMTimelinePublisher extends CompositeService {
|
|||
case INIT_CONTAINER:
|
||||
publishContainerCreatedEvent(event);
|
||||
break;
|
||||
|
||||
case KILL_CONTAINER:
|
||||
publishContainerKilledEvent(event);
|
||||
break;
|
||||
case PAUSE_CONTAINER:
|
||||
publishContainerPausedEvent(event);
|
||||
break;
|
||||
case RESUME_CONTAINER:
|
||||
publishContainerResumedEvent(event);
|
||||
break;
|
||||
default:
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(event.getType()
|
||||
|
|
|
@ -25,7 +25,11 @@ import static org.mockito.Mockito.when;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
|
@ -35,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
||||
import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
@ -45,6 +50,10 @@ import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
|
|||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerPauseEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResumeEvent;
|
||||
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
|
||||
import org.apache.hadoop.yarn.util.TimelineServiceHelper;
|
||||
import org.junit.Assert;
|
||||
|
@ -94,6 +103,19 @@ public class TestNMTimelinePublisher {
|
|||
private Context createMockContext() {
|
||||
Context context = mock(Context.class);
|
||||
when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0));
|
||||
|
||||
ConcurrentMap<ContainerId, Container> containers =
|
||||
new ConcurrentHashMap<>();
|
||||
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||
ApplicationAttemptId appAttemptId =
|
||||
ApplicationAttemptId.newInstance(appId, 1);
|
||||
ContainerId cId = ContainerId.newContainerId(appAttemptId, 1);
|
||||
Container container = mock(Container.class);
|
||||
when(container.getContainerStartTime())
|
||||
.thenReturn(System.currentTimeMillis());
|
||||
containers.putIfAbsent(cId, container);
|
||||
when(context.getContainers()).thenReturn(containers);
|
||||
|
||||
return context;
|
||||
}
|
||||
|
||||
|
@ -145,6 +167,121 @@ public class TestNMTimelinePublisher {
|
|||
cId.getContainerId()), entity.getIdPrefix());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPublishContainerPausedEvent() {
|
||||
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||
ApplicationAttemptId appAttemptId =
|
||||
ApplicationAttemptId.newInstance(appId, 1);
|
||||
ContainerId cId = ContainerId.newContainerId(appAttemptId, 1);
|
||||
|
||||
ContainerEvent containerEvent =
|
||||
new ContainerPauseEvent(cId, "test pause");
|
||||
|
||||
publisher.createTimelineClient(appId);
|
||||
publisher.publishContainerEvent(containerEvent);
|
||||
publisher.stopTimelineClient(appId);
|
||||
dispatcher.await();
|
||||
|
||||
ContainerEntity cEntity = new ContainerEntity();
|
||||
cEntity.setId(cId.toString());
|
||||
TimelineEntity[] lastPublishedEntities =
|
||||
timelineClient.getLastPublishedEntities();
|
||||
|
||||
Assert.assertNotNull(lastPublishedEntities);
|
||||
Assert.assertEquals(1, lastPublishedEntities.length);
|
||||
TimelineEntity entity = lastPublishedEntities[0];
|
||||
Assert.assertEquals(cEntity, entity);
|
||||
|
||||
NavigableSet<TimelineEvent> events = entity.getEvents();
|
||||
Assert.assertEquals(1, events.size());
|
||||
Assert.assertEquals(ContainerMetricsConstants.PAUSED_EVENT_TYPE,
|
||||
events.iterator().next().getId());
|
||||
|
||||
Map<String, Object> info = entity.getInfo();
|
||||
Assert.assertTrue(
|
||||
info.containsKey(ContainerMetricsConstants.DIAGNOSTICS_INFO));
|
||||
Assert.assertEquals("test pause",
|
||||
info.get(ContainerMetricsConstants.DIAGNOSTICS_INFO));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPublishContainerResumedEvent() {
|
||||
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||
ApplicationAttemptId appAttemptId =
|
||||
ApplicationAttemptId.newInstance(appId, 1);
|
||||
ContainerId cId = ContainerId.newContainerId(appAttemptId, 1);
|
||||
|
||||
ContainerEvent containerEvent =
|
||||
new ContainerResumeEvent(cId, "test resume");
|
||||
|
||||
publisher.createTimelineClient(appId);
|
||||
publisher.publishContainerEvent(containerEvent);
|
||||
publisher.stopTimelineClient(appId);
|
||||
dispatcher.await();
|
||||
|
||||
ContainerEntity cEntity = new ContainerEntity();
|
||||
cEntity.setId(cId.toString());
|
||||
TimelineEntity[] lastPublishedEntities =
|
||||
timelineClient.getLastPublishedEntities();
|
||||
|
||||
Assert.assertNotNull(lastPublishedEntities);
|
||||
Assert.assertEquals(1, lastPublishedEntities.length);
|
||||
TimelineEntity entity = lastPublishedEntities[0];
|
||||
Assert.assertEquals(cEntity, entity);
|
||||
|
||||
NavigableSet<TimelineEvent> events = entity.getEvents();
|
||||
Assert.assertEquals(1, events.size());
|
||||
Assert.assertEquals(ContainerMetricsConstants.RESUMED_EVENT_TYPE,
|
||||
events.iterator().next().getId());
|
||||
|
||||
Map<String, Object> info = entity.getInfo();
|
||||
Assert.assertTrue(
|
||||
info.containsKey(ContainerMetricsConstants.DIAGNOSTICS_INFO));
|
||||
Assert.assertEquals("test resume",
|
||||
info.get(ContainerMetricsConstants.DIAGNOSTICS_INFO));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPublishContainerKilledEvent() {
|
||||
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||
ApplicationAttemptId appAttemptId =
|
||||
ApplicationAttemptId.newInstance(appId, 1);
|
||||
ContainerId cId = ContainerId.newContainerId(appAttemptId, 1);
|
||||
|
||||
ContainerEvent containerEvent =
|
||||
new ContainerKillEvent(cId, 1, "test kill");
|
||||
|
||||
publisher.createTimelineClient(appId);
|
||||
publisher.publishContainerEvent(containerEvent);
|
||||
publisher.stopTimelineClient(appId);
|
||||
dispatcher.await();
|
||||
|
||||
ContainerEntity cEntity = new ContainerEntity();
|
||||
cEntity.setId(cId.toString());
|
||||
TimelineEntity[] lastPublishedEntities =
|
||||
timelineClient.getLastPublishedEntities();
|
||||
|
||||
Assert.assertNotNull(lastPublishedEntities);
|
||||
Assert.assertEquals(1, lastPublishedEntities.length);
|
||||
TimelineEntity entity = lastPublishedEntities[0];
|
||||
Assert.assertEquals(cEntity, entity);
|
||||
|
||||
NavigableSet<TimelineEvent> events = entity.getEvents();
|
||||
Assert.assertEquals(1, events.size());
|
||||
Assert.assertEquals(ContainerMetricsConstants.KILLED_EVENT_TYPE,
|
||||
events.iterator().next().getId());
|
||||
|
||||
Map<String, Object> info = entity.getInfo();
|
||||
Assert.assertTrue(
|
||||
info.containsKey(ContainerMetricsConstants.DIAGNOSTICS_INFO));
|
||||
Assert.assertEquals("test kill",
|
||||
info.get(ContainerMetricsConstants.DIAGNOSTICS_INFO));
|
||||
Assert.assertTrue(
|
||||
info.containsKey(ContainerMetricsConstants.EXIT_STATUS_INFO));
|
||||
Assert.assertEquals(1,
|
||||
info.get(ContainerMetricsConstants.EXIT_STATUS_INFO));
|
||||
}
|
||||
|
||||
@Test public void testContainerResourceUsage() {
|
||||
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||
publisher.createTimelineClient(appId);
|
||||
|
|
Loading…
Reference in New Issue