diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java index 66f9aab034b..c15f99d3bb2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java @@ -19,7 +19,12 @@ package org.apache.hadoop.yarn.server.timelineservice.collector; import java.nio.ByteBuffer; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -31,6 +36,7 @@ import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; @@ -59,6 +65,8 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService { private final NodeTimelineCollectorManager collectorManager; private long collectorLingerPeriod; private ScheduledExecutorService scheduler; + private Map> appIdToContainerId = + new ConcurrentHashMap<>(); public PerNodeTimelineCollectorsAuxService() { this(new NodeTimelineCollectorManager(true)); @@ -148,7 +156,15 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService { if (context.getContainerType() == ContainerType.APPLICATION_MASTER) { ApplicationId appId = context.getContainerId(). getApplicationAttemptId().getApplicationId(); - addApplication(appId, context.getUser()); + synchronized (appIdToContainerId) { + Set masterContainers = appIdToContainerId.get(appId); + if (masterContainers == null) { + masterContainers = new HashSet<>(); + appIdToContainerId.put(appId, masterContainers); + } + masterContainers.add(context.getContainerId()); + addApplication(appId, context.getUser()); + } } } @@ -162,16 +178,35 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService { // intercept the event of the AM container being stopped and remove the app // level collector service if (context.getContainerType() == ContainerType.APPLICATION_MASTER) { - final ApplicationId appId = - context.getContainerId().getApplicationAttemptId().getApplicationId(); - scheduler.schedule(new Runnable() { - public void run() { - removeApplication(appId); - } - }, collectorLingerPeriod, TimeUnit.MILLISECONDS); + final ContainerId containerId = context.getContainerId(); + removeApplicationCollector(containerId); } } + @VisibleForTesting + protected Future removeApplicationCollector(final ContainerId containerId) { + final ApplicationId appId = + containerId.getApplicationAttemptId().getApplicationId(); + return scheduler.schedule(new Runnable() { + public void run() { + synchronized (appIdToContainerId) { + Set masterContainers = appIdToContainerId.get(appId); + if (masterContainers == null) { + LOG.info("Stop container for " + containerId + + " is called before initializing container."); + return; + } + masterContainers.remove(containerId); + if (masterContainers.size() == 0) { + // remove only if it is last master container + removeApplication(appId); + appIdToContainerId.remove(appId); + } + } + } + }, collectorLingerPeriod, TimeUnit.MILLISECONDS); + } + @VisibleForTesting boolean hasApplication(ApplicationId appId) { return collectorManager.containsTimelineCollector(appId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java index a1d4aa98143..032073958da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java @@ -29,6 +29,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import java.io.IOException; +import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ExitUtil; @@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorCon import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; import org.junit.After; +import org.junit.Assert; import org.junit.Test; /** @@ -57,10 +59,10 @@ public class TestPerNodeTimelineCollectorsAuxService { private ApplicationAttemptId appAttemptId; private PerNodeTimelineCollectorsAuxService auxService; private Configuration conf; + private ApplicationId appId; public TestPerNodeTimelineCollectorsAuxService() { - ApplicationId appId = - ApplicationId.newInstance(System.currentTimeMillis(), 1); + appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); appAttemptId = ApplicationAttemptId.newInstance(appId, 1); conf = new YarnConfiguration(); // enable timeline service v.2 @@ -111,15 +113,6 @@ public class TestPerNodeTimelineCollectorsAuxService { when(context.getContainerType()).thenReturn( ContainerType.APPLICATION_MASTER); auxService.stopContainer(context); - // auxService should have the app's collector and need to remove only after - // a configured period - assertTrue(auxService.hasApplication(appAttemptId.getApplicationId())); - for (int i = 0; i < 4; i++) { - Thread.sleep(500L); - if (!auxService.hasApplication(appAttemptId.getApplicationId())) { - break; - } - } // auxService should not have that app assertFalse(auxService.hasApplication(appAttemptId.getApplicationId())); @@ -159,21 +152,53 @@ public class TestPerNodeTimelineCollectorsAuxService { private PerNodeTimelineCollectorsAuxService createCollectorAndAddApplication() { PerNodeTimelineCollectorsAuxService service = createCollector(); + + ContainerInitializationContext context = + createContainerInitalizationContext(1); + service.initializeContainer(context); + return service; + } + + ContainerInitializationContext createContainerInitalizationContext( + int attempt) { + appAttemptId = ApplicationAttemptId.newInstance(appId, attempt); // create an AM container ContainerId containerId = getAMContainerId(); ContainerInitializationContext context = mock(ContainerInitializationContext.class); when(context.getContainerId()).thenReturn(containerId); - when(context.getContainerType()).thenReturn( - ContainerType.APPLICATION_MASTER); - service.initializeContainer(context); - return service; + when(context.getContainerType()) + .thenReturn(ContainerType.APPLICATION_MASTER); + return context; + } + + ContainerTerminationContext createContainerTerminationContext(int attempt) { + appAttemptId = ApplicationAttemptId.newInstance(appId, attempt); + // create an AM container + ContainerId containerId = getAMContainerId(); + ContainerTerminationContext context = + mock(ContainerTerminationContext.class); + when(context.getContainerId()).thenReturn(containerId); + when(context.getContainerType()) + .thenReturn(ContainerType.APPLICATION_MASTER); + return context; } private PerNodeTimelineCollectorsAuxService createCollector() { NodeTimelineCollectorManager collectorManager = createCollectorManager(); PerNodeTimelineCollectorsAuxService service = - spy(new PerNodeTimelineCollectorsAuxService(collectorManager)); + spy(new PerNodeTimelineCollectorsAuxService(collectorManager) { + @Override + protected Future removeApplicationCollector(ContainerId containerId) { + Future future = super.removeApplicationCollector(containerId); + try { + future.get(); + } catch (Exception e) { + Assert.fail("Expeption thrown while removing collector"); + } + return future; + } + }); service.init(conf); service.start(); return service; @@ -204,4 +229,40 @@ public class TestPerNodeTimelineCollectorsAuxService { private ContainerId getContainerId(long id) { return ContainerId.newContainerId(appAttemptId, id); } + + @Test(timeout = 60000) + public void testRemoveAppWhenSecondAttemptAMCotainerIsLaunchedSameNode() + throws Exception { + // add first attempt collector + auxService = createCollectorAndAddApplication(); + // auxService should have a single app + assertTrue(auxService.hasApplication(appAttemptId.getApplicationId())); + + // add second attempt collector before first attempt master container stop + ContainerInitializationContext containerInitalizationContext = + createContainerInitalizationContext(2); + auxService.initializeContainer(containerInitalizationContext); + + assertTrue("Applicatin not found in collectors.", + auxService.hasApplication(appAttemptId.getApplicationId())); + + // first attempt stop container + ContainerTerminationContext context = createContainerTerminationContext(1); + auxService.stopContainer(context); + + // 2nd attempt container removed, still collector should hold application id + assertTrue("collector has removed application though 2nd attempt" + + " is running this node", + auxService.hasApplication(appAttemptId.getApplicationId())); + + // second attempt stop container + context = createContainerTerminationContext(2); + auxService.stopContainer(context); + + // auxService should not have that app + assertFalse("Application is not removed from collector", + auxService.hasApplication(appAttemptId.getApplicationId())); + auxService.close(); + } + }