diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index 9b9305cc93a..aa1fc90e548 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -109,6 +109,7 @@ import static org.apache.hadoop.yarn.api.records.ContainerExitStatus .KILLED_AFTER_APP_COMPLETION; import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*; import static org.apache.hadoop.yarn.service.component.ComponentEventType.*; +import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.START; import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes .EXIT_FALSE; import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes @@ -827,9 +828,8 @@ public class ServiceScheduler extends CompositeService { LOG.error("No component instance exists for {}", containerId); return; } - ComponentInstanceEvent becomeReadyEvent = new ComponentInstanceEvent( - containerId, ComponentInstanceEventType.BECOME_READY); - dispatcher.getEventHandler().handle(becomeReadyEvent); + dispatcher.getEventHandler().handle( + new ComponentInstanceEvent(containerId, START)); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index 526bde0ff13..0972afe2826 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -177,13 +177,13 @@ public class Component implements EventHandler { new NeedsUpgradeTransition()) .addTransition(STABLE, CANCEL_UPGRADING, CANCEL_UPGRADE, new NeedsUpgradeTransition()) - .addTransition(STABLE, EnumSet.of(STABLE), CHECK_STABLE, + .addTransition(STABLE, EnumSet.of(STABLE, FLEXING), CHECK_STABLE, new CheckStableTransition()) // Cancel upgrade while previous upgrade is still in progress .addTransition(UPGRADING, CANCEL_UPGRADING, CANCEL_UPGRADE, new NeedsUpgradeTransition()) - .addTransition(UPGRADING, EnumSet.of(UPGRADING, STABLE), + .addTransition(UPGRADING, EnumSet.of(UPGRADING, FLEXING, STABLE), CHECK_STABLE, new CheckStableTransition()) .addTransition(UPGRADING, UPGRADING, CONTAINER_COMPLETED, new CompletedAfterUpgradeTransition()) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index 89c9a228256..86b0e32bd36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -131,7 +131,7 @@ public class ComponentInstance implements EventHandler, .addTransition(STARTED, INIT, STOP, new ContainerStoppedTransition()) .addTransition(STARTED, READY, BECOME_READY, - new ContainerBecomeReadyTransition()) + new ContainerBecomeReadyTransition(false)) // FROM READY .addTransition(READY, STARTED, BECOME_NOT_READY, @@ -144,16 +144,20 @@ public class ComponentInstance implements EventHandler, // FROM UPGRADING .addTransition(UPGRADING, EnumSet.of(READY, CANCEL_UPGRADING), CANCEL_UPGRADE, new CancelUpgradeTransition()) - .addTransition(UPGRADING, EnumSet.of(READY), BECOME_READY, - new ReadyAfterUpgradeTransition()) + .addTransition(UPGRADING, EnumSet.of(REINITIALIZED), START, + new StartedAfterUpgradeTransition()) .addTransition(UPGRADING, UPGRADING, STOP, new StoppedAfterUpgradeTransition()) // FROM CANCEL_UPGRADING - .addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, READY), - BECOME_READY, new ReadyAfterUpgradeTransition()) + .addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, + REINITIALIZED), START, new StartedAfterUpgradeTransition()) .addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, INIT), STOP, new StoppedAfterCancelUpgradeTransition()) + .addTransition(REINITIALIZED, CANCEL_UPGRADING, CANCEL_UPGRADE, + new CancelledAfterReinitTransition()) + .addTransition(REINITIALIZED, READY, BECOME_READY, + new ContainerBecomeReadyTransition(true)) .installTopology(); public ComponentInstance(Component component, @@ -229,16 +233,30 @@ public class ComponentInstance implements EventHandler, } private static class ContainerBecomeReadyTransition extends BaseTransition { + private final boolean isReinitialized; + + ContainerBecomeReadyTransition(boolean isReinitialized) { + this.isReinitialized = isReinitialized; + } + @Override public void transition(ComponentInstance compInstance, ComponentInstanceEvent event) { compInstance.setContainerState(ContainerState.READY); - compInstance.component.incContainersReady(true); + if (!isReinitialized) { + compInstance.component.incContainersReady(true); + } else { + compInstance.component.incContainersReady(false); + ComponentEvent checkState = new ComponentEvent( + compInstance.component.getName(), ComponentEventType.CHECK_STABLE); + compInstance.scheduler.getDispatcher().getEventHandler().handle( + checkState); + } compInstance.postContainerReady(); } } - private static class ReadyAfterUpgradeTransition implements + private static class StartedAfterUpgradeTransition implements MultipleArcTransition { @@ -249,7 +267,7 @@ public class ComponentInstance implements EventHandler, if (instance.pendingCancelUpgrade) { // cancellation of upgrade was triggered before the upgrade was // finished. - LOG.info("{} received ready but cancellation pending", + LOG.info("{} received started but cancellation pending", event.getContainerId()); instance.upgradeInProgress.set(true); instance.cancelUpgrade(); @@ -258,8 +276,7 @@ public class ComponentInstance implements EventHandler, } instance.upgradeInProgress.set(false); - instance.setContainerState(ContainerState.READY); - instance.component.incContainersReady(false); + instance.setContainerState(ContainerState.RUNNING_BUT_UNREADY); Component.UpgradeStatus status = instance.getState().equals(UPGRADING) ? instance.component.getUpgradeStatus() : @@ -267,12 +284,7 @@ public class ComponentInstance implements EventHandler, status.decContainersThatNeedUpgrade(); instance.serviceVersion = status.getTargetVersion(); - ComponentEvent checkState = new ComponentEvent( - instance.component.getName(), - ComponentEventType.CHECK_STABLE); - instance.scheduler.getDispatcher().getEventHandler().handle(checkState); - instance.postContainerReady(); - return ComponentInstanceState.READY; + return ComponentInstanceState.REINITIALIZED; } } @@ -570,6 +582,19 @@ public class ComponentInstance implements EventHandler, } } + private static class CancelledAfterReinitTransition extends BaseTransition { + @Override + public void transition(ComponentInstance instance, + ComponentInstanceEvent event) { + if (instance.upgradeInProgress.compareAndSet(false, true)) { + instance.cancelUpgrade(); + } else { + LOG.info("{} pending cancellation", event.getContainerId()); + instance.pendingCancelUpgrade = true; + } + } + } + private static class CancelUpgradeTransition implements MultipleArcTransition { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java index 28cbcf570ad..92b221efdb9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java @@ -23,5 +23,6 @@ public enum ComponentInstanceState { STARTED, READY, UPGRADING, - CANCEL_UPGRADING + CANCEL_UPGRADING, + REINITIALIZED } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/monitor/ServiceMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/monitor/ServiceMonitor.java index 033569cc17f..9e77e41bb9b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/monitor/ServiceMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/monitor/ServiceMonitor.java @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.service.ServiceContext; import org.apache.hadoop.yarn.service.component.Component; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState; import org.apache.hadoop.yarn.service.conf.YarnServiceConf; import org.apache.hadoop.yarn.service.component.ComponentEvent; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; @@ -37,6 +38,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState.REINITIALIZED; import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState.STARTED; import static org.apache.hadoop.yarn.service.component.ComponentEventType.FLEX; import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.BECOME_NOT_READY; @@ -108,8 +110,9 @@ public class ServiceMonitor extends AbstractService { ComponentInstance instance = entry.getValue(); ProbeStatus status = instance.ping(); + ComponentInstanceState instanceState = instance.getState(); if (status.isSuccess()) { - if (instance.getState() == STARTED) { + if (instanceState == STARTED || instanceState == REINITIALIZED) { LOG.info("Readiness check succeeded for {}: {}", instance .getCompInstanceName(), status); // synchronously update the state. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java index 406eea486b0..1d8ccff4f6f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java @@ -306,10 +306,14 @@ public class TestServiceManager { private void makeAllInstancesReady(ServiceContext context) throws TimeoutException, InterruptedException { context.scheduler.getLiveInstances().forEach(((containerId, instance) -> { - ComponentInstanceEvent event = new ComponentInstanceEvent(containerId, - ComponentInstanceEventType.BECOME_READY); + ComponentInstanceEvent startEvent = new ComponentInstanceEvent( + containerId, ComponentInstanceEventType.START); + context.scheduler.getDispatcher().getEventHandler().handle(startEvent); - context.scheduler.getDispatcher().getEventHandler().handle(event); + ComponentInstanceEvent becomeReadyEvent = new ComponentInstanceEvent( + containerId, ComponentInstanceEventType.BECOME_READY); + context.scheduler.getDispatcher().getEventHandler().handle( + becomeReadyEvent); })); GenericTestUtils.waitFor(()-> { for (ComponentInstance instance: @@ -350,11 +354,17 @@ public class TestServiceManager { // instances of comp1 get upgraded and become ready event is triggered // become ready compInstances.forEach(instance -> { - ComponentInstanceEvent event = new ComponentInstanceEvent( + ComponentInstanceEvent startEvent = new ComponentInstanceEvent( + instance.getContainer().getId(), + ComponentInstanceEventType.START); + context.scheduler.getDispatcher().getEventHandler().handle(startEvent); + + ComponentInstanceEvent becomeReadyEvent = new ComponentInstanceEvent( instance.getContainer().getId(), ComponentInstanceEventType.BECOME_READY); - context.scheduler.getDispatcher().getEventHandler().handle(event); + context.scheduler.getDispatcher().getEventHandler().handle( + becomeReadyEvent); }); GenericTestUtils.waitFor(() -> { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java index d50c574dd34..31a95b0c942 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java @@ -497,7 +497,8 @@ public class TestYarnNativeServices extends ServiceTestUtils { } // Test to verify ANTI_AFFINITY placement policy - // 1. Start mini cluster with 3 NMs and scheduler placement-constraint handler + // 1. Start mini cluster + // with 3 NMs and scheduler placement-constraint handler // 2. Create an example service with 3 containers // 3. Verify no more than 1 container comes up in each of the 3 NMs // 4. Flex the component to 4 containers diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java index f11d871021c..1961ff4791a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java @@ -38,6 +38,7 @@ import org.junit.Test; import java.util.Iterator; import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.BECOME_READY; +import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.START; import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP; /** @@ -159,6 +160,8 @@ public class TestComponent { // reinitialization of a container done for(ComponentInstance instance : comp.getAllComponentInstances()) { + instance.handle(new ComponentInstanceEvent( + instance.getContainer().getId(), START)); instance.handle(new ComponentInstanceEvent( instance.getContainer().getId(), BECOME_READY)); } @@ -199,6 +202,8 @@ public class TestComponent { // second instance finished upgrading ComponentInstance instance2 = iter.next(); + instance2.handle(new ComponentInstanceEvent( + instance2.getContainer().getId(), ComponentInstanceEventType.START)); instance2.handle(new ComponentInstanceEvent( instance2.getContainer().getId(), ComponentInstanceEventType.BECOME_READY)); @@ -230,6 +235,9 @@ public class TestComponent { // cancel upgrade successful for both instances for(ComponentInstance instance : comp.getAllComponentInstances()) { + instance.handle(new ComponentInstanceEvent( + instance.getContainer().getId(), + ComponentInstanceEventType.START)); instance.handle(new ComponentInstanceEvent( instance.getContainer().getId(), ComponentInstanceEventType.BECOME_READY)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java index c5a96317caa..09652d7403b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java @@ -98,7 +98,12 @@ public class TestComponentInstance { ComponentInstanceEvent instanceEvent = new ComponentInstanceEvent( instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE); instance.handle(instanceEvent); - + instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(), + ComponentInstanceEventType.START)); + Assert.assertEquals("instance not running", + ContainerState.RUNNING_BUT_UNREADY, + component.getComponentSpec().getContainer(instance.getContainer() + .getId().toString()).getState()); instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(), ComponentInstanceEventType.BECOME_READY)); Assert.assertEquals("instance not ready", ContainerState.READY, @@ -246,23 +251,33 @@ public class TestComponentInstance { instance.handle(cancelEvent); // either upgrade failed or successful - ComponentInstanceEvent readyOrStopEvent = new ComponentInstanceEvent( - instance.getContainer().getId(), - upgradeSuccessful ? ComponentInstanceEventType.BECOME_READY : - ComponentInstanceEventType.STOP); + if (upgradeSuccessful) { + instance.handle(new ComponentInstanceEvent( + instance.getContainer().getId(), ComponentInstanceEventType.START)); + instance.handle(new ComponentInstanceEvent( + instance.getContainer().getId(), + ComponentInstanceEventType.BECOME_READY)); + } else { + instance.handle(new ComponentInstanceEvent( + instance.getContainer().getId(), + ComponentInstanceEventType.STOP)); + } - instance.handle(readyOrStopEvent); Assert.assertEquals("instance not upgrading", ContainerState.UPGRADING, component.getComponentSpec().getContainer(instance.getContainer() .getId().toString()).getState()); // response for cancel received - ComponentInstanceEvent readyOrStopCancel = new ComponentInstanceEvent( - instance.getContainer().getId(), - cancelUpgradeSuccessful ? ComponentInstanceEventType.BECOME_READY : - ComponentInstanceEventType.STOP); - - instance.handle(readyOrStopCancel); + if (cancelUpgradeSuccessful) { + instance.handle(new ComponentInstanceEvent( + instance.getContainer().getId(), ComponentInstanceEventType.START)); + instance.handle(new ComponentInstanceEvent( + instance.getContainer().getId(), + ComponentInstanceEventType.BECOME_READY)); + } else { + instance.handle(new ComponentInstanceEvent( + instance.getContainer().getId(), ComponentInstanceEventType.STOP)); + } if (cancelUpgradeSuccessful) { Assert.assertEquals("instance not ready", ContainerState.READY, component.getComponentSpec().getContainer(instance.getContainer()