From 913f87dada27776c539dfb352400ecf8d40e7943 Mon Sep 17 00:00:00 2001 From: Eric Yang Date: Wed, 26 Sep 2018 14:51:35 -0400 Subject: [PATCH] YARN-8665. Added Yarn service cancel upgrade option. Contributed by Chandni Singh --- .../yarn/service/client/ApiServiceClient.java | 20 + .../hadoop/yarn/service/webapp/ApiServer.java | 34 +- .../hadoop/yarn/service/ClientAMProtocol.java | 5 + .../hadoop/yarn/service/ClientAMService.java | 12 + .../hadoop/yarn/service/ServiceEvent.java | 14 +- .../hadoop/yarn/service/ServiceEventType.java | 3 +- .../hadoop/yarn/service/ServiceManager.java | 349 ++++++++++++------ .../service/api/records/ContainerState.java | 2 +- .../service/api/records/ServiceState.java | 7 +- .../yarn/service/client/ServiceClient.java | 21 ++ .../yarn/service/component/Component.java | 275 +++++++++----- .../service/component/ComponentEvent.java | 10 - .../service/component/ComponentEventType.java | 1 + .../service/component/ComponentState.java | 3 +- .../component/instance/ComponentInstance.java | 269 +++++++++++--- .../instance/ComponentInstanceEventType.java | 3 +- .../instance/ComponentInstanceState.java | 3 +- .../ContainerLaunchService.java | 4 +- .../client/ClientAMProtocolPBClientImpl.java | 13 + .../ClientAMProtocolPBServiceImpl.java | 13 + .../yarn/service/provider/ProviderUtils.java | 9 +- .../yarn/service/utils/ServiceApiUtil.java | 14 +- .../yarn/service/utils/SliderFileSystem.java | 49 +++ .../src/main/proto/ClientAMProtocol.proto | 8 + .../service/MockRunningServiceContext.java | 20 +- .../hadoop/yarn/service/ServiceTestUtils.java | 2 +- .../yarn/service/TestServiceManager.java | 136 +++++-- .../yarn/service/TestYarnNativeServices.java | 44 +++ .../yarn/service/client/TestServiceCLI.java | 17 + .../yarn/service/component/TestComponent.java | 239 ++++++++++-- .../instance/TestComponentInstance.java | 177 ++++++++- .../src/test/resources/log4j.properties | 19 + .../yarn/client/cli/ApplicationCLI.java | 12 +- .../hadoop/yarn/client/cli/TestYarnCLI.java | 2 + .../yarn/client/api/AppAdminClient.java | 13 + .../ContainerManagerImpl.java | 1 + 36 files changed, 1479 insertions(+), 344 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/resources/log4j.properties diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java index ca6cc508b27..b7a15412c3c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java @@ -723,4 +723,24 @@ public class ApiServiceClient extends AppAdminClient { } return null; } + + @Override + public int actionCancelUpgrade( + String appName) throws IOException, YarnException { + int result; + try { + Service service = new Service(); + service.setName(appName); + service.setState(ServiceState.CANCEL_UPGRADING); + String buffer = jsonSerDeser.toJson(service); + LOG.info("Cancel upgrade in progress. Please wait.."); + ClientResponse response = getApiClient(getServicePath(appName)) + .put(ClientResponse.class, buffer); + result = processResponse(response); + } catch (Exception e) { + LOG.error("Failed to cancel upgrade: ", e); + result = EXIT_EXCEPTION_THROWN; + } + return result; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java index cd6f0d79e2d..c4e33171d48 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java @@ -58,6 +58,7 @@ import java.util.*; import java.util.stream.Collectors; import static org.apache.hadoop.yarn.service.api.records.ServiceState.ACCEPTED; +import static org.apache.hadoop.yarn.service.api.records.ServiceState.CANCEL_UPGRADING; import static org.apache.hadoop.yarn.service.conf.RestApiConstants.*; import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.*; @@ -445,6 +446,12 @@ public class ApiServer { return upgradeService(updateServiceData, ugi); } + // If CANCEL_UPGRADING is requested + if (updateServiceData.getState() != null && + updateServiceData.getState() == CANCEL_UPGRADING) { + return cancelUpgradeService(appName, ugi); + } + // If new lifetime value specified then update it if (updateServiceData.getLifetime() != null && updateServiceData.getLifetime() > 0) { @@ -460,8 +467,7 @@ public class ApiServer { LOG.error(message, e); return formatResponse(Status.NOT_FOUND, e.getMessage()); } catch (YarnException e) { - String message = "Service is not found in hdfs: " + appName; - LOG.error(message, e); + LOG.error(e.getMessage(), e); return formatResponse(Status.NOT_FOUND, e.getMessage()); } catch (Exception e) { String message = "Error while performing operation for app: " + appName; @@ -707,6 +713,27 @@ public class ApiServer { return formatResponse(Status.ACCEPTED, status); } + private Response cancelUpgradeService(String serviceName, + final UserGroupInformation ugi) throws IOException, InterruptedException { + int result = ugi.doAs((PrivilegedExceptionAction) () -> { + ServiceClient sc = getServiceClient(); + sc.init(YARN_CONFIG); + sc.start(); + int exitCode = sc.actionCancelUpgrade(serviceName); + sc.close(); + return exitCode; + }); + if (result == EXIT_SUCCESS) { + ServiceStatus status = new ServiceStatus(); + LOG.info("Service {} cancelling upgrade", serviceName); + status.setDiagnostics("Service " + serviceName + + " cancelling upgrade."); + status.setState(ServiceState.ACCEPTED); + return formatResponse(Status.ACCEPTED, status); + } + return Response.status(Status.BAD_REQUEST).build(); + } + private Response processComponentsUpgrade(UserGroupInformation ugi, String serviceName, Set compNames) throws YarnException, IOException, InterruptedException { @@ -734,7 +761,8 @@ public class ApiServer { Service service, List containers) throws YarnException, IOException, InterruptedException { - if (service.getState() != ServiceState.UPGRADING) { + if (!service.getState().equals(ServiceState.UPGRADING) && + !service.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) { throw new YarnException( String.format("The upgrade of service %s has not been initiated.", service.getName())); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java index 652a314abef..39e7dfa7d0b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.service; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; @@ -60,4 +62,7 @@ public interface ClientAMProtocol { GetCompInstancesResponseProto getCompInstances( GetCompInstancesRequestProto request) throws IOException, YarnException; + + CancelUpgradeResponseProto cancelUpgrade( + CancelUpgradeRequestProto request) throws IOException, YarnException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java index 2ef8f7ee7b7..47e98293a2f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java @@ -30,6 +30,8 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto; @@ -208,4 +210,14 @@ public class ClientAMService extends AbstractService ServiceApiUtil.CONTAINER_JSON_SERDE.toJson(containers.toArray( new Container[containers.size()]))).build(); } + + @Override + public CancelUpgradeResponseProto cancelUpgrade( + CancelUpgradeRequestProto request) throws IOException, YarnException { + LOG.info("Cancel service upgrade by {}", + UserGroupInformation.getCurrentUser()); + ServiceEvent event = new ServiceEvent(ServiceEventType.CANCEL_UPGRADE); + context.scheduler.getDispatcher().getEventHandler().handle(event); + return CancelUpgradeResponseProto.newBuilder().build(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java index 3a55472c0c8..cf4455de378 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java @@ -21,7 +21,7 @@ package org.apache.hadoop.yarn.service; import org.apache.hadoop.yarn.event.AbstractEvent; import org.apache.hadoop.yarn.service.api.records.Component; -import java.util.Queue; +import java.util.List; /** * Events are handled by {@link ServiceManager} to manage the service @@ -33,7 +33,8 @@ public class ServiceEvent extends AbstractEvent { private String version; private boolean autoFinalize; private boolean expressUpgrade; - private Queue compsToUpgradeInOrder; + // For express upgrade they should be in order. + private List compsToUpgrade; public ServiceEvent(ServiceEventType serviceEventType) { super(serviceEventType); @@ -71,13 +72,12 @@ public class ServiceEvent extends AbstractEvent { return this; } - public Queue getCompsToUpgradeInOrder() { - return compsToUpgradeInOrder; + public List getCompsToUpgrade() { + return compsToUpgrade; } - public ServiceEvent setCompsToUpgradeInOrder( - Queue compsToUpgradeInOrder) { - this.compsToUpgradeInOrder = compsToUpgradeInOrder; + public ServiceEvent setCompsToUpgrade(List compsToUpgrade) { + this.compsToUpgrade = compsToUpgrade; return this; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java index 4fc420ba6dd..03afdd36f24 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java @@ -24,5 +24,6 @@ package org.apache.hadoop.yarn.service; public enum ServiceEventType { START, UPGRADE, - CHECK_STABLE + CHECK_STABLE, + CANCEL_UPGRADE } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java index 04454b1d290..48513256091 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java @@ -20,32 +20,31 @@ package org.apache.hadoop.yarn.service; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.service.api.records.ComponentState; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.component.Component; import org.apache.hadoop.yarn.service.component.ComponentEvent; import org.apache.hadoop.yarn.service.component.ComponentEventType; import org.apache.hadoop.yarn.service.component.ComponentRestartPolicy; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.service.utils.SliderFileSystem; -import org.apache.hadoop.yarn.state.InvalidStateTransitionException; -import org.apache.hadoop.yarn.state.MultipleArcTransition; -import org.apache.hadoop.yarn.state.StateMachine; -import org.apache.hadoop.yarn.state.StateMachineFactory; +import org.apache.hadoop.yarn.state.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.text.MessageFormat; +import java.util.ArrayList; import java.util.EnumSet; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser; @@ -71,8 +70,10 @@ public class ServiceManager implements EventHandler { private final SliderFileSystem fs; private String upgradeVersion; - private Queue compsToUpgradeInOrder; + private List componentsToUpgrade; + private List compsAffectedByUpgrade = new ArrayList<>(); + private String cancelledVersion; private static final StateMachineFactory STATE_MACHINE_FACTORY = @@ -88,11 +89,14 @@ public class ServiceManager implements EventHandler { .addTransition(State.UPGRADING, EnumSet.of(State.STABLE, State.UPGRADING), ServiceEventType.START, - new CheckStableTransition()) + new StartFromUpgradeTransition()) .addTransition(State.UPGRADING, EnumSet.of(State.STABLE, State.UPGRADING), ServiceEventType.CHECK_STABLE, new CheckStableTransition()) + + .addTransition(State.UPGRADING, State.UPGRADING, + ServiceEventType.CANCEL_UPGRADE, new CancelUpgradeTransition()) .installTopology(); public ServiceManager(ServiceContext context) { @@ -148,19 +152,25 @@ public class ServiceManager implements EventHandler { public State transition(ServiceManager serviceManager, ServiceEvent event) { serviceManager.upgradeVersion = event.getVersion(); + serviceManager.componentsToUpgrade = event.getCompsToUpgrade(); + event.getCompsToUpgrade().forEach(comp -> + serviceManager.compsAffectedByUpgrade.add(comp.getName())); try { if (event.isExpressUpgrade()) { - serviceManager.serviceSpec.setState(ServiceState.EXPRESS_UPGRADING); - serviceManager.compsToUpgradeInOrder = event - .getCompsToUpgradeInOrder(); - serviceManager.upgradeNextCompIfAny(); - } else if (event.isAutoFinalize()) { - serviceManager.serviceSpec.setState(ServiceState - .UPGRADING_AUTO_FINALIZE); + serviceManager.dispatchNeedUpgradeEvents(false); + serviceManager.upgradeNextCompIfAny(false); } else { - serviceManager.serviceSpec.setState( - ServiceState.UPGRADING); + serviceManager.dispatchNeedUpgradeEvents(false); } + + if (event.isExpressUpgrade()) { + serviceManager.setServiceState(ServiceState.EXPRESS_UPGRADING); + } else if (event.isAutoFinalize()) { + serviceManager.setServiceState(ServiceState.UPGRADING_AUTO_FINALIZE); + } else { + serviceManager.setServiceState(ServiceState.UPGRADING); + } + return State.UPGRADING; } catch (Throwable e) { LOG.error("[SERVICE]: Upgrade to version {} failed", event.getVersion(), @@ -181,24 +191,32 @@ public class ServiceManager implements EventHandler { if (currState.equals(ServiceState.STABLE)) { return State.STABLE; } - if (currState.equals(ServiceState.EXPRESS_UPGRADING)) { - org.apache.hadoop.yarn.service.api.records.Component component = - serviceManager.compsToUpgradeInOrder.peek(); - if (!component.getState().equals(ComponentState.NEEDS_UPGRADE) && - !component.getState().equals(ComponentState.UPGRADING)) { - serviceManager.compsToUpgradeInOrder.remove(); + if (currState.equals(ServiceState.EXPRESS_UPGRADING) || + currState.equals(ServiceState.CANCEL_UPGRADING)) { + + if (!serviceManager.componentsToUpgrade.isEmpty()) { + org.apache.hadoop.yarn.service.api.records.Component compSpec = + serviceManager.componentsToUpgrade.get(0); + Component component = serviceManager.scheduler.getAllComponents() + .get(compSpec.getName()); + + if (!component.isUpgrading()) { + serviceManager.componentsToUpgrade.remove(0); + serviceManager.upgradeNextCompIfAny( + currState.equals(ServiceState.CANCEL_UPGRADING)); + } } - serviceManager.upgradeNextCompIfAny(); } + if (currState.equals(ServiceState.UPGRADING_AUTO_FINALIZE) || - event.getType().equals(ServiceEventType.START) || - (currState.equals(ServiceState.EXPRESS_UPGRADING) && - serviceManager.compsToUpgradeInOrder.isEmpty())) { + ((currState.equals(ServiceState.EXPRESS_UPGRADING) || + currState.equals(ServiceState.CANCEL_UPGRADING)) && + serviceManager.componentsToUpgrade.isEmpty())) { + ServiceState targetState = checkIfStable(serviceManager.serviceSpec); if (targetState.equals(ServiceState.STABLE)) { - if (serviceManager.finalizeUpgrade()) { - LOG.info("Service def state changed from {} -> {}", currState, - serviceManager.serviceSpec.getState()); + if (serviceManager.finalizeUpgrade( + currState.equals(ServiceState.CANCEL_UPGRADING))) { return State.STABLE; } } @@ -207,52 +225,149 @@ public class ServiceManager implements EventHandler { } } - private void upgradeNextCompIfAny() { - if (!compsToUpgradeInOrder.isEmpty()) { - org.apache.hadoop.yarn.service.api.records.Component component = - compsToUpgradeInOrder.peek(); + private static class StartFromUpgradeTransition implements + MultipleArcTransition { - ComponentEvent needUpgradeEvent = new ComponentEvent( - component.getName(), ComponentEventType.UPGRADE).setTargetSpec( - component).setUpgradeVersion(upgradeVersion).setExpressUpgrade(true); - context.scheduler.getDispatcher().getEventHandler().handle( - needUpgradeEvent); + @Override + public State transition(ServiceManager serviceManager, ServiceEvent event) { + ServiceState currState = serviceManager.serviceSpec.getState(); + + ServiceState targetState = checkIfStable(serviceManager.serviceSpec); + if (targetState.equals(ServiceState.STABLE)) { + if (serviceManager.finalizeUpgrade( + currState.equals(ServiceState.CANCEL_UPGRADING))) { + return State.STABLE; + } + } + return State.UPGRADING; + } + } + + private static class CancelUpgradeTransition implements + SingleArcTransition { + + @Override + public void transition(ServiceManager serviceManager, + ServiceEvent event) { + if (!serviceManager.getState().equals(State.UPGRADING)) { + LOG.info("[SERVICE]: Cannot cancel the upgrade in {} state", + serviceManager.getState()); + return; + } + try { + Service targetSpec = ServiceApiUtil.loadService( + serviceManager.context.fs, serviceManager.getName()); + + Service sourceSpec = ServiceApiUtil.loadServiceUpgrade( + serviceManager.context.fs, serviceManager.getName(), + serviceManager.upgradeVersion); + serviceManager.cancelledVersion = serviceManager.upgradeVersion; + LOG.info("[SERVICE] cancel version {}", + serviceManager.cancelledVersion); + serviceManager.upgradeVersion = serviceManager.serviceSpec.getVersion(); + serviceManager.componentsToUpgrade = serviceManager + .resolveCompsToUpgrade(sourceSpec, targetSpec); + + serviceManager.compsAffectedByUpgrade.clear(); + serviceManager.componentsToUpgrade.forEach(comp -> + serviceManager.compsAffectedByUpgrade.add(comp.getName())); + + serviceManager.dispatchNeedUpgradeEvents(true); + serviceManager.upgradeNextCompIfAny(true); + serviceManager.setServiceState(ServiceState.CANCEL_UPGRADING); + } catch (Throwable e) { + LOG.error("[SERVICE]: Cancellation of upgrade failed", e); + } + } + } + + private void upgradeNextCompIfAny(boolean cancelUpgrade) { + if (!componentsToUpgrade.isEmpty()) { + org.apache.hadoop.yarn.service.api.records.Component component = + componentsToUpgrade.get(0); + + serviceSpec.getComponent(component.getName()).getContainers().forEach( + container -> { + ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent( + ContainerId.fromString(container.getId()), + !cancelUpgrade ? ComponentInstanceEventType.UPGRADE : + ComponentInstanceEventType.CANCEL_UPGRADE); + LOG.info("Upgrade container {} {}", container.getId(), + cancelUpgrade); + dispatcher.getEventHandler().handle(upgradeEvent); + }); + } + } + + private void dispatchNeedUpgradeEvents(boolean cancelUpgrade) { + if (componentsToUpgrade != null) { + componentsToUpgrade.forEach(component -> { + ComponentEvent needUpgradeEvent = new ComponentEvent( + component.getName(), !cancelUpgrade ? ComponentEventType.UPGRADE : + ComponentEventType.CANCEL_UPGRADE) + .setTargetSpec(component) + .setUpgradeVersion(upgradeVersion); + LOG.info("Upgrade component {} {}", component.getName(), cancelUpgrade); + context.scheduler.getDispatcher().getEventHandler() + .handle(needUpgradeEvent); + }); } } /** * @return whether finalization of upgrade was successful. */ - private boolean finalizeUpgrade() { - try { - // save the application id and state to - Service targetSpec = ServiceApiUtil.loadServiceUpgrade( - fs, getName(), upgradeVersion); - targetSpec.setId(serviceSpec.getId()); - targetSpec.setState(ServiceState.STABLE); - Map allComps = scheduler.getAllComponents(); - targetSpec.getComponents().forEach(compSpec -> { - Component comp = allComps.get(compSpec.getName()); - compSpec.setState(comp.getComponentSpec().getState()); - }); - jsonSerDeser.save(fs.getFileSystem(), - ServiceApiUtil.getServiceJsonPath(fs, getName()), targetSpec, true); - fs.deleteClusterUpgradeDir(getName(), upgradeVersion); - } catch (IOException e) { - LOG.error("Upgrade did not complete because unable to re-write the" + - " service definition", e); - return false; + private boolean finalizeUpgrade(boolean cancelUpgrade) { + if (!cancelUpgrade) { + try { + // save the application id and state to + Service targetSpec = ServiceApiUtil.loadServiceUpgrade( + fs, getName(), upgradeVersion); + targetSpec.setId(serviceSpec.getId()); + targetSpec.setState(ServiceState.STABLE); + Map allComps = scheduler.getAllComponents(); + targetSpec.getComponents().forEach(compSpec -> { + Component comp = allComps.get(compSpec.getName()); + compSpec.setState(comp.getComponentSpec().getState()); + }); + jsonSerDeser.save(fs.getFileSystem(), + ServiceApiUtil.getServiceJsonPath(fs, getName()), targetSpec, true); + } catch (IOException e) { + LOG.error("Upgrade did not complete because unable to re-write the" + + " service definition", e); + return false; + } } try { - fs.deleteClusterUpgradeDir(getName(), upgradeVersion); + String upgradeVersionToDel = cancelUpgrade? cancelledVersion : + upgradeVersion; + LOG.info("[SERVICE]: delete upgrade dir version {}", upgradeVersionToDel); + fs.deleteClusterUpgradeDir(getName(), upgradeVersionToDel); + + for (String comp : compsAffectedByUpgrade) { + String compDirVersionToDel = cancelUpgrade? cancelledVersion : + serviceSpec.getVersion(); + LOG.info("[SERVICE]: delete {} dir version {}", comp, + compDirVersionToDel); + fs.deleteComponentDir(compDirVersionToDel, comp); + } + + if (cancelUpgrade) { + fs.deleteComponentsVersionDirIfEmpty(cancelledVersion); + } else { + fs.deleteComponentsVersionDirIfEmpty(serviceSpec.getVersion()); + } + } catch (IOException e) { LOG.warn("Unable to delete upgrade definition for service {} " + "version {}", getName(), upgradeVersion); } - serviceSpec.setState(ServiceState.STABLE); + setServiceState(ServiceState.STABLE); serviceSpec.setVersion(upgradeVersion); upgradeVersion = null; + cancelledVersion = null; + compsAffectedByUpgrade.clear(); return true; } @@ -290,9 +405,59 @@ public class ServiceManager implements EventHandler { Service targetSpec = ServiceApiUtil.loadServiceUpgrade( context.fs, context.service.getName(), upgradeVersion); + List + compsNeedUpgradeList = resolveCompsToUpgrade(context.service, + targetSpec); + + ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE) + .setVersion(upgradeVersion) + .setAutoFinalize(autoFinalize) + .setExpressUpgrade(expressUpgrade); + + if (expressUpgrade) { + // In case of express upgrade components need to be upgraded in order. + // Once the service manager gets notified that a component finished + // upgrading, it then issues event to upgrade the next component. + Map + compsNeedUpgradeByName = new HashMap<>(); + if (compsNeedUpgradeList != null) { + compsNeedUpgradeList.forEach(component -> + compsNeedUpgradeByName.put(component.getName(), component)); + } + List resolvedComps = ServiceApiUtil + .resolveCompsDependency(targetSpec); + + List + orderedCompUpgrade = new LinkedList<>(); + resolvedComps.forEach(compName -> { + org.apache.hadoop.yarn.service.api.records.Component component = + compsNeedUpgradeByName.get(compName); + if (component != null ) { + orderedCompUpgrade.add(component); + } + }); + event.setCompsToUpgrade(orderedCompUpgrade); + } else { + event.setCompsToUpgrade(compsNeedUpgradeList); + } + context.scheduler.getDispatcher().getEventHandler().handle( + event); + + if (autoFinalize && (compsNeedUpgradeList == null || + compsNeedUpgradeList.isEmpty())) { + // nothing to upgrade and auto finalize is requested, trigger a + // state check. + context.scheduler.getDispatcher().getEventHandler().handle( + new ServiceEvent(ServiceEventType.CHECK_STABLE)); + } + } + + private List + resolveCompsToUpgrade(Service sourceSpec, Service targetSpec) { + List compsNeedUpgradeList = componentsFinder. - findTargetComponentSpecs(context.service, targetSpec); + findTargetComponentSpecs(sourceSpec, targetSpec); // remove all components from need upgrade list if there restart policy // doesn't all upgrade. @@ -316,54 +481,20 @@ public class ServiceManager implements EventHandler { }); } - ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE) - .setVersion(upgradeVersion) - .setAutoFinalize(autoFinalize) - .setExpressUpgrade(expressUpgrade); + return compsNeedUpgradeList; + } - if (expressUpgrade) { - // In case of express upgrade components need to be upgraded in order. - // Once the service manager gets notified that a component finished - // upgrading, it then issues event to upgrade the next component. - Map - compsNeedUpgradeByName = new HashMap<>(); - if (compsNeedUpgradeList != null) { - compsNeedUpgradeList.forEach(component -> - compsNeedUpgradeByName.put(component.getName(), component)); - } - List resolvedComps = ServiceApiUtil - .resolveCompsDependency(targetSpec); - - Queue - orderedCompUpgrade = new LinkedList<>(); - resolvedComps.forEach(compName -> { - org.apache.hadoop.yarn.service.api.records.Component component = - compsNeedUpgradeByName.get(compName); - if (component != null ) { - orderedCompUpgrade.add(component); - } - }); - event.setCompsToUpgradeInOrder(orderedCompUpgrade); - } - - context.scheduler.getDispatcher().getEventHandler().handle(event); - - if (compsNeedUpgradeList != null && !compsNeedUpgradeList.isEmpty()) { - if (!expressUpgrade) { - compsNeedUpgradeList.forEach(component -> { - ComponentEvent needUpgradeEvent = new ComponentEvent( - component.getName(), ComponentEventType.UPGRADE).setTargetSpec( - component).setUpgradeVersion(event.getVersion()); - context.scheduler.getDispatcher().getEventHandler().handle( - needUpgradeEvent); - - }); - } - } else if (autoFinalize) { - // nothing to upgrade if upgrade auto finalize is requested, trigger a - // state check. - context.scheduler.getDispatcher().getEventHandler().handle( - new ServiceEvent(ServiceEventType.CHECK_STABLE)); + /** + * Sets the state of the service in the service spec. + * @param state service state + */ + private void setServiceState( + org.apache.hadoop.yarn.service.api.records.ServiceState state) { + org.apache.hadoop.yarn.service.api.records.ServiceState curState = + serviceSpec.getState(); + if (!curState.equals(state)) { + serviceSpec.setState(state); + LOG.info("[SERVICE] spec state changed from {} -> {}", curState, state); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java index cac527a5482..a6e9a2e7d82 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java @@ -27,5 +27,5 @@ import org.apache.hadoop.classification.InterfaceStability; @InterfaceStability.Unstable public enum ContainerState { RUNNING_BUT_UNREADY, READY, STOPPED, NEEDS_UPGRADE, UPGRADING, SUCCEEDED, - FAILED; + FAILED, FAILED_UPGRADE; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java index 49c19853671..3f2f4f6b91d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java @@ -29,5 +29,10 @@ import org.apache.hadoop.classification.InterfaceStability; @ApiModel(description = "The current state of an service.") public enum ServiceState { ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING, - UPGRADING_AUTO_FINALIZE, EXPRESS_UPGRADING, SUCCEEDED; + UPGRADING_AUTO_FINALIZE, EXPRESS_UPGRADING, SUCCEEDED, CANCEL_UPGRADING; + + public static boolean isUpgrading(ServiceState state) { + return state.equals(UPGRADING) || state.equals(UPGRADING_AUTO_FINALIZE) + || state.equals(EXPRESS_UPGRADING); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java index a27ed87aa63..23db57efad6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.client.util.YarnClientUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; @@ -352,6 +353,26 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, return actionUpgrade(persistedService, containersToUpgrade); } + @Override + public int actionCancelUpgrade(String appName) throws IOException, + YarnException { + Service liveService = getStatus(appName); + if (liveService == null || + !ServiceState.isUpgrading(liveService.getState())) { + throw new YarnException("Service " + appName + " is not upgrading, " + + "so nothing to cancel."); + } + + ApplicationReport appReport = yarnClient.getApplicationReport( + getAppId(appName)); + if (StringUtils.isEmpty(appReport.getHost())) { + throw new YarnException(appName + " AM hostname is empty"); + } + ClientAMProtocol proxy = createAMProxy(appName, appReport); + proxy.cancelUpgrade(CancelUpgradeRequestProto.newBuilder().build()); + return EXIT_SUCCESS; + } + @Override public int actionCleanUp(String appName, String userName) throws IOException, YarnException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index acf3404fe93..526bde0ff13 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.service.component; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; import static org.apache.hadoop.yarn.service.api.records.Component @@ -44,7 +43,6 @@ import org.apache.hadoop.yarn.service.ServiceEventType; import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.ResourceInformation; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; -import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId; import org.apache.hadoop.yarn.service.ContainerFailureTracker; import org.apache.hadoop.yarn.service.ServiceContext; @@ -89,6 +87,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.*; import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*; import static org.apache.hadoop.yarn.service.component.ComponentEventType.*; +import static org.apache.hadoop.yarn.service.component.ComponentEventType.CANCEL_UPGRADE; +import static org.apache.hadoop.yarn.service.component.ComponentEventType.UPGRADE; import static org.apache.hadoop.yarn.service.component.ComponentState.*; import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.*; import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.*; @@ -126,9 +126,8 @@ public class Component implements EventHandler { new ConcurrentHashMap<>(); private boolean healthThresholdMonitorEnabled = false; - private AtomicBoolean upgradeInProgress = new AtomicBoolean(false); - private ComponentEvent upgradeEvent; - private AtomicLong numContainersThatNeedUpgrade = new AtomicLong(0); + private UpgradeStatus upgradeStatus = new UpgradeStatus(); + private UpgradeStatus cancelUpgradeStatus = new UpgradeStatus(); private StateMachine stateMachine; @@ -160,6 +159,8 @@ public class Component implements EventHandler { // Flex while previous flex is still in progress .addTransition(FLEXING, EnumSet.of(FLEXING, STABLE), FLEX, new FlexComponentTransition()) + .addTransition(FLEXING, EnumSet.of(UPGRADING, FLEXING, STABLE), + CHECK_STABLE, new CheckStableTransition()) // container failed while stable .addTransition(STABLE, FLEXING, CONTAINER_COMPLETED, @@ -172,19 +173,28 @@ public class Component implements EventHandler { // For flex down, go to STABLE state .addTransition(STABLE, EnumSet.of(STABLE, FLEXING), FLEX, new FlexComponentTransition()) - .addTransition(STABLE, UPGRADING, ComponentEventType.UPGRADE, - new ComponentNeedsUpgradeTransition()) - //Upgrade while previous upgrade is still in progress - .addTransition(UPGRADING, UPGRADING, ComponentEventType.UPGRADE, - new ComponentNeedsUpgradeTransition()) - .addTransition(UPGRADING, EnumSet.of(UPGRADING, FLEXING, STABLE), - CHECK_STABLE, new CheckStableTransition()) - .addTransition(FLEXING, EnumSet.of(UPGRADING, FLEXING, STABLE), - CHECK_STABLE, new CheckStableTransition()) + .addTransition(STABLE, UPGRADING, UPGRADE, + new NeedsUpgradeTransition()) + .addTransition(STABLE, CANCEL_UPGRADING, CANCEL_UPGRADE, + new NeedsUpgradeTransition()) .addTransition(STABLE, EnumSet.of(STABLE), CHECK_STABLE, new CheckStableTransition()) - .addTransition(UPGRADING, FLEXING, CONTAINER_COMPLETED, - new ContainerCompletedTransition()) + + // Cancel upgrade while previous upgrade is still in progress + .addTransition(UPGRADING, CANCEL_UPGRADING, + CANCEL_UPGRADE, new NeedsUpgradeTransition()) + .addTransition(UPGRADING, EnumSet.of(UPGRADING, STABLE), + CHECK_STABLE, new CheckStableTransition()) + .addTransition(UPGRADING, UPGRADING, CONTAINER_COMPLETED, + new CompletedAfterUpgradeTransition()) + + .addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, FLEXING, + STABLE), CHECK_STABLE, new CheckStableTransition()) + .addTransition(CANCEL_UPGRADING, CANCEL_UPGRADING, + CONTAINER_COMPLETED, new CompletedAfterUpgradeTransition()) + .addTransition(CANCEL_UPGRADING, FLEXING, CONTAINER_ALLOCATED, + new ContainerAllocatedTransition()) + .installTopology(); public Component( @@ -332,7 +342,7 @@ public class Component implements EventHandler { + before + " to " + event.getDesired()); component.requestContainers(delta); component.createNumCompInstances(delta); - component.componentSpec.setState( + component.setComponentState( org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING); component.getScheduler().getApp().setState(ServiceState.STARTED); return FLEXING; @@ -430,11 +440,11 @@ public class Component implements EventHandler { if (component.getNumRunningInstances() + component .getNumSucceededInstances() + component.getNumFailedInstances() < component.getComponentSpec().getNumberOfContainers()) { - component.componentSpec.setState( + component.setComponentState( org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING); return FLEXING; } else{ - component.componentSpec.setState( + component.setComponentState( org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE); return STABLE; } @@ -444,22 +454,22 @@ public class Component implements EventHandler { Component component) { // if desired == running if (component.componentMetrics.containersReady.value() == component - .getComponentSpec().getNumberOfContainers() - && component.numContainersThatNeedUpgrade.get() == 0) { - component.componentSpec.setState( + .getComponentSpec().getNumberOfContainers() && + !component.doesNeedUpgrade()) { + component.setComponentState( org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE); return STABLE; + } else if (component.doesNeedUpgrade()) { + component.setComponentState(org.apache.hadoop.yarn.service.api.records. + ComponentState.NEEDS_UPGRADE); + return component.getState(); } else if (component.componentMetrics.containersReady.value() != component .getComponentSpec().getNumberOfContainers()) { - component.componentSpec.setState( + component.setComponentState( org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING); return FLEXING; - } else { - // component.numContainersThatNeedUpgrade.get() > 0 - component.componentSpec.setState(org.apache.hadoop.yarn.service.api. - records.ComponentState.NEEDS_UPGRADE); - return UPGRADING; } + return component.getState(); } // This method should be called whenever there is an increment or decrement @@ -467,22 +477,16 @@ public class Component implements EventHandler { //This should not matter for terminating components private static synchronized void checkAndUpdateComponentState( Component component, boolean isIncrement) { - org.apache.hadoop.yarn.service.api.records.ComponentState curState = - component.componentSpec.getState(); if (component.getRestartPolicyHandler().isLongLived()) { if (isIncrement) { // check if all containers are in READY state - if (component.numContainersThatNeedUpgrade.get() == 0 - && component.componentMetrics.containersReady.value() - == component.componentMetrics.containersDesired.value()) { - component.componentSpec.setState( + if (!component.upgradeStatus.areContainersUpgrading() && + !component.cancelUpgradeStatus.areContainersUpgrading() && + component.componentMetrics.containersReady.value() == + component.componentMetrics.containersDesired.value()) { + component.setComponentState( org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE); - if (curState != component.componentSpec.getState()) { - LOG.info("[COMPONENT {}] state changed from {} -> {}", - component.componentSpec.getName(), curState, - component.componentSpec.getState()); - } // component state change will trigger re-check of service state component.context.getServiceManager().checkAndUpdateServiceState(); } @@ -491,19 +495,14 @@ public class Component implements EventHandler { // still need to verify the count before changing the component state if (component.componentMetrics.containersReady.value() < component.componentMetrics.containersDesired.value()) { - component.componentSpec.setState( + component.setComponentState( org.apache.hadoop.yarn.service.api.records.ComponentState .FLEXING); } else if (component.componentMetrics.containersReady.value() == component.componentMetrics.containersDesired.value()) { - component.componentSpec.setState( + component.setComponentState( org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE); } - if (curState != component.componentSpec.getState()) { - LOG.info("[COMPONENT {}] state changed from {} -> {}", - component.componentSpec.getName(), curState, - component.componentSpec.getState()); - } // component state change will trigger re-check of service state component.context.getServiceManager().checkAndUpdateServiceState(); } @@ -511,8 +510,8 @@ public class Component implements EventHandler { // component state change will trigger re-check of service state component.context.getServiceManager().checkAndUpdateServiceState(); } - // when the service is stable then the state of component needs to - // transition to stable + // triggers the state machine in component to reach appropriate state + // once the state in spec is changed. component.dispatcher.getEventHandler().handle( new ComponentEvent(component.getName(), ComponentEventType.CHECK_STABLE)); @@ -544,25 +543,43 @@ public class Component implements EventHandler { } } - private static class ComponentNeedsUpgradeTransition extends BaseTransition { + private static class CompletedAfterUpgradeTransition extends BaseTransition { @Override public void transition(Component component, ComponentEvent event) { - component.upgradeInProgress.set(true); - component.upgradeEvent = event; - component.componentSpec.setState(org.apache.hadoop.yarn.service.api. - records.ComponentState.NEEDS_UPGRADE); - component.numContainersThatNeedUpgrade.set( + Preconditions.checkNotNull(event.getContainerId()); + component.updateMetrics(event.getStatus()); + component.dispatcher.getEventHandler().handle( + new ComponentInstanceEvent(event.getContainerId(), STOP) + .setStatus(event.getStatus())); + } + } + + private static class NeedsUpgradeTransition extends BaseTransition { + @Override + public void transition(Component component, ComponentEvent event) { + boolean isCancel = event.getType().equals(CANCEL_UPGRADE); + UpgradeStatus status = !isCancel ? component.upgradeStatus : + component.cancelUpgradeStatus; + + status.inProgress.set(true); + status.targetSpec = event.getTargetSpec(); + status.targetVersion = event.getUpgradeVersion(); + LOG.info("[COMPONENT {}]: need upgrade to {}", + component.getName(), status.targetVersion); + + status.containersNeedUpgrade.set( component.componentSpec.getNumberOfContainers()); - component.componentSpec.getContainers().forEach(container -> { - container.setState(ContainerState.NEEDS_UPGRADE); - if (event.isExpressUpgrade()) { - ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent( - ContainerId.fromString(container.getId()), - ComponentInstanceEventType.UPGRADE); - LOG.info("Upgrade container {}", container.getId()); - component.dispatcher.getEventHandler().handle(upgradeEvent); - } + + component.setComponentState(org.apache.hadoop.yarn.service.api. + records.ComponentState.NEEDS_UPGRADE); + + component.getAllComponentInstances().forEach(instance -> { + instance.setContainerState(ContainerState.NEEDS_UPGRADE); }); + + if (event.getType().equals(CANCEL_UPGRADE)) { + component.upgradeStatus.reset(); + } } } @@ -572,22 +589,22 @@ public class Component implements EventHandler { @Override public ComponentState transition(Component component, ComponentEvent componentEvent) { - org.apache.hadoop.yarn.service.api.records.ComponentState currState = - component.componentSpec.getState(); - if (currState.equals(org.apache.hadoop.yarn.service.api.records - .ComponentState.STABLE)) { - return ComponentState.STABLE; - } // checkIfStable also updates the state in definition when STABLE ComponentState targetState = checkIfStable(component); - if (targetState.equals(STABLE) && component.upgradeInProgress.get()) { - component.componentSpec.overwrite( - component.upgradeEvent.getTargetSpec()); - component.upgradeEvent = null; + + if (targetState.equals(STABLE) && + !(component.upgradeStatus.isCompleted() && + component.cancelUpgradeStatus.isCompleted())) { + // Component stable after upgrade or cancel upgrade + UpgradeStatus status = !component.cancelUpgradeStatus.isCompleted() ? + component.cancelUpgradeStatus : component.upgradeStatus; + + component.componentSpec.overwrite(status.getTargetSpec()); + status.reset(); + ServiceEvent checkStable = new ServiceEvent(ServiceEventType. CHECK_STABLE); component.dispatcher.getEventHandler().handle(checkStable); - component.upgradeInProgress.set(false); } return targetState; } @@ -625,11 +642,14 @@ public class Component implements EventHandler { "[COMPONENT {}]: Assigned {} to component instance {} and launch on host {} ", getName(), container.getId(), instance.getCompInstanceName(), container.getNodeId()); - if (upgradeInProgress.get()) { + if (!(upgradeStatus.isCompleted() && cancelUpgradeStatus.isCompleted())) { + UpgradeStatus status = !cancelUpgradeStatus.isCompleted() ? + cancelUpgradeStatus : upgradeStatus; + scheduler.getContainerLaunchService() .launchCompInstance(scheduler.getApp(), instance, container, - createLaunchContext(upgradeEvent.getTargetSpec(), - upgradeEvent.getUpgradeVersion())); + createLaunchContext(status.getTargetSpec(), + status.getTargetVersion())); } else { scheduler.getContainerLaunchService().launchCompInstance( scheduler.getApp(), instance, container, @@ -830,6 +850,12 @@ public class Component implements EventHandler { } } + private boolean doesNeedUpgrade() { + return cancelUpgradeStatus.areContainersUpgrading() || + upgradeStatus.areContainersUpgrading() || + upgradeStatus.failed.get(); + } + public boolean areDependenciesReady() { List dependencies = componentSpec.getDependencies(); if (ServiceUtils.isEmpty(dependencies)) { @@ -911,10 +937,6 @@ public class Component implements EventHandler { } } - public void decContainersThatNeedUpgrade() { - numContainersThatNeedUpgrade.decrementAndGet(); - } - public int getNumReadyInstances() { return componentMetrics.containersReady.value(); } @@ -972,10 +994,33 @@ public class Component implements EventHandler { } } - public ComponentEvent getUpgradeEvent() { + /** + * Returns whether a component is upgrading or not. + */ + public boolean isUpgrading() { + this.readLock.lock(); + + try { + return !(upgradeStatus.isCompleted() && + cancelUpgradeStatus.isCompleted()); + } finally { + this.readLock.unlock(); + } + } + + public UpgradeStatus getUpgradeStatus() { this.readLock.lock(); try { - return upgradeEvent; + return upgradeStatus; + } finally { + this.readLock.unlock(); + } + } + + public UpgradeStatus getCancelUpgradeStatus() { + this.readLock.lock(); + try { + return cancelUpgradeStatus; } finally { this.readLock.unlock(); } @@ -1013,6 +1058,70 @@ public class Component implements EventHandler { } } + /** + * Sets the state of the component in the component spec. + * @param state component state + */ + private void setComponentState( + org.apache.hadoop.yarn.service.api.records.ComponentState state) { + org.apache.hadoop.yarn.service.api.records.ComponentState curState = + componentSpec.getState(); + if (!curState.equals(state)) { + componentSpec.setState(state); + LOG.info("[COMPONENT {}] spec state changed from {} -> {}", + componentSpec.getName(), curState, state); + } + } + + /** + * Status of upgrade. + */ + public static class UpgradeStatus { + private org.apache.hadoop.yarn.service.api.records.Component targetSpec; + private String targetVersion; + private AtomicBoolean inProgress = new AtomicBoolean(false); + private AtomicLong containersNeedUpgrade = new AtomicLong(0); + private AtomicBoolean failed = new AtomicBoolean(false); + + public org.apache.hadoop.yarn.service.api.records. + Component getTargetSpec() { + return targetSpec; + } + + public String getTargetVersion() { + return targetVersion; + } + + /* + * @return whether the upgrade is completed or not + */ + public boolean isCompleted() { + return !inProgress.get(); + } + + public void decContainersThatNeedUpgrade() { + if (inProgress.get()) { + containersNeedUpgrade.decrementAndGet(); + } + } + + public void containerFailedUpgrade() { + failed.set(true); + } + + void reset() { + containersNeedUpgrade.set(0); + targetSpec = null; + targetVersion = null; + inProgress.set(false); + failed.set(false); + } + + boolean areContainersUpgrading() { + return containersNeedUpgrade.get() != 0; + } + } + public ServiceContext getContext() { return context; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java index 643961d505a..84caa77b205 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java @@ -35,7 +35,6 @@ public class ComponentEvent extends AbstractEvent { private ContainerId containerId; private org.apache.hadoop.yarn.service.api.records.Component targetSpec; private String upgradeVersion; - private boolean expressUpgrade; public ContainerId getContainerId() { return containerId; @@ -114,13 +113,4 @@ public class ComponentEvent extends AbstractEvent { this.upgradeVersion = upgradeVersion; return this; } - - public boolean isExpressUpgrade() { - return expressUpgrade; - } - - public ComponentEvent setExpressUpgrade(boolean expressUpgrade) { - this.expressUpgrade = expressUpgrade; - return this; - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java index 44d781f2257..d211f491f3b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java @@ -24,6 +24,7 @@ public enum ComponentEventType { CONTAINER_RECOVERED, CONTAINER_STARTED, CONTAINER_COMPLETED, + CANCEL_UPGRADE, UPGRADE, CHECK_STABLE } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java index 0f63d03e95e..e1cd0c1bdf4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java @@ -22,5 +22,6 @@ public enum ComponentState { INIT, FLEXING, STABLE, - UPGRADING + UPGRADING, + CANCEL_UPGRADING } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index afd8c671fa4..89c9a228256 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher; import org.apache.hadoop.yarn.service.utils.ServiceUtils; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; +import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; @@ -64,8 +65,10 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.text.MessageFormat; import java.util.Date; +import java.util.EnumSet; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @@ -84,8 +87,9 @@ public class ComponentInstance implements EventHandler, LoggerFactory.getLogger(ComponentInstance.class); private static final String FAILED_BEFORE_LAUNCH_DIAG = "failed before launch"; + private static final String UPGRADE_FAILED = "upgrade failed"; - private StateMachine stateMachine; private Component component; private final ReadLock readLock; @@ -106,7 +110,8 @@ public class ComponentInstance implements EventHandler, // This container object is used for rest API query private org.apache.hadoop.yarn.service.api.records.Container containerSpec; private String serviceVersion; - + private AtomicBoolean upgradeInProgress = new AtomicBoolean(false); + private boolean pendingCancelUpgrade = false; private static final StateMachineFactory, .addTransition(READY, STARTED, BECOME_NOT_READY, new ContainerBecomeNotReadyTransition()) .addTransition(READY, INIT, STOP, new ContainerStoppedTransition()) - .addTransition(READY, UPGRADING, UPGRADE, - new ContainerUpgradeTransition()) - .addTransition(UPGRADING, UPGRADING, UPGRADE, - new ContainerUpgradeTransition()) - .addTransition(UPGRADING, READY, BECOME_READY, - new ContainerBecomeReadyTransition()) - .addTransition(UPGRADING, INIT, STOP, new ContainerStoppedTransition()) + .addTransition(READY, UPGRADING, UPGRADE, new UpgradeTransition()) + .addTransition(READY, EnumSet.of(READY, CANCEL_UPGRADING), CANCEL_UPGRADE, + new CancelUpgradeTransition()) + + // FROM UPGRADING + .addTransition(UPGRADING, EnumSet.of(READY, CANCEL_UPGRADING), + CANCEL_UPGRADE, new CancelUpgradeTransition()) + .addTransition(UPGRADING, EnumSet.of(READY), BECOME_READY, + new ReadyAfterUpgradeTransition()) + .addTransition(UPGRADING, UPGRADING, STOP, + new StoppedAfterUpgradeTransition()) + + // FROM CANCEL_UPGRADING + .addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, READY), + BECOME_READY, new ReadyAfterUpgradeTransition()) + .addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, INIT), + STOP, new StoppedAfterCancelUpgradeTransition()) .installTopology(); public ComponentInstance(Component component, @@ -217,24 +232,53 @@ public class ComponentInstance implements EventHandler, @Override public void transition(ComponentInstance compInstance, ComponentInstanceEvent event) { - compInstance.containerSpec.setState(ContainerState.READY); - if (compInstance.getState().equals(ComponentInstanceState.UPGRADING)) { - compInstance.component.incContainersReady(false); - compInstance.component.decContainersThatNeedUpgrade(); - compInstance.serviceVersion = compInstance.component.getUpgradeEvent() - .getUpgradeVersion(); - ComponentEvent checkState = new ComponentEvent( - compInstance.component.getName(), ComponentEventType.CHECK_STABLE); - compInstance.scheduler.getDispatcher().getEventHandler().handle( - checkState); + compInstance.setContainerState(ContainerState.READY); + compInstance.component.incContainersReady(true); + compInstance.postContainerReady(); + } + } - } else { - compInstance.component.incContainersReady(true); - } - if (compInstance.timelineServiceEnabled) { - compInstance.serviceTimelinePublisher - .componentInstanceBecomeReady(compInstance.containerSpec); + private static class ReadyAfterUpgradeTransition implements + MultipleArcTransition { + + @Override + public ComponentInstanceState transition(ComponentInstance instance, + ComponentInstanceEvent event) { + + if (instance.pendingCancelUpgrade) { + // cancellation of upgrade was triggered before the upgrade was + // finished. + LOG.info("{} received ready but cancellation pending", + event.getContainerId()); + instance.upgradeInProgress.set(true); + instance.cancelUpgrade(); + instance.pendingCancelUpgrade = false; + return instance.getState(); } + + instance.upgradeInProgress.set(false); + instance.setContainerState(ContainerState.READY); + instance.component.incContainersReady(false); + + Component.UpgradeStatus status = instance.getState().equals(UPGRADING) ? + instance.component.getUpgradeStatus() : + instance.component.getCancelUpgradeStatus(); + status.decContainersThatNeedUpgrade(); + + instance.serviceVersion = status.getTargetVersion(); + ComponentEvent checkState = new ComponentEvent( + instance.component.getName(), + ComponentEventType.CHECK_STABLE); + instance.scheduler.getDispatcher().getEventHandler().handle(checkState); + instance.postContainerReady(); + return ComponentInstanceState.READY; + } + } + + private void postContainerReady() { + if (timelineServiceEnabled) { + serviceTimelinePublisher.componentInstanceBecomeReady(containerSpec); } } @@ -242,7 +286,7 @@ public class ComponentInstance implements EventHandler, @Override public void transition(ComponentInstance compInstance, ComponentInstanceEvent event) { - compInstance.containerSpec.setState(ContainerState.RUNNING_BUT_UNREADY); + compInstance.setContainerState(ContainerState.RUNNING_BUT_UNREADY); compInstance.component.decContainersReady(true); } } @@ -276,11 +320,13 @@ public class ComponentInstance implements EventHandler, " completed. Reinsert back to pending list and requested "); builder.append("a new container.").append(System.lineSeparator()); builder.append(" exitStatus=").append( - failureBeforeLaunch ? null : event.getStatus().getExitStatus()); + failureBeforeLaunch || event.getStatus() == null ? null : + event.getStatus().getExitStatus()); builder.append(", diagnostics="); builder.append(failureBeforeLaunch ? FAILED_BEFORE_LAUNCH_DIAG : - event.getStatus().getDiagnostics()); + (event.getStatus() != null ? event.getStatus().getDiagnostics() : + UPGRADE_FAILED)); if (event.getStatus() != null && event.getStatus().getExitStatus() != 0) { LOG.error(builder.toString()); @@ -342,15 +388,14 @@ public class ComponentInstance implements EventHandler, ComponentInstanceEvent event) { Component comp = compInstance.component; + ContainerStatus status = event.getStatus(); + // status is not available when upgrade fails String containerDiag = compInstance.getCompInstanceId() + ": " + ( - failedBeforeLaunching ? - FAILED_BEFORE_LAUNCH_DIAG : - event.getStatus().getDiagnostics()); + failedBeforeLaunching ? FAILED_BEFORE_LAUNCH_DIAG : + (status != null ? status.getDiagnostics() : UPGRADE_FAILED)); compInstance.diagnostics.append(containerDiag + System.lineSeparator()); compInstance.cancelContainerStatusRetriever(); - if (compInstance.getState().equals(ComponentInstanceState.UPGRADING)) { - compInstance.component.decContainersThatNeedUpgrade(); - } + if (compInstance.getState().equals(READY)) { compInstance.component.decContainersReady(true); } @@ -387,10 +432,9 @@ public class ComponentInstance implements EventHandler, // record in ATS compInstance.scheduler.getServiceTimelinePublisher() .componentInstanceFinished(compInstance.getContainer().getId(), - failedBeforeLaunching ? - -1 : - event.getStatus().getExitStatus(), ContainerState.FAILED, - containerDiag); + failedBeforeLaunching || status == null ? -1 : + status.getExitStatus(), + ContainerState.FAILED, containerDiag); // mark other component-instances/containers as STOPPED for (ContainerId containerId : scheduler.getLiveInstances() @@ -449,28 +493,129 @@ public class ComponentInstance implements EventHandler, .equals(state) || ContainerState.SUCCEEDED.equals(state); } - private static class ContainerUpgradeTransition extends BaseTransition { + private static class StoppedAfterUpgradeTransition extends + BaseTransition { @Override - public void transition(ComponentInstance compInstance, + public void transition(ComponentInstance instance, ComponentInstanceEvent event) { - if (!compInstance.containerSpec.getState().equals( - ContainerState.NEEDS_UPGRADE)) { - //nothing to upgrade. this may happen with express upgrade. + instance.component.getUpgradeStatus().decContainersThatNeedUpgrade(); + instance.component.decRunningContainers(); + + final ServiceScheduler scheduler = instance.component.getScheduler(); + scheduler.getAmRMClient().releaseAssignedContainer( + event.getContainerId()); + instance.scheduler.executorService.submit( + () -> instance.cleanupRegistry(event.getContainerId())); + scheduler.removeLiveCompInstance(event.getContainerId()); + instance.component.getUpgradeStatus().containerFailedUpgrade(); + instance.setContainerState(ContainerState.FAILED_UPGRADE); + instance.upgradeInProgress.set(false); + } + } + + private static class StoppedAfterCancelUpgradeTransition implements + MultipleArcTransition { + + private ContainerStoppedTransition stoppedTransition = + new ContainerStoppedTransition(); + + @Override + public ComponentInstanceState transition(ComponentInstance instance, + ComponentInstanceEvent event) { + if (instance.pendingCancelUpgrade) { + // cancellation of upgrade was triggered before the upgrade was + // finished. + LOG.info("{} received stopped but cancellation pending", + event.getContainerId()); + instance.upgradeInProgress.set(true); + instance.cancelUpgrade(); + instance.pendingCancelUpgrade = false; + return instance.getState(); + } + + // When upgrade is cancelled, and container re-init fails + instance.component.getCancelUpgradeStatus() + .decContainersThatNeedUpgrade(); + instance.upgradeInProgress.set(false); + stoppedTransition.transition(instance, event); + return ComponentInstanceState.INIT; + } + } + + private static class UpgradeTransition extends BaseTransition { + + @Override + public void transition(ComponentInstance instance, + ComponentInstanceEvent event) { + if (!instance.component.getCancelUpgradeStatus().isCompleted()) { + // last check to see if cancellation was triggered. The component may + // have processed the cancel upgrade event but the instance doesn't know + // it yet. If cancellation has been triggered then no point in + // upgrading. return; } - compInstance.containerSpec.setState(ContainerState.UPGRADING); - compInstance.component.decContainersReady(false); - ComponentEvent upgradeEvent = compInstance.component.getUpgradeEvent(); - compInstance.scheduler.getContainerLaunchService() - .reInitCompInstance(compInstance.scheduler.getApp(), compInstance, - compInstance.container, - compInstance.component.createLaunchContext( - upgradeEvent.getTargetSpec(), - upgradeEvent.getUpgradeVersion())); + instance.upgradeInProgress.set(true); + instance.setContainerState(ContainerState.UPGRADING); + instance.component.decContainersReady(false); + + Component.UpgradeStatus status = instance.component.getUpgradeStatus(); + instance.scheduler.getContainerLaunchService() + .reInitCompInstance(instance.scheduler.getApp(), instance, + instance.container, + instance.component.createLaunchContext( + status.getTargetSpec(), + status.getTargetVersion())); } } + private static class CancelUpgradeTransition implements + MultipleArcTransition { + + @Override + public ComponentInstanceState transition(ComponentInstance instance, + ComponentInstanceEvent event) { + if (instance.upgradeInProgress.compareAndSet(false, true)) { + + Component.UpgradeStatus cancelStatus = instance.component + .getCancelUpgradeStatus(); + + if (instance.getServiceVersion().equals( + cancelStatus.getTargetVersion())) { + // previous upgrade didn't happen so just go back to READY + LOG.info("{} nothing to cancel", event.getContainerId()); + cancelStatus.decContainersThatNeedUpgrade(); + instance.setContainerState(ContainerState.READY); + ComponentEvent checkState = new ComponentEvent( + instance.component.getName(), ComponentEventType.CHECK_STABLE); + instance.scheduler.getDispatcher().getEventHandler() + .handle(checkState); + return ComponentInstanceState.READY; + } else { + instance.component.decContainersReady(false); + instance.cancelUpgrade(); + } + } else { + LOG.info("{} pending cancellation", event.getContainerId()); + instance.pendingCancelUpgrade = true; + } + return ComponentInstanceState.CANCEL_UPGRADING; + } + } + + private void cancelUpgrade() { + LOG.info("{} cancelling upgrade", container.getId()); + setContainerState(ContainerState.UPGRADING); + Component.UpgradeStatus cancelStatus = component.getCancelUpgradeStatus(); + scheduler.getContainerLaunchService() + .reInitCompInstance(scheduler.getApp(), this, + this.container, this.component.createLaunchContext( + cancelStatus.getTargetSpec(), + cancelStatus.getTargetVersion())); + } + public ComponentInstanceState getState() { this.readLock.lock(); @@ -505,6 +650,26 @@ public class ComponentInstance implements EventHandler, } } + /** + * Sets the state of the container in the container spec. It is write + * protected. + * + * @param state container state + */ + public void setContainerState(ContainerState state) { + this.writeLock.lock(); + try { + ContainerState curState = containerSpec.getState(); + if (!curState.equals(state)) { + containerSpec.setState(state); + LOG.info("{} spec state state changed from {} -> {}", + getCompInstanceId(), curState, state); + } + } finally { + this.writeLock.unlock(); + } + } + @Override public void handle(ComponentInstanceEvent event) { try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java index 665b8faf554..b9181e5b3a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java @@ -23,5 +23,6 @@ public enum ComponentInstanceEventType { STOP, BECOME_READY, BECOME_NOT_READY, - UPGRADE + UPGRADE, + CANCEL_UPGRADE } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java index f5de5cb3016..28cbcf570ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java @@ -22,5 +22,6 @@ public enum ComponentInstanceState { INIT, STARTED, READY, - UPGRADING + UPGRADING, + CANCEL_UPGRADING } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java index 3c856ec1907..153ab465f21 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java @@ -22,7 +22,6 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.service.ServiceContext; import org.apache.hadoop.yarn.service.api.records.Artifact; import org.apache.hadoop.yarn.service.component.ComponentEvent; @@ -113,7 +112,8 @@ public class ContainerLaunchService extends AbstractService{ .startContainerAsync(container, launcher.completeContainerLaunch()); } else { - LOG.info("reInitializing container {}", container.getId()); + LOG.info("reInitializing container {} with version {}", + container.getId(), componentLaunchContext.getServiceVersion()); instance.getComponent().getScheduler().getNmClient() .reInitializeContainerAsync(container.getId(), launcher.completeContainerLaunch(), true); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java index 49ecd2e425f..6f3796768d5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java @@ -30,6 +30,8 @@ import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; @@ -141,4 +143,15 @@ public class ClientAMProtocolPBClientImpl } return null; } + + @Override + public CancelUpgradeResponseProto cancelUpgrade( + CancelUpgradeRequestProto request) throws IOException, YarnException { + try { + return proxy.cancelUpgrade(null, request); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + } + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java index eab3f9fb959..071c3579ecf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java @@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.service.impl.pb.service; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; @@ -116,4 +118,15 @@ public class ClientAMProtocolPBServiceImpl implements ClientAMProtocolPB { throw new ServiceException(e); } } + + @Override + public CancelUpgradeResponseProto cancelUpgrade( + RpcController controller, CancelUpgradeRequestProto request) + throws ServiceException { + try { + return real.cancelUpgrade(request); + } catch (IOException | YarnException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java index ac909925f0f..c12c3407eca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java @@ -167,9 +167,8 @@ public class ProviderUtils implements YarnServiceConstants { public static Path initCompInstanceDir(SliderFileSystem fs, ContainerLaunchService.ComponentLaunchContext compLaunchContext, ComponentInstance instance) { - Path compDir = new Path(new Path(fs.getAppDir(), "components"), - compLaunchContext.getServiceVersion() + "/" + - compLaunchContext.getName()); + Path compDir = fs.getComponentDir(compLaunchContext.getServiceVersion(), + compLaunchContext.getName()); Path compInstanceDir = new Path(compDir, instance.getCompInstanceName()); instance.setCompInstanceDir(compInstanceDir); return compInstanceDir; @@ -184,7 +183,9 @@ public class ProviderUtils implements YarnServiceConstants { ServiceContext context) throws IOException { Path compInstanceDir = initCompInstanceDir(fs, compLaunchContext, instance); if (!fs.getFileSystem().exists(compInstanceDir)) { - log.info(instance.getCompInstanceId() + ": Creating dir on hdfs: " + compInstanceDir); + log.info("{} version {} : Creating dir on hdfs: {}", + instance.getCompInstanceId(), compLaunchContext.getServiceVersion(), + compInstanceDir); fs.getFileSystem().mkdirs(compInstanceDir, new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE)); } else { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java index b588e88ae7f..0eb54ce10ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java @@ -603,7 +603,7 @@ public class ServiceApiUtil { public static void validateInstancesUpgrade(List liveContainers) throws YarnException { for (Container liveContainer : liveContainers) { - if (!liveContainer.getState().equals(ContainerState.NEEDS_UPGRADE)) { + if (!isUpgradable(liveContainer)) { // Nothing to upgrade throw new YarnException(String.format( ERROR_COMP_INSTANCE_DOES_NOT_NEED_UPGRADE, @@ -612,6 +612,16 @@ public class ServiceApiUtil { } } + /** + * Returns whether the container can be upgraded in the current state. + */ + public static boolean isUpgradable(Container container) { + + return container.getState() != null && + (container.getState().equals(ContainerState.NEEDS_UPGRADE) || + container.getState().equals(ContainerState.FAILED_UPGRADE)); + } + /** * Validates the components that are requested to upgrade require an upgrade. * It returns the instances of the components which need upgrade. @@ -629,7 +639,7 @@ public class ServiceApiUtil { ERROR_COMP_DOES_NOT_NEED_UPGRADE, liveComp.getName())); } liveComp.getContainers().forEach(liveContainer -> { - if (liveContainer.getState().equals(ContainerState.NEEDS_UPGRADE)) { + if (isUpgradable(liveContainer)) { containerNeedUpgrade.add(liveContainer); } }); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/SliderFileSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/SliderFileSystem.java index d6d664ea2e6..c7764764be8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/SliderFileSystem.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/SliderFileSystem.java @@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.service.utils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; @@ -48,4 +50,51 @@ public class SliderFileSystem extends CoreFileSystem { public Path getAppDir() { return this.appDir; } + + /** + * Returns the component directory path. + * + * @param serviceVersion service version + * @param compName component name + * @return component directory + */ + public Path getComponentDir(String serviceVersion, String compName) { + return new Path(new Path(getAppDir(), "components"), + serviceVersion + "/" + compName); + } + + /** + * Deletes the component directory. + * + * @param serviceVersion + * @param compName + * @throws IOException + */ + public void deleteComponentDir(String serviceVersion, String compName) + throws IOException { + Path path = getComponentDir(serviceVersion, compName); + if (fileSystem.exists(path)) { + fileSystem.delete(path, true); + LOG.debug("deleted dir {}", path); + } + } + + /** + * Deletes the components version directory. + * + * @param serviceVersion + * @throws IOException + */ + public void deleteComponentsVersionDirIfEmpty(String serviceVersion) + throws IOException { + Path path = new Path(new Path(getAppDir(), "components"), serviceVersion); + if (fileSystem.exists(path) && fileSystem.listStatus(path).length == 0) { + fileSystem.delete(path, true); + LOG.info("deleted dir {}", path); + } + } + + + private static final Logger LOG = LoggerFactory.getLogger( + SliderFileSystem.class); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto index 169f765b8a8..bcf893e1c48 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto @@ -28,6 +28,8 @@ service ClientAMProtocolService { rpc stop(StopRequestProto) returns (StopResponseProto); rpc upgradeService(UpgradeServiceRequestProto) returns (UpgradeServiceResponseProto); + rpc cancelUpgrade(CancelUpgradeRequestProto) + returns (CancelUpgradeResponseProto); rpc restartService(RestartServiceRequestProto) returns (RestartServiceResponseProto); rpc upgrade(CompInstancesUpgradeRequestProto) returns @@ -73,6 +75,12 @@ message UpgradeServiceResponseProto { optional string error = 1; } +message CancelUpgradeRequestProto { +} + +message CancelUpgradeResponseProto { +} + message RestartServiceRequestProto { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java index 321b2cda3e9..b685f4b370e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java @@ -94,21 +94,25 @@ public class MockRunningServiceContext extends ServiceContext { return mockLaunchService; } - @Override public ServiceUtils.ProcessTerminationHandler - getTerminationHandler() { + @Override + public ServiceUtils.ProcessTerminationHandler getTerminationHandler() { return new - ServiceUtils.ProcessTerminationHandler() { - public void terminate(int exitCode) { - } - }; + ServiceUtils.ProcessTerminationHandler() { + public void terminate(int exitCode) { + } + }; + } + + @Override + protected ServiceManager createServiceManager() { + return ServiceTestUtils.createServiceManager( + MockRunningServiceContext.this); } }; this.scheduler.init(fsWatcher.getConf()); - ServiceTestUtils.createServiceManager(this); - doNothing().when(mockLaunchService). reInitCompInstance(anyObject(), anyObject(), anyObject(), anyObject()); stabilizeComponents(this); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java index 6b49ab07c1a..58db752767c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java @@ -384,6 +384,7 @@ public class ServiceTestUtils { conf.set(YARN_SERVICE_BASE_PATH, serviceBasePath.toString()); try { fs = new SliderFileSystem(conf); + fs.setAppDir(new Path(serviceBasePath.toString())); } catch (IOException e) { Throwables.propagate(e); } @@ -532,7 +533,6 @@ public class ServiceTestUtils { GenericTestUtils.waitFor(() -> { try { Service retrievedApp = client.getStatus(exampleApp.getName()); - System.out.println(retrievedApp); return retrievedApp.getState() == desiredState; } catch (Exception e) { e.printStackTrace(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java index a37cabe38c4..406eea486b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.service.api.records.ComponentState; import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; +import org.apache.hadoop.yarn.service.component.Component; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; @@ -65,7 +66,7 @@ public class TestServiceManager { initUpgrade(context, "v2", false, false, false); ServiceManager manager = context.getServiceManager(); //make components stable by upgrading all instances - upgradeAllInstances(context); + upgradeAndReadyAllInstances(context); context.scheduler.getDispatcher().getEventHandler().handle( new ServiceEvent(ServiceEventType.START)); @@ -83,7 +84,7 @@ public class TestServiceManager { initUpgrade(context, "v2", false, true, false); ServiceManager manager = context.getServiceManager(); //make components stable by upgrading all instances - upgradeAllInstances(context); + upgradeAndReadyAllInstances(context); GenericTestUtils.waitFor(()-> context.service.getState().equals(ServiceState.STABLE), @@ -115,7 +116,7 @@ public class TestServiceManager { manager.getServiceSpec().getState()); //make components stable by upgrading all instances - upgradeAllInstances(context); + upgradeAndReadyAllInstances(context); // finalize service context.scheduler.getDispatcher().getEventHandler().handle( @@ -138,7 +139,7 @@ public class TestServiceManager { initUpgrade(context, "v2", true, true, false); // make components stable - upgradeAllInstances(context); + upgradeAndReadyAllInstances(context); GenericTestUtils.waitFor(() -> context.service.getState().equals(ServiceState.STABLE), @@ -174,18 +175,17 @@ public class TestServiceManager { public void testExpressUpgrade() throws Exception { ServiceContext context = createServiceContext("testExpressUpgrade"); ServiceManager manager = context.getServiceManager(); - manager.getServiceSpec().setState( - ServiceState.EXPRESS_UPGRADING); + manager.getServiceSpec().setState(ServiceState.EXPRESS_UPGRADING); initUpgrade(context, "v2", true, true, true); List comps = ServiceApiUtil.resolveCompsDependency(context.service); - // wait till instances of first component are in upgrade - String comp1 = comps.get(0); - upgradeInstancesOf(context, comp1); + // wait till instances of first component are upgraded and ready + String compA = comps.get(0); + makeInstancesReadyAfterUpgrade(context, compA); - // wait till instances of second component are in upgrade - String comp2 = comps.get(1); - upgradeInstancesOf(context, comp2); + // wait till instances of second component are upgraded and ready + String compB = comps.get(1); + makeInstancesReadyAfterUpgrade(context, compB); GenericTestUtils.waitFor(() -> context.service.getState().equals(ServiceState.STABLE), @@ -196,6 +196,57 @@ public class TestServiceManager { validateUpgradeFinalization(manager.getName(), "v2"); } + @Test(timeout = TIMEOUT) + public void testCancelUpgrade() throws Exception { + ServiceContext context = createServiceContext("testCancelUpgrade"); + writeInitialDef(context.service); + initUpgrade(context, "v2", true, false, false); + ServiceManager manager = context.getServiceManager(); + Assert.assertEquals("service not upgrading", ServiceState.UPGRADING, + manager.getServiceSpec().getState()); + + List comps = ServiceApiUtil.resolveCompsDependency(context.service); + // wait till instances of first component are upgraded and ready + String compA = comps.get(0); + // upgrade the instances + upgradeInstances(context, compA); + makeInstancesReadyAfterUpgrade(context, compA); + + // cancel upgrade + context.scheduler.getDispatcher().getEventHandler().handle( + new ServiceEvent(ServiceEventType.CANCEL_UPGRADE)); + makeInstancesReadyAfterUpgrade(context, compA); + + GenericTestUtils.waitFor(()-> + context.service.getState().equals(ServiceState.STABLE), + CHECK_EVERY_MILLIS, TIMEOUT); + Assert.assertEquals("service upgrade not cancelled", ServiceState.STABLE, + manager.getServiceSpec().getState()); + + validateUpgradeFinalization(manager.getName(), "v1"); + } + + @Test(timeout = TIMEOUT) + public void testCancelUpgradeAfterInitiate() throws Exception { + ServiceContext context = createServiceContext("testCancelUpgrade"); + writeInitialDef(context.service); + initUpgrade(context, "v2", true, false, false); + ServiceManager manager = context.getServiceManager(); + Assert.assertEquals("service not upgrading", ServiceState.UPGRADING, + manager.getServiceSpec().getState()); + + // cancel upgrade + context.scheduler.getDispatcher().getEventHandler().handle( + new ServiceEvent(ServiceEventType.CANCEL_UPGRADE)); + GenericTestUtils.waitFor(()-> + context.service.getState().equals(ServiceState.STABLE), + CHECK_EVERY_MILLIS, TIMEOUT); + Assert.assertEquals("service upgrade not cancelled", ServiceState.STABLE, + manager.getServiceSpec().getState()); + + validateUpgradeFinalization(manager.getName(), "v1"); + } + private void validateUpgradeFinalization(String serviceName, String expectedVersion) throws IOException { Service savedSpec = ServiceApiUtil.loadService(rule.getFs(), serviceName); @@ -225,21 +276,23 @@ public class TestServiceManager { } writeUpgradedDef(upgradedDef); serviceManager.processUpgradeRequest(version, autoFinalize, expressUpgrade); - ServiceEvent upgradeEvent = new ServiceEvent(ServiceEventType.UPGRADE); - upgradeEvent.setVersion(version).setExpressUpgrade(expressUpgrade) - .setAutoFinalize(autoFinalize); - - GenericTestUtils.waitFor(()-> { - ServiceState serviceState = context.service.getState(); - if (serviceState.equals(ServiceState.UPGRADING) || - serviceState.equals(ServiceState.UPGRADING_AUTO_FINALIZE) || - serviceState.equals(ServiceState.EXPRESS_UPGRADING)) { - return true; + GenericTestUtils.waitFor(() -> { + for (Component comp : context.scheduler.getAllComponents().values()) { + if (!comp.getComponentSpec().getState().equals( + ComponentState.NEEDS_UPGRADE)) { + return false; + } } - return false; + return true; }, CHECK_EVERY_MILLIS, TIMEOUT); } + private void upgradeAndReadyAllInstances(ServiceContext context) throws + TimeoutException, InterruptedException { + upgradeAllInstances(context); + makeAllInstancesReady(context); + } + private void upgradeAllInstances(ServiceContext context) throws TimeoutException, InterruptedException { // upgrade the instances @@ -248,8 +301,10 @@ public class TestServiceManager { ComponentInstanceEventType.UPGRADE); context.scheduler.getDispatcher().getEventHandler().handle(event); })); + } - // become ready + private void makeAllInstancesReady(ServiceContext context) + throws TimeoutException, InterruptedException { context.scheduler.getLiveInstances().forEach(((containerId, instance) -> { ComponentInstanceEvent event = new ComponentInstanceEvent(containerId, ComponentInstanceEventType.BECOME_READY); @@ -267,7 +322,19 @@ public class TestServiceManager { }, CHECK_EVERY_MILLIS, TIMEOUT); } - private void upgradeInstancesOf(ServiceContext context, String compName) + private void upgradeInstances(ServiceContext context, String compName) { + Collection compInstances = context.scheduler + .getAllComponents().get(compName).getAllComponentInstances(); + compInstances.forEach(instance -> { + ComponentInstanceEvent event = new ComponentInstanceEvent( + instance.getContainer().getId(), + ComponentInstanceEventType.UPGRADE); + context.scheduler.getDispatcher().getEventHandler().handle(event); + }); + } + + private void makeInstancesReadyAfterUpgrade(ServiceContext context, + String compName) throws TimeoutException, InterruptedException { Collection compInstances = context.scheduler .getAllComponents().get(compName).getAllComponentInstances(); @@ -289,6 +356,15 @@ public class TestServiceManager { context.scheduler.getDispatcher().getEventHandler().handle(event); }); + + GenericTestUtils.waitFor(() -> { + for (ComponentInstance instance : compInstances) { + if (!instance.getContainerState().equals(ContainerState.READY)) { + return false; + } + } + return true; + }, CHECK_EVERY_MILLIS, TIMEOUT); } private ServiceContext createServiceContext(String name) @@ -324,6 +400,14 @@ public class TestServiceManager { return artifact; } + private void writeInitialDef(Service service) + throws IOException, SliderException { + Path servicePath = rule.getFs().buildClusterDirPath( + service.getName()); + ServiceApiUtil.createDirAndPersistApp(rule.getFs(), servicePath, + service); + } + private void writeUpgradedDef(Service upgradedDef) throws IOException, SliderException { Path upgradePath = rule.getFs().buildClusterUpgradeDirPath( @@ -332,6 +416,6 @@ public class TestServiceManager { upgradedDef); } - private static final int TIMEOUT = 200000; + private static final int TIMEOUT = 10000; private static final int CHECK_EVERY_MILLIS = 100; } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java index 216d88fc4c3..3e23a109afd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.service; +import com.google.common.collect.Lists; import com.google.common.collect.Multimap; import org.apache.commons.io.FileUtils; import org.apache.hadoop.fs.Path; @@ -450,6 +451,49 @@ public class TestYarnNativeServices extends ServiceTestUtils { client.actionDestroy(service.getName()); } + @Test(timeout = 200000) + public void testCancelUpgrade() throws Exception { + setupInternal(NUM_NMS); + getConf().setBoolean(YARN_SERVICE_UPGRADE_ENABLED, true); + ServiceClient client = createClient(getConf()); + + Service service = createExampleApplication(); + Component component = service.getComponents().iterator().next(); + component.getConfiguration().getEnv().put("key1", "val0"); + + client.actionCreate(service); + waitForServiceToBeStable(client, service); + + // upgrade the service + service.setState(ServiceState.UPGRADING); + service.setVersion("v2"); + component.getConfiguration().getEnv().put("key1", "val1"); + client.initiateUpgrade(service); + + // wait for service to be in upgrade state + waitForServiceToBeInState(client, service, ServiceState.UPGRADING); + + // upgrade 1 container + Service liveService = client.getStatus(service.getName()); + Container container = liveService.getComponent(component.getName()) + .getContainers().iterator().next(); + client.actionUpgrade(service, Lists.newArrayList(container)); + + Thread.sleep(500); + // cancel the upgrade + client.actionCancelUpgrade(service.getName()); + waitForServiceToBeStable(client, service); + Service active = client.getStatus(service.getName()); + Assert.assertEquals("component not stable", ComponentState.STABLE, + active.getComponent(component.getName()).getState()); + Assert.assertEquals("comp does not have new env", "val0", + active.getComponent(component.getName()).getConfiguration() + .getEnv("key1")); + LOG.info("Stop/destroy service {}", service); + client.actionStop(service.getName(), true); + client.actionDestroy(service.getName()); + } + // Test to verify ANTI_AFFINITY placement policy // 1. Start mini cluster with 3 NMs and scheduler placement-constraint handler // 2. Create an example service with 3 containers diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java index 0e047c20b25..41be8c7e576 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java @@ -221,6 +221,17 @@ public class TestServiceCLI { Assert.assertEquals(result, 0); } + @Test + public void testCancelUpgrade() throws Exception { + conf.set(YARN_APP_ADMIN_CLIENT_PREFIX + DUMMY_APP_TYPE, + DummyServiceClient.class.getName()); + cli.setConf(conf); + String[] args = {"app", "-upgrade", "app-1", + "-cancel", "-appTypes", DUMMY_APP_TYPE}; + int result = cli.run(ApplicationCLI.preProcessArgs(args)); + Assert.assertEquals(result, 0); + } + @Test (timeout = 180000) public void testEnableFastLaunch() throws Exception { fs.getFileSystem().create(new Path(basedir.getAbsolutePath(), "test.jar")) @@ -332,5 +343,11 @@ public class TestServiceCLI { throws IOException, YarnException { return ""; } + + @Override + public int actionCancelUpgrade(String appName) throws IOException, + YarnException { + return 0; + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java index e1a4d9d7553..f11d871021c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java @@ -19,8 +19,8 @@ package org.apache.hadoop.yarn.service.component; import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.service.MockRunningServiceContext; import org.apache.hadoop.yarn.service.ServiceContext; import org.apache.hadoop.yarn.service.ServiceTestUtils; import org.apache.hadoop.yarn.service.TestServiceManager; @@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; -import org.apache.hadoop.yarn.service.MockRunningServiceContext; import org.apache.log4j.Logger; import org.junit.Assert; import org.junit.Rule; @@ -38,9 +37,8 @@ import org.junit.Test; import java.util.Iterator; +import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.BECOME_READY; import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; /** * Tests for {@link Component}. @@ -78,14 +76,14 @@ public class TestComponent { "val1")).setUpgradeVersion("v2")); // one instance finished upgrading - comp.decContainersThatNeedUpgrade(); + comp.getUpgradeStatus().decContainersThatNeedUpgrade(); comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.CHECK_STABLE)); Assert.assertEquals("component not in need upgrade state", ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState()); // second instance finished upgrading - comp.decContainersThatNeedUpgrade(); + comp.getUpgradeStatus().decContainersThatNeedUpgrade(); comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.CHECK_STABLE)); @@ -97,7 +95,7 @@ public class TestComponent { @Test public void testContainerCompletedWhenUpgrading() throws Exception { - String serviceName = "testContainerComplete"; + String serviceName = "testContainerCompletedWhenUpgrading"; MockRunningServiceContext context = createTestContext(rule, serviceName); Component comp = context.scheduler.getAllComponents().entrySet().iterator() .next().getValue(); @@ -105,48 +103,233 @@ public class TestComponent { comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.UPGRADE) .setTargetSpec(createSpecWithEnv(serviceName, comp.getName(), "key1", "val1")).setUpgradeVersion("v2")); - comp.getAllComponentInstances().forEach(instance -> { - instance.handle(new ComponentInstanceEvent( - instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE)); - }); - Iterator instanceIter = comp. - getAllComponentInstances().iterator(); + comp.getAllComponentInstances().forEach(instance -> + instance.handle(new ComponentInstanceEvent( + instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE))); // reinitialization of a container failed - ContainerStatus status = mock(ContainerStatus.class); - when(status.getExitStatus()).thenReturn(ContainerExitStatus.ABORTED); - ComponentInstance instance = instanceIter.next(); + for(ComponentInstance instance : comp.getAllComponentInstances()) { + ComponentEvent stopEvent = new ComponentEvent(comp.getName(), + ComponentEventType.CONTAINER_COMPLETED) + .setInstance(instance) + .setContainerId(instance.getContainer().getId()); + comp.handle(stopEvent); + instance.handle(new ComponentInstanceEvent( + instance.getContainer().getId(), STOP)); + } + comp.handle(new ComponentEvent(comp.getName(), + ComponentEventType.CHECK_STABLE)); + + Assert.assertEquals("component not in needs upgrade state", + ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState()); + } + + @Test + public void testCancelUpgrade() throws Exception { + ServiceContext context = createTestContext(rule, "testCancelUpgrade"); + Component comp = context.scheduler.getAllComponents().entrySet().iterator() + .next().getValue(); + + ComponentEvent upgradeEvent = new ComponentEvent(comp.getName(), + ComponentEventType.CANCEL_UPGRADE); + comp.handle(upgradeEvent); + Assert.assertEquals("component not in need upgrade state", + ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState()); + + Assert.assertEquals( + org.apache.hadoop.yarn.service.component.ComponentState + .CANCEL_UPGRADING, comp.getState()); + } + + @Test + public void testContainerCompletedCancelUpgrade() throws Exception { + String serviceName = "testContainerCompletedCancelUpgrade"; + MockRunningServiceContext context = createTestContext(rule, serviceName); + Component comp = context.scheduler.getAllComponents().entrySet().iterator() + .next().getValue(); + + // upgrade completes + comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.UPGRADE) + .setTargetSpec(createSpecWithEnv(serviceName, comp.getName(), "key1", + "val1")).setUpgradeVersion("v2")); + comp.getAllComponentInstances().forEach(instance -> + instance.handle(new ComponentInstanceEvent( + instance.getContainer().getId(), + ComponentInstanceEventType.UPGRADE))); + + // reinitialization of a container done + for(ComponentInstance instance : comp.getAllComponentInstances()) { + instance.handle(new ComponentInstanceEvent( + instance.getContainer().getId(), BECOME_READY)); + } + + comp.handle(new ComponentEvent(comp.getName(), + ComponentEventType.CANCEL_UPGRADE) + .setTargetSpec(createSpecWithEnv(serviceName, comp.getName(), "key1", + "val0")).setUpgradeVersion("v1")); + comp.getAllComponentInstances().forEach(instance -> + instance.handle(new ComponentInstanceEvent( + instance.getContainer().getId(), + ComponentInstanceEventType.CANCEL_UPGRADE))); + + Iterator iter = comp.getAllComponentInstances() + .iterator(); + + // cancel upgrade failed of a container + ComponentInstance instance1 = iter.next(); ComponentEvent stopEvent = new ComponentEvent(comp.getName(), ComponentEventType.CONTAINER_COMPLETED) - .setInstance(instance).setContainerId(instance.getContainer().getId()) - .setStatus(status); + .setInstance(instance1) + .setContainerId(instance1.getContainer().getId()); comp.handle(stopEvent); - instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(), - STOP).setStatus(status)); + instance1.handle(new ComponentInstanceEvent( + instance1.getContainer().getId(), STOP)); + Assert.assertEquals( + org.apache.hadoop.yarn.service.component.ComponentState + .CANCEL_UPGRADING, comp.getState()); + + comp.handle(new ComponentEvent(comp.getName(), + ComponentEventType.CHECK_STABLE)); + + Assert.assertEquals("component not in needs upgrade state", + ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState()); + Assert.assertEquals( + org.apache.hadoop.yarn.service.component.ComponentState + .CANCEL_UPGRADING, comp.getState()); + + // second instance finished upgrading + ComponentInstance instance2 = iter.next(); + instance2.handle(new ComponentInstanceEvent( + instance2.getContainer().getId(), + ComponentInstanceEventType.BECOME_READY)); comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.CHECK_STABLE)); Assert.assertEquals("component not in flexing state", ComponentState.FLEXING, comp.getComponentSpec().getState()); - // new container get allocated context.assignNewContainer(context.attemptId, 10, comp); - // second instance finished upgrading - ComponentInstance instance2 = instanceIter.next(); - instance2.handle(new ComponentInstanceEvent( - instance2.getContainer().getId(), - ComponentInstanceEventType.BECOME_READY)); + comp.handle(new ComponentEvent(comp.getName(), + ComponentEventType.CHECK_STABLE)); + + Assert.assertEquals("component not in stable state", + ComponentState.STABLE, comp.getComponentSpec().getState()); + Assert.assertEquals("cancel upgrade failed", "val0", + comp.getComponentSpec().getConfiguration().getEnv("key1")); + } + + @Test + public void testCancelUpgradeSuccessWhileUpgrading() throws Exception { + String serviceName = "testCancelUpgradeWhileUpgrading"; + MockRunningServiceContext context = createTestContext(rule, serviceName); + Component comp = context.scheduler.getAllComponents().entrySet().iterator() + .next().getValue(); + cancelUpgradeWhileUpgrading(context, comp); + + // cancel upgrade successful for both instances + for(ComponentInstance instance : comp.getAllComponentInstances()) { + instance.handle(new ComponentInstanceEvent( + instance.getContainer().getId(), + ComponentInstanceEventType.BECOME_READY)); + } + comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.CHECK_STABLE)); Assert.assertEquals("component not in stable state", ComponentState.STABLE, comp.getComponentSpec().getState()); - Assert.assertEquals("component did not upgrade successfully", "val1", + Assert.assertEquals("cancel upgrade failed", "val0", comp.getComponentSpec().getConfiguration().getEnv("key1")); } + @Test + public void testCancelUpgradeFailureWhileUpgrading() throws Exception { + String serviceName = "testCancelUpgradeFailureWhileUpgrading"; + MockRunningServiceContext context = createTestContext(rule, serviceName); + Component comp = context.scheduler.getAllComponents().entrySet().iterator() + .next().getValue(); + cancelUpgradeWhileUpgrading(context, comp); + + // cancel upgrade failed for both instances + for(ComponentInstance instance : comp.getAllComponentInstances()) { + instance.handle(new ComponentInstanceEvent( + instance.getContainer().getId(), + ComponentInstanceEventType.STOP)); + } + comp.handle(new ComponentEvent(comp.getName(), + ComponentEventType.CHECK_STABLE)); + + Assert.assertEquals("component not in flexing state", + ComponentState.FLEXING, comp.getComponentSpec().getState()); + + for (ComponentInstance instance : comp.getAllComponentInstances()) { + // new container get allocated + context.assignNewContainer(context.attemptId, 10, comp); + } + + comp.handle(new ComponentEvent(comp.getName(), + ComponentEventType.CHECK_STABLE)); + + Assert.assertEquals("component not in stable state", + ComponentState.STABLE, comp.getComponentSpec().getState()); + Assert.assertEquals("cancel upgrade failed", "val0", + comp.getComponentSpec().getConfiguration().getEnv("key1")); + } + + private void cancelUpgradeWhileUpgrading( + MockRunningServiceContext context, Component comp) + throws Exception { + + comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.UPGRADE) + .setTargetSpec(createSpecWithEnv(context.service.getName(), + comp.getName(), "key1", "val1")).setUpgradeVersion("v0")); + + Iterator iter = comp.getAllComponentInstances() + .iterator(); + + ComponentInstance instance1 = iter.next(); + + // instance1 is triggered to upgrade + instance1.handle(new ComponentInstanceEvent( + instance1.getContainer().getId(), ComponentInstanceEventType.UPGRADE)); + + // component upgrade is cancelled + comp.handle(new ComponentEvent(comp.getName(), + ComponentEventType.CANCEL_UPGRADE) + .setTargetSpec(createSpecWithEnv(context.service.getName(), + comp.getName(), "key1", + "val0")).setUpgradeVersion("v0")); + + // all instances upgrade is cancelled. + comp.getAllComponentInstances().forEach(instance -> + instance.handle(new ComponentInstanceEvent( + instance.getContainer().getId(), + ComponentInstanceEventType.CANCEL_UPGRADE))); + + // regular upgrade failed for instance 1 + comp.handle(new ComponentEvent(comp.getName(), + ComponentEventType.CONTAINER_COMPLETED).setInstance(instance1) + .setContainerId(instance1.getContainer().getId())); + instance1.handle(new ComponentInstanceEvent( + instance1.getContainer().getId(), STOP)); + + // component should be in cancel upgrade + Assert.assertEquals( + org.apache.hadoop.yarn.service.component.ComponentState + .CANCEL_UPGRADING, comp.getState()); + + comp.handle(new ComponentEvent(comp.getName(), + ComponentEventType.CHECK_STABLE)); + + Assert.assertEquals("component not in needs upgrade state", + ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState()); + Assert.assertEquals( + org.apache.hadoop.yarn.service.component.ComponentState + .CANCEL_UPGRADING, comp.getState()); + } + @Test public void testComponentStateReachesStableStateWithTerminatingComponents() throws @@ -249,8 +432,6 @@ public class TestComponent { serviceState); } - - private static org.apache.hadoop.yarn.service.api.records.Component createSpecWithEnv(String serviceName, String compName, String key, String val) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java index e0399816db8..c5a96317caa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.service.component.instance; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -86,7 +87,7 @@ public class TestComponentInstance { @Test public void testContainerReadyAfterUpgrade() throws Exception { ServiceContext context = TestComponent.createTestContext(rule, - "testContainerStarted"); + "testContainerReadyAfterUpgrade"); Component component = context.scheduler.getAllComponents().entrySet() .iterator().next().getValue(); upgradeComponent(component); @@ -105,12 +106,186 @@ public class TestComponentInstance { .getId().toString()).getState()); } + + @Test + public void testContainerUpgradeFailed() throws Exception { + ServiceContext context = TestComponent.createTestContext(rule, + "testContainerUpgradeFailed"); + Component component = context.scheduler.getAllComponents().entrySet() + .iterator().next().getValue(); + upgradeComponent(component); + + ComponentInstance instance = component.getAllComponentInstances().iterator() + .next(); + + ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent( + instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE); + instance.handle(upgradeEvent); + + ContainerStatus containerStatus = mock(ContainerStatus.class); + when(containerStatus.getExitStatus()).thenReturn( + ContainerExitStatus.ABORTED); + ComponentInstanceEvent stopEvent = new ComponentInstanceEvent( + instance.getContainer().getId(), ComponentInstanceEventType.STOP) + .setStatus(containerStatus); + // this is the call back from NM for the upgrade + instance.handle(stopEvent); + Assert.assertEquals("instance did not fail", ContainerState.FAILED_UPGRADE, + component.getComponentSpec().getContainer(instance.getContainer() + .getId().toString()).getState()); + } + + @Test + public void testCancelNothingToUpgrade() throws Exception { + ServiceContext context = TestComponent.createTestContext(rule, + "testCancelUpgradeWhenContainerReady"); + Component component = context.scheduler.getAllComponents().entrySet() + .iterator().next().getValue(); + cancelCompUpgrade(component); + + ComponentInstance instance = component.getAllComponentInstances().iterator() + .next(); + + ComponentInstanceEvent cancelEvent = new ComponentInstanceEvent( + instance.getContainer().getId(), + ComponentInstanceEventType.CANCEL_UPGRADE); + instance.handle(cancelEvent); + + Assert.assertEquals("instance not ready", ContainerState.READY, + component.getComponentSpec().getContainer(instance.getContainer() + .getId().toString()).getState()); + } + + @Test + public void testCancelUpgradeFailed() throws Exception { + ServiceContext context = TestComponent.createTestContext(rule, + "testCancelUpgradeFailed"); + Component component = context.scheduler.getAllComponents().entrySet() + .iterator().next().getValue(); + cancelCompUpgrade(component); + + ComponentInstance instance = component.getAllComponentInstances().iterator() + .next(); + + ComponentInstanceEvent cancelEvent = new ComponentInstanceEvent( + instance.getContainer().getId(), + ComponentInstanceEventType.CANCEL_UPGRADE); + instance.handle(cancelEvent); + + instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(), + ComponentInstanceEventType.STOP)); + Assert.assertEquals("instance not init", ComponentInstanceState.INIT, + instance.getState()); + } + + @Test + public void testCancelAfterCompProcessedCancel() throws Exception { + ServiceContext context = TestComponent.createTestContext(rule, + "testCancelAfterCompProcessedCancel"); + Component component = context.scheduler.getAllComponents().entrySet() + .iterator().next().getValue(); + upgradeComponent(component); + cancelCompUpgrade(component); + + ComponentInstance instance = component.getAllComponentInstances().iterator() + .next(); + ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent( + instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE); + instance.handle(upgradeEvent); + + Assert.assertEquals("instance should start upgrading", + ContainerState.NEEDS_UPGRADE, + component.getComponentSpec().getContainer(instance.getContainer() + .getId().toString()).getState()); + } + + @Test + public void testCancelWhileUpgradeWithSuccess() throws Exception { + validateCancelWhileUpgrading(true, true); + } + + @Test + public void testCancelWhileUpgradeWithFailure() throws Exception { + validateCancelWhileUpgrading(false, true); + } + + @Test + public void testCancelFailedWhileUpgradeWithSuccess() throws Exception { + validateCancelWhileUpgrading(true, false); + } + + @Test + public void testCancelFailedWhileUpgradeWithFailure() throws Exception { + validateCancelWhileUpgrading(false, false); + } + + private void validateCancelWhileUpgrading(boolean upgradeSuccessful, + boolean cancelUpgradeSuccessful) + throws Exception { + ServiceContext context = TestComponent.createTestContext(rule, + "testCancelWhileUpgrading"); + Component component = context.scheduler.getAllComponents().entrySet() + .iterator().next().getValue(); + upgradeComponent(component); + + ComponentInstance instance = component.getAllComponentInstances().iterator() + .next(); + ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent( + instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE); + instance.handle(upgradeEvent); + + Assert.assertEquals("instance should be upgrading", + ContainerState.UPGRADING, + component.getComponentSpec().getContainer(instance.getContainer() + .getId().toString()).getState()); + + cancelCompUpgrade(component); + ComponentInstanceEvent cancelEvent = new ComponentInstanceEvent( + instance.getContainer().getId(), + ComponentInstanceEventType.CANCEL_UPGRADE); + instance.handle(cancelEvent); + + // either upgrade failed or successful + ComponentInstanceEvent readyOrStopEvent = new ComponentInstanceEvent( + instance.getContainer().getId(), + upgradeSuccessful ? ComponentInstanceEventType.BECOME_READY : + ComponentInstanceEventType.STOP); + + instance.handle(readyOrStopEvent); + Assert.assertEquals("instance not upgrading", ContainerState.UPGRADING, + component.getComponentSpec().getContainer(instance.getContainer() + .getId().toString()).getState()); + + // response for cancel received + ComponentInstanceEvent readyOrStopCancel = new ComponentInstanceEvent( + instance.getContainer().getId(), + cancelUpgradeSuccessful ? ComponentInstanceEventType.BECOME_READY : + ComponentInstanceEventType.STOP); + + instance.handle(readyOrStopCancel); + if (cancelUpgradeSuccessful) { + Assert.assertEquals("instance not ready", ContainerState.READY, + component.getComponentSpec().getContainer(instance.getContainer() + .getId().toString()).getState()); + } else { + Assert.assertEquals("instance not init", ComponentInstanceState.INIT, + instance.getState()); + } + } + private void upgradeComponent(Component component) { component.handle(new ComponentEvent(component.getName(), ComponentEventType.UPGRADE).setTargetSpec(component.getComponentSpec()) .setUpgradeVersion("v2")); } + private void cancelCompUpgrade(Component component) { + component.handle(new ComponentEvent(component.getName(), + ComponentEventType.CANCEL_UPGRADE) + .setTargetSpec(component.getComponentSpec()) + .setUpgradeVersion("v1")); + } + private Component createComponent(ServiceScheduler scheduler, org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum restartPolicy, int nSucceededInstances, int nFailedInstances, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/resources/log4j.properties b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/resources/log4j.properties new file mode 100644 index 00000000000..81a3f6ad5d2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/resources/log4j.properties @@ -0,0 +1,19 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# log4j configuration used during build and unit tests + +log4j.rootLogger=info,stdout +log4j.threshold=ALL +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java index a0e4e02b0a4..b0e12bc3398 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java @@ -102,6 +102,7 @@ public class ApplicationCLI extends YarnCLI { public static final String ENABLE_FAST_LAUNCH = "enableFastLaunch"; public static final String UPGRADE_CMD = "upgrade"; public static final String UPGRADE_EXPRESS = "express"; + public static final String UPGRADE_CANCEL = "cancel"; public static final String UPGRADE_INITIATE = "initiate"; public static final String UPGRADE_AUTO_FINALIZE = "autoFinalize"; public static final String UPGRADE_FINALIZE = "finalize"; @@ -265,6 +266,8 @@ public class ApplicationCLI extends YarnCLI { opts.addOption(UPGRADE_AUTO_FINALIZE, false, "Works with -upgrade and " + "-initiate options to initiate the upgrade of the application with " + "the ability to finalize the upgrade automatically."); + opts.addOption(UPGRADE_CANCEL, false, "Works with -upgrade option to " + + "cancel current upgrade."); opts.getOption(LAUNCH_CMD).setArgName("Application Name> Moves application to a new"); pw.println(" queue. ApplicationId can be"); pw.println(" passed using 'appId' option."); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java index 232666db7d6..df11ffd0e4a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java @@ -300,4 +300,17 @@ public abstract class AppAdminClient extends CompositeService { @Unstable public abstract int actionUpgradeExpress(String appName, File fileName) throws IOException, YarnException; + + /** + * Cancels the upgrade of the service. + * + * @param appName the name of the application + * @return exit code + * @throws IOException + * @throws YarnException + */ + @Public + @Unstable + public abstract int actionCancelUpgrade(String appName) throws IOException, + YarnException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 27a7c80515d..01d70afc7ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -1836,6 +1836,7 @@ public class ContainerManagerImpl extends CompositeService implements public void reInitializeContainer(ContainerId containerId, ContainerLaunchContext reInitLaunchContext, boolean autoCommit) throws YarnException { + LOG.debug("{} requested reinit", containerId); Container container = preReInitializeOrLocalizeCheck(containerId, ReInitOp.RE_INIT); ResourceSet resourceSet = new ResourceSet();