diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java index f275b3722c9..1ee27d2dc25 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.timelineservice; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.event.AbstractEvent; /** @@ -25,11 +26,14 @@ import org.apache.hadoop.yarn.event.AbstractEvent; * timelineservice v2. */ public class NMTimelineEvent extends AbstractEvent { - public NMTimelineEvent(NMTimelineEventType type) { - super(type); + private ApplicationId appId; + + public NMTimelineEvent(NMTimelineEventType type, ApplicationId appId) { + super(type, System.currentTimeMillis()); + this.appId=appId; } - public NMTimelineEvent(NMTimelineEventType type, long timestamp) { - super(type, timestamp); + public ApplicationId getApplicationId() { + return appId; } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java index b4ae45ad3f9..5d81c947eb1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java @@ -24,4 +24,7 @@ package org.apache.hadoop.yarn.server.nodemanager.timelineservice; public enum NMTimelineEventType { // Publish the NM Timeline entity TIMELINE_ENTITY_PUBLISH, + + // Stop and remove timeline client + STOP_TIMELINE_CLIENT } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java index 94b702500e6..bba56703b6a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java @@ -95,7 +95,7 @@ public class NMTimelinePublisher extends CompositeService { @Override protected void serviceInit(Configuration conf) throws Exception { - dispatcher = new AsyncDispatcher(); + dispatcher = createDispatcher(); dispatcher.register(NMTimelineEventType.class, new ForwardingEventHandler()); addIfService(dispatcher); @@ -112,6 +112,10 @@ public class NMTimelinePublisher extends CompositeService { super.serviceInit(conf); } + protected AsyncDispatcher createDispatcher() { + return new AsyncDispatcher("NM Timeline dispatcher"); + } + @Override protected void serviceStart() throws Exception { super.serviceStart(); @@ -140,6 +144,9 @@ public class NMTimelinePublisher extends CompositeService { putEntity(((TimelinePublishEvent) event).getTimelineEntityToPublish(), ((TimelinePublishEvent) event).getApplicationId()); break; + case STOP_TIMELINE_CLIENT: + removeAndStopTimelineClient(event.getApplicationId()); + break; default: LOG.error("Unknown NMTimelineEvent type: " + event.getType()); } @@ -391,20 +398,13 @@ public class NMTimelinePublisher extends CompositeService { } private static class TimelinePublishEvent extends NMTimelineEvent { - private ApplicationId appId; private TimelineEntity entityToPublish; public TimelinePublishEvent(TimelineEntity entity, ApplicationId appId) { - super(NMTimelineEventType.TIMELINE_ENTITY_PUBLISH, System - .currentTimeMillis()); - this.appId = appId; + super(NMTimelineEventType.TIMELINE_ENTITY_PUBLISH, appId); this.entityToPublish = entity; } - public ApplicationId getApplicationId() { - return appId; - } - public TimelineEntity getTimelineEntityToPublish() { return entityToPublish; } @@ -433,6 +433,11 @@ public class NMTimelinePublisher extends CompositeService { } public void stopTimelineClient(ApplicationId appId) { + dispatcher.getEventHandler().handle( + new NMTimelineEvent(NMTimelineEventType.STOP_TIMELINE_CLIENT, appId)); + } + + private void removeAndStopTimelineClient(ApplicationId appId) { TimelineV2Client client = appToClientMap.remove(appId); if (client != null) { client.stop(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java index 5e971588229..c22e42ea445 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java @@ -34,17 +34,25 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; import org.junit.Assert; import org.junit.Test; +import org.junit.After; +import org.junit.Before; /** * Tests {@link NMTimelinePublisher}. @@ -53,18 +61,23 @@ public class TestNMTimelinePublisher { private static final String MEMORY_ID = "MEMORY"; private static final String CPU_ID = "CPU"; - @Test - public void testContainerResourceUsage() { - Context context = mock(Context.class); - @SuppressWarnings("unchecked") - final DummyTimelineClient timelineClient = new DummyTimelineClient(null); - when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0)); + private NMTimelinePublisher publisher; + private DummyTimelineClient timelineClient; + private Configuration conf; + private DrainDispatcher dispatcher; - Configuration conf = new Configuration(); + + @Before public void setup() throws Exception { + conf = new Configuration(); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + conf.setLong(YarnConfiguration.ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS, + 3000L); + timelineClient = new DummyTimelineClient(null); + Context context = createMockContext(); + dispatcher = new DrainDispatcher(); - NMTimelinePublisher publisher = new NMTimelinePublisher(context) { + publisher = new NMTimelinePublisher(context) { public void createTimelineClient(ApplicationId appId) { if (!getAppToClientMap().containsKey(appId)) { timelineClient.init(getConfig()); @@ -72,15 +85,73 @@ public class TestNMTimelinePublisher { getAppToClientMap().put(appId, timelineClient); } } + + @Override protected AsyncDispatcher createDispatcher() { + return dispatcher; + } }; publisher.init(conf); publisher.start(); + } + + private Context createMockContext() { + Context context = mock(Context.class); + when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0)); + return context; + } + + @After public void tearDown() throws Exception { + if (publisher != null) { + publisher.stop(); + } + if (timelineClient != null) { + timelineClient.stop(); + } + } + + @Test public void testPublishContainerFinish() throws Exception { + ApplicationId appId = ApplicationId.newInstance(0, 2); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + ContainerId cId = ContainerId.newContainerId(appAttemptId, 1); + + String diag = "test-diagnostics"; + int exitStatus = 0; + ContainerStatus cStatus = mock(ContainerStatus.class); + when(cStatus.getContainerId()).thenReturn(cId); + when(cStatus.getDiagnostics()).thenReturn(diag); + when(cStatus.getExitStatus()).thenReturn(exitStatus); + long timeStamp = System.currentTimeMillis(); + + ApplicationContainerFinishedEvent finishedEvent = + new ApplicationContainerFinishedEvent(cStatus, timeStamp); + + publisher.createTimelineClient(appId); + publisher.publishApplicationEvent(finishedEvent); + publisher.stopTimelineClient(appId); + dispatcher.await(); + + ContainerEntity cEntity = new ContainerEntity(); + cEntity.setId(cId.toString()); + TimelineEntity[] lastPublishedEntities = + timelineClient.getLastPublishedEntities(); + + Assert.assertNotNull(lastPublishedEntities); + Assert.assertEquals(1, lastPublishedEntities.length); + TimelineEntity entity = lastPublishedEntities[0]; + Assert.assertTrue(cEntity.equals(entity)); + Assert.assertEquals(diag, + entity.getInfo().get(ContainerMetricsConstants.DIAGNOSTICS_INFO)); + Assert.assertEquals(exitStatus, + entity.getInfo().get(ContainerMetricsConstants.EXIT_STATUS_INFO)); + } + + @Test public void testContainerResourceUsage() { ApplicationId appId = ApplicationId.newInstance(0, 1); publisher.createTimelineClient(appId); Container aContainer = mock(Container.class); - when(aContainer.getContainerId()).thenReturn(ContainerId.newContainerId( - ApplicationAttemptId.newInstance(appId, 1), - 0L)); + when(aContainer.getContainerId()).thenReturn(ContainerId + .newContainerId(ApplicationAttemptId.newInstance(appId, 1), 0L)); publisher.reportContainerResourceUsage(aContainer, 1024L, 8F); verifyPublishedResourceUsageMetrics(timelineClient, 1024L, 8); timelineClient.reset(); @@ -97,7 +168,6 @@ public class TestNMTimelinePublisher { (float) ResourceCalculatorProcessTree.UNAVAILABLE); verifyPublishedResourceUsageMetrics(timelineClient, 1024L, ResourceCalculatorProcessTree.UNAVAILABLE); - publisher.stop(); } private void verifyPublishedResourceUsageMetrics( @@ -157,8 +227,12 @@ public class TestNMTimelinePublisher { private TimelineEntity[] lastPublishedEntities; - @Override - public void putEntitiesAsync(TimelineEntity... entities) + @Override public void putEntitiesAsync(TimelineEntity... entities) + throws IOException, YarnException { + this.lastPublishedEntities = entities; + } + + @Override public void putEntities(TimelineEntity... entities) throws IOException, YarnException { this.lastPublishedEntities = entities; }