diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/NeverRestartPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/NeverRestartPolicy.java index cd44a585680..a3d67144223 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/NeverRestartPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/NeverRestartPolicy.java @@ -59,11 +59,14 @@ public final class NeverRestartPolicy implements ComponentRestartPolicy { return false; } - @Override public boolean isReadyForDownStream(Component component) { - if (hasCompleted(component)) { - return true; + @Override public boolean isReadyForDownStream(Component dependentComponent) { + if (dependentComponent.getNumReadyInstances() + + dependentComponent.getNumSucceededInstances() + + dependentComponent.getNumFailedInstances() + < dependentComponent.getNumDesiredInstances()) { + return false; } - return false; + return true; } @Override public boolean allowUpgrades() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/OnFailureRestartPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/OnFailureRestartPolicy.java index b939ba0428f..28ebf9ef329 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/OnFailureRestartPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/OnFailureRestartPolicy.java @@ -65,12 +65,14 @@ public final class OnFailureRestartPolicy implements ComponentRestartPolicy { return false; } - @Override public boolean isReadyForDownStream(Component component) { - if (hasCompletedSuccessfully(component)) { - return true; + @Override public boolean isReadyForDownStream(Component dependentComponent) { + if (dependentComponent.getNumReadyInstances() + + dependentComponent.getNumSucceededInstances() + + dependentComponent.getNumFailedInstances() + < dependentComponent.getNumDesiredInstances()) { + return false; } - - return false; + return true; } @Override public boolean allowUpgrades() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponentRestartPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponentRestartPolicy.java index 03158cfc18e..3e3b4a1a3dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponentRestartPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponentRestartPolicy.java @@ -65,6 +65,7 @@ public class TestComponentRestartPolicy { when(component.getNumSucceededInstances()).thenReturn(new Long(1)); when(component.getNumFailedInstances()).thenReturn(new Long(2)); when(component.getNumDesiredInstances()).thenReturn(3); + when(component.getNumReadyInstances()).thenReturn(3); ComponentInstance instance = mock(ComponentInstance.class); when(instance.getComponent()).thenReturn(component); @@ -92,6 +93,7 @@ public class TestComponentRestartPolicy { when(component.getNumSucceededInstances()).thenReturn(new Long(3)); when(component.getNumFailedInstances()).thenReturn(new Long(0)); when(component.getNumDesiredInstances()).thenReturn(3); + when(component.getNumReadyInstances()).thenReturn(3); ComponentInstance instance = mock(ComponentInstance.class); when(instance.getComponent()).thenReturn(component); @@ -123,7 +125,7 @@ public class TestComponentRestartPolicy { assertEquals(true, restartPolicy.shouldRelaunchInstance(instance, containerStatus)); - assertEquals(false, restartPolicy.isReadyForDownStream(component)); + assertEquals(true, restartPolicy.isReadyForDownStream(component)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/monitor/TestServiceMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/monitor/TestServiceMonitor.java index 7cef91e25db..c758e6fbeaa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/monitor/TestServiceMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/monitor/TestServiceMonitor.java @@ -85,11 +85,15 @@ public class TestServiceMonitor extends ServiceTestUtils { exampleApp.setId(applicationId.toString()); exampleApp.setName("testComponentDependency"); exampleApp.addComponent(createComponent("compa", 1, "sleep 1000")); - Component compb = createComponent("compb", 1, "sleep 1000"); - // Let compb depends on compa; - compb.setDependencies(Collections.singletonList("compa")); + Component compb = createComponent("compb", 1, "sleep 1000", Component + .RestartPolicyEnum.ON_FAILURE, Collections.singletonList("compa")); + // Let compb depends on compb; + Component compc = createComponent("compc", 1, "sleep 1000", Component + .RestartPolicyEnum.NEVER, Collections.singletonList("compb")); + exampleApp.addComponent(compb); + exampleApp.addComponent(compc); MockServiceAM am = new MockServiceAM(exampleApp); am.init(conf); @@ -105,6 +109,11 @@ public class TestServiceMonitor extends ServiceTestUtils { // waiting for compb's dependencies are satisfied am.waitForDependenciesSatisfied("compb"); + // feed 1 container to compb, + am.feedContainerToComp(exampleApp, 2, "compb"); + // waiting for compc's dependencies are satisfied + am.waitForDependenciesSatisfied("compc"); + // feed 1 container to compb am.feedContainerToComp(exampleApp, 2, "compb"); am.flexComponent("compa", 2);