From 7b175739a9dfd9f5aef2f257f7e3ca5bdc8f8f09 Mon Sep 17 00:00:00 2001 From: Brahma Reddy Battula Date: Thu, 9 Jul 2020 12:34:52 +0530 Subject: [PATCH] YARN-10341. Yarn Service Container Completed event doesn't get processed. Contributed by Bilwa S T. (cherry picked from commit dfe60392c91be21f574c1659af22f5c381b2675a) --- .../hadoop/yarn/service/ServiceScheduler.java | 2 +- .../hadoop/yarn/service/TestServiceAM.java | 88 +++++++++++++++++++ 2 files changed, 89 insertions(+), 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index 458a7a1c5c1..0d77479b959 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -737,7 +737,7 @@ public class ServiceScheduler extends CompositeService { LOG.warn( "Container {} Completed. No component instance exists. exitStatus={}. diagnostics={} ", containerId, status.getExitStatus(), status.getDiagnostics()); - return; + continue; } ComponentEvent event = new ComponentEvent(instance.getCompName(), CONTAINER_COMPLETED) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java index bbcbee24680..5b961a83817 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java @@ -22,22 +22,29 @@ import com.google.common.collect.ImmutableMap; import org.apache.commons.io.FileUtils; import org.apache.curator.test.TestingCluster; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ResourceTypeInfo; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.DockerCredentialTokenIdentifier; import org.apache.hadoop.yarn.service.api.records.Artifact; import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.ResourceInformation; import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.component.ComponentState; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState; @@ -47,7 +54,9 @@ import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,6 +72,8 @@ import java.util.concurrent.TimeoutException; import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; public class TestServiceAM extends ServiceTestUtils{ @@ -72,6 +83,9 @@ public class TestServiceAM extends ServiceTestUtils{ private File basedir; YarnConfiguration conf = new YarnConfiguration(); TestingCluster zkCluster; + @Rule + public ServiceTestUtils.ServiceFSWatcher rule = + new ServiceTestUtils.ServiceFSWatcher(); @Before public void setup() throws Exception { @@ -311,6 +325,80 @@ public class TestServiceAM extends ServiceTestUtils{ am.stop(); } + @Test + public void testContainerCompletedEventProcessed() throws Exception { + ServiceContext context = createServiceContext("abc"); + MockServiceScheduler scheduler = new MockServiceScheduler(context); + scheduler.init(conf); + ApplicationId appId = ApplicationId.newInstance(0, 0); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, + 1); + ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 0); + ContainerStatus containerStatus1 = ContainerStatus.newInstance(containerId1, + org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE, + "successful", 0); + ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 1); + ContainerStatus containerStatus2 = ContainerStatus.newInstance(containerId2, + org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE, + "successful", 0); + ComponentInstance instance = Mockito.mock(ComponentInstance.class); + Mockito.doReturn("componentInstance").when(instance).getCompName(); + scheduler.addLiveCompInstance(containerId2, instance); + List statuses = new ArrayList<>(); + // First container instance will be null + statuses.add(containerStatus1); + // Second container instance is added + scheduler.addLiveCompInstance(containerId2, instance); + statuses.add(containerStatus2); + scheduler.callbackHandler.onContainersCompleted(statuses); + // For second container event should be dispatched. + verify(scheduler.dispatcher, times(1)).getEventHandler(); + DefaultMetricsSystem.shutdown(); + } + + private ServiceContext createServiceContext(String name) + throws Exception { + Artifact artifact = new Artifact(); + artifact.setId("1"); + artifact.setType(Artifact.TypeEnum.TARBALL); + Service serviceDef = ServiceTestUtils.createExampleApplication(); + ApplicationId applicationId = ApplicationId.newInstance( + System.currentTimeMillis(), 1); + serviceDef.setId(applicationId.toString()); + serviceDef.setName(name); + serviceDef.setState(ServiceState.STARTED); + serviceDef.getComponents().forEach(component -> + component.setArtifact(artifact)); + ServiceContext context = new MockRunningServiceContext(rule, + serviceDef); + context.scheduler.getDispatcher().setDrainEventsOnStop(); + context.scheduler.getDispatcher().start(); + return context; + } + + class MockServiceScheduler extends ServiceScheduler { + private AsyncDispatcher dispatcher; + private AMRMClientCallback callbackHandler = new AMRMClientCallback(); + + MockServiceScheduler(ServiceContext context) { + super(context); + } + + @Override + protected AsyncDispatcher createAsyncDispatcher() { + dispatcher = Mockito.mock(AsyncDispatcher.class); + EventHandler handler = Mockito.mock(EventHandler.class); + Mockito.doReturn(handler).when(dispatcher).getEventHandler(); + return dispatcher; + } + + @Override + protected AMRMClientAsync createAMRMClient() { + return AMRMClientAsync.createAMRMClientAsync(1000, callbackHandler); + } + + } + @Test public void testRecordTokensForContainers() throws Exception { ApplicationId applicationId = ApplicationId.newInstance(123456, 1);