YARN-9056. Improved YARN service upgrade state logic for readiness check.

Contributed by Chandni Singh
This commit is contained in:
Eric Yang 2018-11-27 18:36:59 -05:00
parent 4c106fca0c
commit f657a2a661
9 changed files with 104 additions and 41 deletions

View File

@ -115,6 +115,7 @@ import static org.apache.hadoop.yarn.api.records.ContainerExitStatus
.KILLED_AFTER_APP_COMPLETION;
import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*;
import static org.apache.hadoop.yarn.service.component.ComponentEventType.*;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.START;
import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes
.EXIT_FALSE;
import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes
@ -833,9 +834,8 @@ public class ServiceScheduler extends CompositeService {
LOG.error("No component instance exists for {}", containerId);
return;
}
ComponentInstanceEvent becomeReadyEvent = new ComponentInstanceEvent(
containerId, ComponentInstanceEventType.BECOME_READY);
dispatcher.getEventHandler().handle(becomeReadyEvent);
dispatcher.getEventHandler().handle(
new ComponentInstanceEvent(containerId, START));
}
@Override

View File

@ -188,13 +188,13 @@ public class Component implements EventHandler<ComponentEvent> {
new NeedsUpgradeTransition())
.addTransition(STABLE, CANCEL_UPGRADING, CANCEL_UPGRADE,
new NeedsUpgradeTransition())
.addTransition(STABLE, EnumSet.of(STABLE), CHECK_STABLE,
.addTransition(STABLE, EnumSet.of(STABLE, FLEXING), CHECK_STABLE,
new CheckStableTransition())
// Cancel upgrade while previous upgrade is still in progress
.addTransition(UPGRADING, CANCEL_UPGRADING,
CANCEL_UPGRADE, new NeedsUpgradeTransition())
.addTransition(UPGRADING, EnumSet.of(UPGRADING, STABLE),
.addTransition(UPGRADING, EnumSet.of(UPGRADING, FLEXING, STABLE),
CHECK_STABLE, new CheckStableTransition())
.addTransition(UPGRADING, UPGRADING, CONTAINER_COMPLETED,
new CompletedAfterUpgradeTransition())

View File

@ -129,7 +129,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
.addTransition(STARTED, INIT, STOP,
new ContainerStoppedTransition())
.addTransition(STARTED, READY, BECOME_READY,
new ContainerBecomeReadyTransition())
new ContainerBecomeReadyTransition(false))
// FROM READY
.addTransition(READY, STARTED, BECOME_NOT_READY,
@ -142,16 +142,20 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
// FROM UPGRADING
.addTransition(UPGRADING, EnumSet.of(READY, CANCEL_UPGRADING),
CANCEL_UPGRADE, new CancelUpgradeTransition())
.addTransition(UPGRADING, EnumSet.of(READY), BECOME_READY,
new ReadyAfterUpgradeTransition())
.addTransition(UPGRADING, EnumSet.of(REINITIALIZED), START,
new StartedAfterUpgradeTransition())
.addTransition(UPGRADING, UPGRADING, STOP,
new StoppedAfterUpgradeTransition())
// FROM CANCEL_UPGRADING
.addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, READY),
BECOME_READY, new ReadyAfterUpgradeTransition())
.addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING,
REINITIALIZED), START, new StartedAfterUpgradeTransition())
.addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, INIT),
STOP, new StoppedAfterCancelUpgradeTransition())
.addTransition(REINITIALIZED, CANCEL_UPGRADING, CANCEL_UPGRADE,
new CancelledAfterReinitTransition())
.addTransition(REINITIALIZED, READY, BECOME_READY,
new ContainerBecomeReadyTransition(true))
.installTopology();
public ComponentInstance(Component component,
@ -227,16 +231,30 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
}
private static class ContainerBecomeReadyTransition extends BaseTransition {
private final boolean isReinitialized;
ContainerBecomeReadyTransition(boolean isReinitialized) {
this.isReinitialized = isReinitialized;
}
@Override
public void transition(ComponentInstance compInstance,
ComponentInstanceEvent event) {
compInstance.setContainerState(ContainerState.READY);
if (!isReinitialized) {
compInstance.component.incContainersReady(true);
} else {
compInstance.component.incContainersReady(false);
ComponentEvent checkState = new ComponentEvent(
compInstance.component.getName(), ComponentEventType.CHECK_STABLE);
compInstance.scheduler.getDispatcher().getEventHandler().handle(
checkState);
}
compInstance.postContainerReady();
}
}
private static class ReadyAfterUpgradeTransition implements
private static class StartedAfterUpgradeTransition implements
MultipleArcTransition<ComponentInstance, ComponentInstanceEvent,
ComponentInstanceState> {
@ -247,7 +265,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
if (instance.pendingCancelUpgrade) {
// cancellation of upgrade was triggered before the upgrade was
// finished.
LOG.info("{} received ready but cancellation pending",
LOG.info("{} received started but cancellation pending",
event.getContainerId());
instance.upgradeInProgress.set(true);
instance.cancelUpgrade();
@ -256,8 +274,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
}
instance.upgradeInProgress.set(false);
instance.setContainerState(ContainerState.READY);
instance.component.incContainersReady(false);
instance.setContainerState(ContainerState.RUNNING_BUT_UNREADY);
Component.UpgradeStatus status = instance.getState().equals(UPGRADING) ?
instance.component.getUpgradeStatus() :
@ -265,12 +282,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
status.decContainersThatNeedUpgrade();
instance.serviceVersion = status.getTargetVersion();
ComponentEvent checkState = new ComponentEvent(
instance.component.getName(),
ComponentEventType.CHECK_STABLE);
instance.scheduler.getDispatcher().getEventHandler().handle(checkState);
instance.postContainerReady();
return ComponentInstanceState.READY;
return ComponentInstanceState.REINITIALIZED;
}
}
@ -568,6 +580,19 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
}
}
private static class CancelledAfterReinitTransition extends BaseTransition {
@Override
public void transition(ComponentInstance instance,
ComponentInstanceEvent event) {
if (instance.upgradeInProgress.compareAndSet(false, true)) {
instance.cancelUpgrade();
} else {
LOG.info("{} pending cancellation", event.getContainerId());
instance.pendingCancelUpgrade = true;
}
}
}
private static class CancelUpgradeTransition implements
MultipleArcTransition<ComponentInstance, ComponentInstanceEvent,
ComponentInstanceState> {

View File

@ -23,5 +23,6 @@ public enum ComponentInstanceState {
STARTED,
READY,
UPGRADING,
CANCEL_UPGRADING
CANCEL_UPGRADING,
REINITIALIZED
}

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.component.Component;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState;
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
import org.apache.hadoop.yarn.service.component.ComponentEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
@ -37,6 +38,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState.REINITIALIZED;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState.STARTED;
import static org.apache.hadoop.yarn.service.component.ComponentEventType.FLEX;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.BECOME_NOT_READY;
@ -108,8 +110,9 @@ public class ServiceMonitor extends AbstractService {
ComponentInstance instance = entry.getValue();
ProbeStatus status = instance.ping();
ComponentInstanceState instanceState = instance.getState();
if (status.isSuccess()) {
if (instance.getState() == STARTED) {
if (instanceState == STARTED || instanceState == REINITIALIZED) {
LOG.info("Readiness check succeeded for {}: {}", instance
.getCompInstanceName(), status);
// synchronously update the state.

View File

@ -306,10 +306,14 @@ public class TestServiceManager {
private void makeAllInstancesReady(ServiceContext context)
throws TimeoutException, InterruptedException {
context.scheduler.getLiveInstances().forEach(((containerId, instance) -> {
ComponentInstanceEvent event = new ComponentInstanceEvent(containerId,
ComponentInstanceEventType.BECOME_READY);
ComponentInstanceEvent startEvent = new ComponentInstanceEvent(
containerId, ComponentInstanceEventType.START);
context.scheduler.getDispatcher().getEventHandler().handle(startEvent);
context.scheduler.getDispatcher().getEventHandler().handle(event);
ComponentInstanceEvent becomeReadyEvent = new ComponentInstanceEvent(
containerId, ComponentInstanceEventType.BECOME_READY);
context.scheduler.getDispatcher().getEventHandler().handle(
becomeReadyEvent);
}));
GenericTestUtils.waitFor(()-> {
for (ComponentInstance instance:
@ -350,11 +354,17 @@ public class TestServiceManager {
// instances of comp1 get upgraded and become ready event is triggered
// become ready
compInstances.forEach(instance -> {
ComponentInstanceEvent event = new ComponentInstanceEvent(
ComponentInstanceEvent startEvent = new ComponentInstanceEvent(
instance.getContainer().getId(),
ComponentInstanceEventType.START);
context.scheduler.getDispatcher().getEventHandler().handle(startEvent);
ComponentInstanceEvent becomeReadyEvent = new ComponentInstanceEvent(
instance.getContainer().getId(),
ComponentInstanceEventType.BECOME_READY);
context.scheduler.getDispatcher().getEventHandler().handle(event);
context.scheduler.getDispatcher().getEventHandler().handle(
becomeReadyEvent);
});
GenericTestUtils.waitFor(() -> {

View File

@ -497,7 +497,8 @@ public class TestYarnNativeServices extends ServiceTestUtils {
}
// Test to verify ANTI_AFFINITY placement policy
// 1. Start mini cluster with 3 NMs and scheduler placement-constraint handler
// 1. Start mini cluster
// with 3 NMs and scheduler placement-constraint handler
// 2. Create an example service with 3 containers
// 3. Verify no more than 1 container comes up in each of the 3 NMs
// 4. Flex the component to 4 containers

View File

@ -38,6 +38,7 @@ import org.junit.Test;
import java.util.Iterator;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.BECOME_READY;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.START;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP;
/**
@ -159,6 +160,8 @@ public class TestComponent {
// reinitialization of a container done
for(ComponentInstance instance : comp.getAllComponentInstances()) {
instance.handle(new ComponentInstanceEvent(
instance.getContainer().getId(), START));
instance.handle(new ComponentInstanceEvent(
instance.getContainer().getId(), BECOME_READY));
}
@ -199,6 +202,8 @@ public class TestComponent {
// second instance finished upgrading
ComponentInstance instance2 = iter.next();
instance2.handle(new ComponentInstanceEvent(
instance2.getContainer().getId(), ComponentInstanceEventType.START));
instance2.handle(new ComponentInstanceEvent(
instance2.getContainer().getId(),
ComponentInstanceEventType.BECOME_READY));
@ -230,6 +235,9 @@ public class TestComponent {
// cancel upgrade successful for both instances
for(ComponentInstance instance : comp.getAllComponentInstances()) {
instance.handle(new ComponentInstanceEvent(
instance.getContainer().getId(),
ComponentInstanceEventType.START));
instance.handle(new ComponentInstanceEvent(
instance.getContainer().getId(),
ComponentInstanceEventType.BECOME_READY));

View File

@ -98,7 +98,12 @@ public class TestComponentInstance {
ComponentInstanceEvent instanceEvent = new ComponentInstanceEvent(
instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE);
instance.handle(instanceEvent);
instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(),
ComponentInstanceEventType.START));
Assert.assertEquals("instance not running",
ContainerState.RUNNING_BUT_UNREADY,
component.getComponentSpec().getContainer(instance.getContainer()
.getId().toString()).getState());
instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(),
ComponentInstanceEventType.BECOME_READY));
Assert.assertEquals("instance not ready", ContainerState.READY,
@ -246,23 +251,33 @@ public class TestComponentInstance {
instance.handle(cancelEvent);
// either upgrade failed or successful
ComponentInstanceEvent readyOrStopEvent = new ComponentInstanceEvent(
if (upgradeSuccessful) {
instance.handle(new ComponentInstanceEvent(
instance.getContainer().getId(), ComponentInstanceEventType.START));
instance.handle(new ComponentInstanceEvent(
instance.getContainer().getId(),
upgradeSuccessful ? ComponentInstanceEventType.BECOME_READY :
ComponentInstanceEventType.STOP);
ComponentInstanceEventType.BECOME_READY));
} else {
instance.handle(new ComponentInstanceEvent(
instance.getContainer().getId(),
ComponentInstanceEventType.STOP));
}
instance.handle(readyOrStopEvent);
Assert.assertEquals("instance not upgrading", ContainerState.UPGRADING,
component.getComponentSpec().getContainer(instance.getContainer()
.getId().toString()).getState());
// response for cancel received
ComponentInstanceEvent readyOrStopCancel = new ComponentInstanceEvent(
if (cancelUpgradeSuccessful) {
instance.handle(new ComponentInstanceEvent(
instance.getContainer().getId(), ComponentInstanceEventType.START));
instance.handle(new ComponentInstanceEvent(
instance.getContainer().getId(),
cancelUpgradeSuccessful ? ComponentInstanceEventType.BECOME_READY :
ComponentInstanceEventType.STOP);
instance.handle(readyOrStopCancel);
ComponentInstanceEventType.BECOME_READY));
} else {
instance.handle(new ComponentInstanceEvent(
instance.getContainer().getId(), ComponentInstanceEventType.STOP));
}
if (cancelUpgradeSuccessful) {
Assert.assertEquals("instance not ready", ContainerState.READY,
component.getComponentSpec().getContainer(instance.getContainer()