diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index 9bf567a7940..63331977b2f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -40,6 +40,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -360,17 +361,22 @@ public class ServiceScheduler extends CompositeService { amRMClient.releaseAssignedContainer(container.getId()); } } - + ApplicationId appId = ApplicationId.fromString(app.getId()); existingRecords.forEach((encodedContainerId, record) -> { String componentName = record.get(YarnRegistryAttributes.YARN_COMPONENT); if (componentName != null) { Component component = componentsByName.get(componentName); - ComponentInstance compInstance = component.getComponentInstance( - record.description); - ContainerId containerId = ContainerId.fromString(record.get( - YarnRegistryAttributes.YARN_ID)); - unRecoveredInstances.put(containerId, compInstance); - component.removePendingInstance(compInstance); + if (component != null) { + ComponentInstance compInstance = component.getComponentInstance( + record.description); + ContainerId containerId = ContainerId.fromString(record.get( + YarnRegistryAttributes.YARN_ID)); + if (containerId.getApplicationAttemptId().getApplicationId() + .equals(appId)) { + unRecoveredInstances.put(containerId, compInstance); + component.removePendingInstance(compInstance); + } + } } }); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java index 4dc1ebd74b3..8db98bd0275 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java @@ -210,6 +210,49 @@ public class TestServiceAM extends ServiceTestUtils{ .getState()); } + // Test to verify that the AM doesn't wait for containers of a different app + // even though it corresponds to the same service. + @Test(timeout = 200000) + public void testContainersFromDifferentApp() + throws Exception { + ApplicationId applicationId = ApplicationId.newInstance( + System.currentTimeMillis(), 1); + Service exampleApp = new Service(); + exampleApp.setId(applicationId.toString()); + exampleApp.setName("testContainersFromDifferentApp"); + String comp1Name = "comp1"; + String comp1InstName = "comp1-0"; + + org.apache.hadoop.yarn.service.api.records.Component compA = + createComponent(comp1Name, 1, "sleep"); + exampleApp.addComponent(compA); + + MockServiceAM am = new MockServiceAM(exampleApp); + ContainerId containerId = am.createContainerId(1); + // saves the container in the registry + am.feedRegistryComponent(containerId, comp1Name, comp1InstName); + + ApplicationId changedAppId = ApplicationId.newInstance( + System.currentTimeMillis(), 2); + exampleApp.setId(changedAppId.toString()); + am.init(conf); + am.start(); + // 1 pending instance since the container in registry belongs to a different + // app. + Assert.assertEquals(1, + am.getComponent(comp1Name).getPendingInstances().size()); + + am.feedContainerToComp(exampleApp, 1, comp1Name); + GenericTestUtils.waitFor(() -> am.getCompInstance(comp1Name, comp1InstName) + .getContainerStatus() != null, 2000, 200000); + + Assert.assertEquals("container state", + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, + am.getCompInstance(comp1Name, comp1InstName).getContainerStatus() + .getState()); + am.stop(); + } + @Test public void testScheduleWithMultipleResourceTypes() throws TimeoutException, InterruptedException, IOException {