YARN-8665. Added Yarn service cancel upgrade option.

Contributed by Chandni Singh
This commit is contained in:
Eric Yang 2018-09-26 14:51:35 -04:00
parent e0ff8e2c10
commit 913f87dada
36 changed files with 1479 additions and 344 deletions

View File

@ -723,4 +723,24 @@ public class ApiServiceClient extends AppAdminClient {
}
return null;
}
/**
 * Asks the REST API to cancel an in-progress upgrade of a service by
 * PUT-ing a service record whose state is {@code CANCEL_UPGRADING}.
 *
 * @param appName name of the service whose upgrade should be cancelled
 * @return the exit code derived from the REST response, or
 *         {@code EXIT_EXCEPTION_THROWN} when any exception is raised while
 *         building or sending the request
 * @throws IOException declared by the overridden method
 * @throws YarnException declared by the overridden method
 */
@Override
public int actionCancelUpgrade(
    String appName) throws IOException, YarnException {
  try {
    Service cancelRequest = new Service();
    cancelRequest.setName(appName);
    cancelRequest.setState(ServiceState.CANCEL_UPGRADING);
    String payload = jsonSerDeser.toJson(cancelRequest);
    LOG.info("Cancel upgrade in progress. Please wait..");
    ClientResponse reply = getApiClient(getServicePath(appName))
        .put(ClientResponse.class, payload);
    return processResponse(reply);
  } catch (Exception e) {
    // Failures are reported through the exit code rather than rethrown.
    LOG.error("Failed to cancel upgrade: ", e);
    return EXIT_EXCEPTION_THROWN;
  }
}
}

View File

@ -58,6 +58,7 @@ import java.util.*;
import java.util.stream.Collectors;
import static org.apache.hadoop.yarn.service.api.records.ServiceState.ACCEPTED;
import static org.apache.hadoop.yarn.service.api.records.ServiceState.CANCEL_UPGRADING;
import static org.apache.hadoop.yarn.service.conf.RestApiConstants.*;
import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.*;
@ -445,6 +446,12 @@ public class ApiServer {
return upgradeService(updateServiceData, ugi);
}
// If CANCEL_UPGRADING is requested
if (updateServiceData.getState() != null &&
updateServiceData.getState() == CANCEL_UPGRADING) {
return cancelUpgradeService(appName, ugi);
}
// If new lifetime value specified then update it
if (updateServiceData.getLifetime() != null
&& updateServiceData.getLifetime() > 0) {
@ -460,8 +467,7 @@ public class ApiServer {
LOG.error(message, e);
return formatResponse(Status.NOT_FOUND, e.getMessage());
} catch (YarnException e) {
String message = "Service is not found in hdfs: " + appName;
LOG.error(message, e);
LOG.error(e.getMessage(), e);
return formatResponse(Status.NOT_FOUND, e.getMessage());
} catch (Exception e) {
String message = "Error while performing operation for app: " + appName;
@ -707,6 +713,27 @@ public class ApiServer {
return formatResponse(Status.ACCEPTED, status);
}
/**
 * Cancels the in-progress upgrade of a service on behalf of the caller.
 *
 * <p>A service client is obtained and driven inside {@code ugi.doAs} so that
 * the cancel request is issued as the given user. The client is closed in a
 * {@code finally} block so it is released even when initialization, start
 * or the cancel call itself throws.
 *
 * @param serviceName name of the service whose upgrade should be cancelled
 * @param ugi user on whose behalf the cancel is performed
 * @return an ACCEPTED response carrying a {@link ServiceStatus} when the
 *         client reports {@code EXIT_SUCCESS}; otherwise an empty
 *         BAD_REQUEST response
 * @throws IOException if the privileged action fails with an I/O error
 * @throws InterruptedException if the privileged action is interrupted
 */
private Response cancelUpgradeService(String serviceName,
    final UserGroupInformation ugi) throws IOException, InterruptedException {
  int result = ugi.doAs((PrivilegedExceptionAction<Integer>) () -> {
    ServiceClient sc = getServiceClient();
    try {
      sc.init(YARN_CONFIG);
      sc.start();
      return sc.actionCancelUpgrade(serviceName);
    } finally {
      // Always release the client, including on failure paths.
      sc.close();
    }
  });
  if (result == EXIT_SUCCESS) {
    ServiceStatus status = new ServiceStatus();
    LOG.info("Service {} cancelling upgrade", serviceName);
    status.setDiagnostics("Service " + serviceName +
        " cancelling upgrade.");
    status.setState(ServiceState.ACCEPTED);
    return formatResponse(Status.ACCEPTED, status);
  }
  return Response.status(Status.BAD_REQUEST).build();
}
private Response processComponentsUpgrade(UserGroupInformation ugi,
String serviceName, Set<String> compNames) throws YarnException,
IOException, InterruptedException {
@ -734,7 +761,8 @@ public class ApiServer {
Service service, List<Container> containers) throws YarnException,
IOException, InterruptedException {
if (service.getState() != ServiceState.UPGRADING) {
if (!service.getState().equals(ServiceState.UPGRADING) &&
!service.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) {
throw new YarnException(
String.format("The upgrade of service %s has not been initiated.",
service.getName()));

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.service;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
@ -60,4 +62,7 @@ public interface ClientAMProtocol {
GetCompInstancesResponseProto getCompInstances(
GetCompInstancesRequestProto request) throws IOException, YarnException;
/**
 * Requests cancellation of a service upgrade.
 *
 * @param request the cancel-upgrade request message
 * @return the cancel-upgrade response message
 * @throws IOException declared for implementations to signal I/O failures
 * @throws YarnException declared for implementations to signal YARN-level
 *         failures
 */
CancelUpgradeResponseProto cancelUpgrade(
CancelUpgradeRequestProto request) throws IOException, YarnException;
}

View File

@ -30,6 +30,8 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto;
@ -208,4 +210,14 @@ public class ClientAMService extends AbstractService
ServiceApiUtil.CONTAINER_JSON_SERDE.toJson(containers.toArray(
new Container[containers.size()]))).build();
}
/**
 * Handles a cancel-upgrade request by logging the calling user and handing
 * a {@code CANCEL_UPGRADE} service event to the scheduler's dispatcher.
 *
 * @param request the cancel-upgrade request (its contents are not read here)
 * @return an empty cancel-upgrade response
 * @throws IOException if the current user cannot be determined
 * @throws YarnException declared by the protocol
 */
@Override
public CancelUpgradeResponseProto cancelUpgrade(
    CancelUpgradeRequestProto request) throws IOException, YarnException {
  LOG.info("Cancel service upgrade by {}",
      UserGroupInformation.getCurrentUser());
  ServiceEvent cancelEvent =
      new ServiceEvent(ServiceEventType.CANCEL_UPGRADE);
  // The response is built after the event has been handed to the handler.
  context.scheduler.getDispatcher().getEventHandler().handle(cancelEvent);
  CancelUpgradeResponseProto.Builder reply =
      CancelUpgradeResponseProto.newBuilder();
  return reply.build();
}
}

View File

@ -21,7 +21,7 @@ package org.apache.hadoop.yarn.service;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.service.api.records.Component;
import java.util.Queue;
import java.util.List;
/**
* Events are handled by {@link ServiceManager} to manage the service
@ -33,7 +33,8 @@ public class ServiceEvent extends AbstractEvent<ServiceEventType> {
private String version;
private boolean autoFinalize;
private boolean expressUpgrade;
private Queue<Component> compsToUpgradeInOrder;
// For express upgrade they should be in order.
private List<Component> compsToUpgrade;
public ServiceEvent(ServiceEventType serviceEventType) {
super(serviceEventType);
@ -71,13 +72,12 @@ public class ServiceEvent extends AbstractEvent<ServiceEventType> {
return this;
}
public Queue<Component> getCompsToUpgradeInOrder() {
return compsToUpgradeInOrder;
public List<Component> getCompsToUpgrade() {
return compsToUpgrade;
}
public ServiceEvent setCompsToUpgradeInOrder(
Queue<Component> compsToUpgradeInOrder) {
this.compsToUpgradeInOrder = compsToUpgradeInOrder;
public ServiceEvent setCompsToUpgrade(List<Component> compsToUpgrade) {
this.compsToUpgrade = compsToUpgrade;
return this;
}

View File

@ -24,5 +24,6 @@ package org.apache.hadoop.yarn.service;
public enum ServiceEventType {
START,
UPGRADE,
CHECK_STABLE
CHECK_STABLE,
CANCEL_UPGRADE
}

View File

@ -20,32 +20,31 @@ package org.apache.hadoop.yarn.service;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.api.records.ComponentState;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.component.Component;
import org.apache.hadoop.yarn.service.component.ComponentEvent;
import org.apache.hadoop.yarn.service.component.ComponentEventType;
import org.apache.hadoop.yarn.service.component.ComponentRestartPolicy;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.state.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser;
@ -71,8 +70,10 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
private final SliderFileSystem fs;
private String upgradeVersion;
private Queue<org.apache.hadoop.yarn.service.api.records
.Component> compsToUpgradeInOrder;
private List<org.apache.hadoop.yarn.service.api.records
.Component> componentsToUpgrade;
private List<String> compsAffectedByUpgrade = new ArrayList<>();
private String cancelledVersion;
private static final StateMachineFactory<ServiceManager, State,
ServiceEventType, ServiceEvent> STATE_MACHINE_FACTORY =
@ -88,11 +89,14 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
.addTransition(State.UPGRADING, EnumSet.of(State.STABLE,
State.UPGRADING), ServiceEventType.START,
new CheckStableTransition())
new StartFromUpgradeTransition())
.addTransition(State.UPGRADING, EnumSet.of(State.STABLE,
State.UPGRADING), ServiceEventType.CHECK_STABLE,
new CheckStableTransition())
.addTransition(State.UPGRADING, State.UPGRADING,
ServiceEventType.CANCEL_UPGRADE, new CancelUpgradeTransition())
.installTopology();
public ServiceManager(ServiceContext context) {
@ -148,19 +152,25 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
public State transition(ServiceManager serviceManager,
ServiceEvent event) {
serviceManager.upgradeVersion = event.getVersion();
serviceManager.componentsToUpgrade = event.getCompsToUpgrade();
event.getCompsToUpgrade().forEach(comp ->
serviceManager.compsAffectedByUpgrade.add(comp.getName()));
try {
if (event.isExpressUpgrade()) {
serviceManager.serviceSpec.setState(ServiceState.EXPRESS_UPGRADING);
serviceManager.compsToUpgradeInOrder = event
.getCompsToUpgradeInOrder();
serviceManager.upgradeNextCompIfAny();
} else if (event.isAutoFinalize()) {
serviceManager.serviceSpec.setState(ServiceState
.UPGRADING_AUTO_FINALIZE);
serviceManager.dispatchNeedUpgradeEvents(false);
serviceManager.upgradeNextCompIfAny(false);
} else {
serviceManager.serviceSpec.setState(
ServiceState.UPGRADING);
serviceManager.dispatchNeedUpgradeEvents(false);
}
if (event.isExpressUpgrade()) {
serviceManager.setServiceState(ServiceState.EXPRESS_UPGRADING);
} else if (event.isAutoFinalize()) {
serviceManager.setServiceState(ServiceState.UPGRADING_AUTO_FINALIZE);
} else {
serviceManager.setServiceState(ServiceState.UPGRADING);
}
return State.UPGRADING;
} catch (Throwable e) {
LOG.error("[SERVICE]: Upgrade to version {} failed", event.getVersion(),
@ -181,24 +191,32 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
if (currState.equals(ServiceState.STABLE)) {
return State.STABLE;
}
if (currState.equals(ServiceState.EXPRESS_UPGRADING)) {
org.apache.hadoop.yarn.service.api.records.Component component =
serviceManager.compsToUpgradeInOrder.peek();
if (!component.getState().equals(ComponentState.NEEDS_UPGRADE) &&
!component.getState().equals(ComponentState.UPGRADING)) {
serviceManager.compsToUpgradeInOrder.remove();
if (currState.equals(ServiceState.EXPRESS_UPGRADING) ||
currState.equals(ServiceState.CANCEL_UPGRADING)) {
if (!serviceManager.componentsToUpgrade.isEmpty()) {
org.apache.hadoop.yarn.service.api.records.Component compSpec =
serviceManager.componentsToUpgrade.get(0);
Component component = serviceManager.scheduler.getAllComponents()
.get(compSpec.getName());
if (!component.isUpgrading()) {
serviceManager.componentsToUpgrade.remove(0);
serviceManager.upgradeNextCompIfAny(
currState.equals(ServiceState.CANCEL_UPGRADING));
}
}
serviceManager.upgradeNextCompIfAny();
}
if (currState.equals(ServiceState.UPGRADING_AUTO_FINALIZE) ||
event.getType().equals(ServiceEventType.START) ||
(currState.equals(ServiceState.EXPRESS_UPGRADING) &&
serviceManager.compsToUpgradeInOrder.isEmpty())) {
((currState.equals(ServiceState.EXPRESS_UPGRADING) ||
currState.equals(ServiceState.CANCEL_UPGRADING)) &&
serviceManager.componentsToUpgrade.isEmpty())) {
ServiceState targetState = checkIfStable(serviceManager.serviceSpec);
if (targetState.equals(ServiceState.STABLE)) {
if (serviceManager.finalizeUpgrade()) {
LOG.info("Service def state changed from {} -> {}", currState,
serviceManager.serviceSpec.getState());
if (serviceManager.finalizeUpgrade(
currState.equals(ServiceState.CANCEL_UPGRADING))) {
return State.STABLE;
}
}
@ -207,52 +225,149 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
}
}
private void upgradeNextCompIfAny() {
if (!compsToUpgradeInOrder.isEmpty()) {
org.apache.hadoop.yarn.service.api.records.Component component =
compsToUpgradeInOrder.peek();
private static class StartFromUpgradeTransition implements
MultipleArcTransition<ServiceManager, ServiceEvent, State> {
ComponentEvent needUpgradeEvent = new ComponentEvent(
component.getName(), ComponentEventType.UPGRADE).setTargetSpec(
component).setUpgradeVersion(upgradeVersion).setExpressUpgrade(true);
context.scheduler.getDispatcher().getEventHandler().handle(
needUpgradeEvent);
/**
 * Handles a START event received while the manager is in UPGRADING.
 *
 * <p>The spec state is captured before {@code checkIfStable} is invoked
 * (presumably because that call may change it -- confirm). When the service
 * resolves to STABLE, the upgrade is finalized, in cancel mode if the
 * captured state was CANCEL_UPGRADING.
 *
 * @param serviceManager manager whose spec is inspected and finalized
 * @param event the triggering event (not read here)
 * @return {@code State.STABLE} if the service is stable and finalization
 *         succeeded, otherwise {@code State.UPGRADING}
 */
@Override
public State transition(ServiceManager serviceManager, ServiceEvent event) {
  ServiceState stateBefore = serviceManager.serviceSpec.getState();
  ServiceState resolved = checkIfStable(serviceManager.serviceSpec);
  if (!resolved.equals(ServiceState.STABLE)) {
    return State.UPGRADING;
  }
  boolean wasCancelling =
      stateBefore.equals(ServiceState.CANCEL_UPGRADING);
  return serviceManager.finalizeUpgrade(wasCancelling)
      ? State.STABLE : State.UPGRADING;
}
}
/**
 * Transition run for a CANCEL_UPGRADE event: re-targets the in-progress
 * upgrade back at the currently persisted service definition and dispatches
 * cancel-upgrade events to the affected components and containers.
 *
 * <p>Any failure is logged and swallowed; nothing is propagated to the
 * caller.
 */
private static class CancelUpgradeTransition implements
SingleArcTransition<ServiceManager, ServiceEvent> {
@Override
public void transition(ServiceManager serviceManager,
ServiceEvent event) {
// Only an upgrade that is in progress can be cancelled.
if (!serviceManager.getState().equals(State.UPGRADING)) {
LOG.info("[SERVICE]: Cannot cancel the upgrade in {} state",
serviceManager.getState());
return;
}
try {
// For a cancel the roles are swapped: the persisted service definition is
// the target, and the definition of the version being upgraded to is the
// source.
Service targetSpec = ServiceApiUtil.loadService(
serviceManager.context.fs, serviceManager.getName());
Service sourceSpec = ServiceApiUtil.loadServiceUpgrade(
serviceManager.context.fs, serviceManager.getName(),
serviceManager.upgradeVersion);
// Remember the version being abandoned before upgradeVersion is
// overwritten below.
serviceManager.cancelledVersion = serviceManager.upgradeVersion;
LOG.info("[SERVICE] cancel version {}",
serviceManager.cancelledVersion);
// From here on the "upgrade" target is the version in the current spec.
serviceManager.upgradeVersion = serviceManager.serviceSpec.getVersion();
serviceManager.componentsToUpgrade = serviceManager
.resolveCompsToUpgrade(sourceSpec, targetSpec);
// Rebuild the affected-component names from the newly resolved list.
serviceManager.compsAffectedByUpgrade.clear();
serviceManager.componentsToUpgrade.forEach(comp ->
serviceManager.compsAffectedByUpgrade.add(comp.getName()));
// Notify all components, then the containers of the first component.
serviceManager.dispatchNeedUpgradeEvents(true);
serviceManager.upgradeNextCompIfAny(true);
// The spec state is switched only after the events were dispatched.
serviceManager.setServiceState(ServiceState.CANCEL_UPGRADING);
} catch (Throwable e) {
// Best effort: the failure is only logged.
LOG.error("[SERVICE]: Cancellation of upgrade failed", e);
}
}
}
/**
 * Dispatches an instance-level event to every container of the first
 * component in {@code componentsToUpgrade}; does nothing when that list is
 * empty.
 *
 * @param cancelUpgrade when {@code true} a CANCEL_UPGRADE instance event is
 *                      sent, otherwise an UPGRADE instance event
 */
private void upgradeNextCompIfAny(boolean cancelUpgrade) {
  if (componentsToUpgrade.isEmpty()) {
    return;
  }
  org.apache.hadoop.yarn.service.api.records.Component next =
      componentsToUpgrade.get(0);
  // The event type is the same for every container of the component.
  ComponentInstanceEventType eventType = cancelUpgrade
      ? ComponentInstanceEventType.CANCEL_UPGRADE
      : ComponentInstanceEventType.UPGRADE;
  serviceSpec.getComponent(next.getName()).getContainers().forEach(
      instance -> {
        ComponentInstanceEvent instanceEvent = new ComponentInstanceEvent(
            ContainerId.fromString(instance.getId()), eventType);
        LOG.info("Upgrade container {} {}", instance.getId(),
            cancelUpgrade);
        dispatcher.getEventHandler().handle(instanceEvent);
      });
}
/**
 * Sends one component-level event per entry of {@code componentsToUpgrade},
 * carrying the component spec as target spec and the current
 * {@code upgradeVersion}. A {@code null} list is treated as nothing to do.
 *
 * @param cancelUpgrade when {@code true} a CANCEL_UPGRADE component event is
 *                      sent, otherwise an UPGRADE component event
 */
private void dispatchNeedUpgradeEvents(boolean cancelUpgrade) {
  if (componentsToUpgrade == null) {
    return;
  }
  ComponentEventType eventType = cancelUpgrade
      ? ComponentEventType.CANCEL_UPGRADE
      : ComponentEventType.UPGRADE;
  for (org.apache.hadoop.yarn.service.api.records.Component compSpec
      : componentsToUpgrade) {
    ComponentEvent compEvent =
        new ComponentEvent(compSpec.getName(), eventType)
            .setTargetSpec(compSpec)
            .setUpgradeVersion(upgradeVersion);
    LOG.info("Upgrade component {} {}", compSpec.getName(), cancelUpgrade);
    context.scheduler.getDispatcher().getEventHandler()
        .handle(compEvent);
  }
}
/**
* @return whether finalization of upgrade was successful.
*/
private boolean finalizeUpgrade() {
try {
// save the application id and state to
Service targetSpec = ServiceApiUtil.loadServiceUpgrade(
fs, getName(), upgradeVersion);
targetSpec.setId(serviceSpec.getId());
targetSpec.setState(ServiceState.STABLE);
Map<String, Component> allComps = scheduler.getAllComponents();
targetSpec.getComponents().forEach(compSpec -> {
Component comp = allComps.get(compSpec.getName());
compSpec.setState(comp.getComponentSpec().getState());
});
jsonSerDeser.save(fs.getFileSystem(),
ServiceApiUtil.getServiceJsonPath(fs, getName()), targetSpec, true);
fs.deleteClusterUpgradeDir(getName(), upgradeVersion);
} catch (IOException e) {
LOG.error("Upgrade did not complete because unable to re-write the" +
" service definition", e);
return false;
private boolean finalizeUpgrade(boolean cancelUpgrade) {
if (!cancelUpgrade) {
try {
// save the application id and state to
Service targetSpec = ServiceApiUtil.loadServiceUpgrade(
fs, getName(), upgradeVersion);
targetSpec.setId(serviceSpec.getId());
targetSpec.setState(ServiceState.STABLE);
Map<String, Component> allComps = scheduler.getAllComponents();
targetSpec.getComponents().forEach(compSpec -> {
Component comp = allComps.get(compSpec.getName());
compSpec.setState(comp.getComponentSpec().getState());
});
jsonSerDeser.save(fs.getFileSystem(),
ServiceApiUtil.getServiceJsonPath(fs, getName()), targetSpec, true);
} catch (IOException e) {
LOG.error("Upgrade did not complete because unable to re-write the" +
" service definition", e);
return false;
}
}
try {
fs.deleteClusterUpgradeDir(getName(), upgradeVersion);
String upgradeVersionToDel = cancelUpgrade? cancelledVersion :
upgradeVersion;
LOG.info("[SERVICE]: delete upgrade dir version {}", upgradeVersionToDel);
fs.deleteClusterUpgradeDir(getName(), upgradeVersionToDel);
for (String comp : compsAffectedByUpgrade) {
String compDirVersionToDel = cancelUpgrade? cancelledVersion :
serviceSpec.getVersion();
LOG.info("[SERVICE]: delete {} dir version {}", comp,
compDirVersionToDel);
fs.deleteComponentDir(compDirVersionToDel, comp);
}
if (cancelUpgrade) {
fs.deleteComponentsVersionDirIfEmpty(cancelledVersion);
} else {
fs.deleteComponentsVersionDirIfEmpty(serviceSpec.getVersion());
}
} catch (IOException e) {
LOG.warn("Unable to delete upgrade definition for service {} " +
"version {}", getName(), upgradeVersion);
}
serviceSpec.setState(ServiceState.STABLE);
setServiceState(ServiceState.STABLE);
serviceSpec.setVersion(upgradeVersion);
upgradeVersion = null;
cancelledVersion = null;
compsAffectedByUpgrade.clear();
return true;
}
@ -290,9 +405,59 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
Service targetSpec = ServiceApiUtil.loadServiceUpgrade(
context.fs, context.service.getName(), upgradeVersion);
List<org.apache.hadoop.yarn.service.api.records.Component>
compsNeedUpgradeList = resolveCompsToUpgrade(context.service,
targetSpec);
ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE)
.setVersion(upgradeVersion)
.setAutoFinalize(autoFinalize)
.setExpressUpgrade(expressUpgrade);
if (expressUpgrade) {
// In case of express upgrade components need to be upgraded in order.
// Once the service manager gets notified that a component finished
// upgrading, it then issues event to upgrade the next component.
Map<String, org.apache.hadoop.yarn.service.api.records.Component>
compsNeedUpgradeByName = new HashMap<>();
if (compsNeedUpgradeList != null) {
compsNeedUpgradeList.forEach(component ->
compsNeedUpgradeByName.put(component.getName(), component));
}
List<String> resolvedComps = ServiceApiUtil
.resolveCompsDependency(targetSpec);
List<org.apache.hadoop.yarn.service.api.records.Component>
orderedCompUpgrade = new LinkedList<>();
resolvedComps.forEach(compName -> {
org.apache.hadoop.yarn.service.api.records.Component component =
compsNeedUpgradeByName.get(compName);
if (component != null ) {
orderedCompUpgrade.add(component);
}
});
event.setCompsToUpgrade(orderedCompUpgrade);
} else {
event.setCompsToUpgrade(compsNeedUpgradeList);
}
context.scheduler.getDispatcher().getEventHandler().handle(
event);
if (autoFinalize && (compsNeedUpgradeList == null ||
compsNeedUpgradeList.isEmpty())) {
// nothing to upgrade and auto finalize is requested, trigger a
// state check.
context.scheduler.getDispatcher().getEventHandler().handle(
new ServiceEvent(ServiceEventType.CHECK_STABLE));
}
}
private List<org.apache.hadoop.yarn.service.api.records.Component>
resolveCompsToUpgrade(Service sourceSpec, Service targetSpec) {
List<org.apache.hadoop.yarn.service.api.records.Component>
compsNeedUpgradeList = componentsFinder.
findTargetComponentSpecs(context.service, targetSpec);
findTargetComponentSpecs(sourceSpec, targetSpec);
// remove all components from the need-upgrade list if their restart policy
// doesn't allow upgrade.
@ -316,54 +481,20 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
});
}
ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE)
.setVersion(upgradeVersion)
.setAutoFinalize(autoFinalize)
.setExpressUpgrade(expressUpgrade);
return compsNeedUpgradeList;
}
if (expressUpgrade) {
// In case of express upgrade components need to be upgraded in order.
// Once the service manager gets notified that a component finished
// upgrading, it then issues event to upgrade the next component.
Map<String, org.apache.hadoop.yarn.service.api.records.Component>
compsNeedUpgradeByName = new HashMap<>();
if (compsNeedUpgradeList != null) {
compsNeedUpgradeList.forEach(component ->
compsNeedUpgradeByName.put(component.getName(), component));
}
List<String> resolvedComps = ServiceApiUtil
.resolveCompsDependency(targetSpec);
Queue<org.apache.hadoop.yarn.service.api.records.Component>
orderedCompUpgrade = new LinkedList<>();
resolvedComps.forEach(compName -> {
org.apache.hadoop.yarn.service.api.records.Component component =
compsNeedUpgradeByName.get(compName);
if (component != null ) {
orderedCompUpgrade.add(component);
}
});
event.setCompsToUpgradeInOrder(orderedCompUpgrade);
}
context.scheduler.getDispatcher().getEventHandler().handle(event);
if (compsNeedUpgradeList != null && !compsNeedUpgradeList.isEmpty()) {
if (!expressUpgrade) {
compsNeedUpgradeList.forEach(component -> {
ComponentEvent needUpgradeEvent = new ComponentEvent(
component.getName(), ComponentEventType.UPGRADE).setTargetSpec(
component).setUpgradeVersion(event.getVersion());
context.scheduler.getDispatcher().getEventHandler().handle(
needUpgradeEvent);
});
}
} else if (autoFinalize) {
// nothing to upgrade if upgrade auto finalize is requested, trigger a
// state check.
context.scheduler.getDispatcher().getEventHandler().handle(
new ServiceEvent(ServiceEventType.CHECK_STABLE));
/**
 * Updates the state recorded in the service spec and logs the change.
 * Nothing is written or logged when the spec is already in the requested
 * state.
 *
 * @param state service state to record
 */
private void setServiceState(
    org.apache.hadoop.yarn.service.api.records.ServiceState state) {
  ServiceState previous = serviceSpec.getState();
  if (previous.equals(state)) {
    return;
  }
  serviceSpec.setState(state);
  LOG.info("[SERVICE] spec state changed from {} -> {}", previous, state);
}

View File

@ -27,5 +27,5 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceStability.Unstable
public enum ContainerState {
RUNNING_BUT_UNREADY, READY, STOPPED, NEEDS_UPGRADE, UPGRADING, SUCCEEDED,
FAILED;
FAILED, FAILED_UPGRADE;
}

View File

@ -29,5 +29,10 @@ import org.apache.hadoop.classification.InterfaceStability;
@ApiModel(description = "The current state of an service.")
public enum ServiceState {
ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING,
UPGRADING_AUTO_FINALIZE, EXPRESS_UPGRADING, SUCCEEDED;
UPGRADING_AUTO_FINALIZE, EXPRESS_UPGRADING, SUCCEEDED, CANCEL_UPGRADING;
/**
 * Tells whether a state is one of the upgrade-in-progress states
 * ({@code UPGRADING}, {@code UPGRADING_AUTO_FINALIZE} or
 * {@code EXPRESS_UPGRADING}). {@code CANCEL_UPGRADING} is not one of them.
 *
 * @param state state to test; may be {@code null}
 * @return {@code true} for the three upgrading states, {@code false} for
 *         every other state and for {@code null}
 */
public static boolean isUpgrading(ServiceState state) {
  // Identity comparison is the idiomatic enum check and tolerates null.
  return state == UPGRADING || state == UPGRADING_AUTO_FINALIZE
      || state == EXPRESS_UPGRADING;
}
}

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.client.util.YarnClientUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
@ -352,6 +353,26 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
return actionUpgrade(persistedService, containersToUpgrade);
}
/**
 * Cancels the in-progress upgrade of a running service by calling the
 * application master's {@code cancelUpgrade} RPC.
 *
 * @param appName name of the service whose upgrade should be cancelled
 * @return {@code EXIT_SUCCESS} once the request has been sent to the AM
 * @throws IOException on failures fetching status or talking to the AM
 * @throws YarnException if the service status is unavailable or not in an
 *         upgrading state, or if the application report has no AM host
 */
@Override
public int actionCancelUpgrade(String appName) throws IOException,
    YarnException {
  Service liveService = getStatus(appName);
  boolean upgradeInProgress = liveService != null
      && ServiceState.isUpgrading(liveService.getState());
  if (!upgradeInProgress) {
    throw new YarnException(
        "Service " + appName + " is not upgrading, so nothing to cancel.");
  }

  ApplicationReport appReport = yarnClient.getApplicationReport(
      getAppId(appName));
  // Without an AM host there is no endpoint to send the request to.
  if (StringUtils.isEmpty(appReport.getHost())) {
    throw new YarnException(appName + " AM hostname is empty");
  }

  ClientAMProtocol amProxy = createAMProxy(appName, appReport);
  CancelUpgradeRequestProto cancelRequest =
      CancelUpgradeRequestProto.newBuilder().build();
  amProxy.cancelUpgrade(cancelRequest);
  return EXIT_SUCCESS;
}
@Override
public int actionCleanUp(String appName, String userName) throws
IOException, YarnException {

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.service.component;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import static org.apache.hadoop.yarn.service.api.records.Component
@ -44,7 +43,6 @@ import org.apache.hadoop.yarn.service.ServiceEventType;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId;
import org.apache.hadoop.yarn.service.ContainerFailureTracker;
import org.apache.hadoop.yarn.service.ServiceContext;
@ -89,6 +87,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.*;
import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*;
import static org.apache.hadoop.yarn.service.component.ComponentEventType.*;
import static org.apache.hadoop.yarn.service.component.ComponentEventType.CANCEL_UPGRADE;
import static org.apache.hadoop.yarn.service.component.ComponentEventType.UPGRADE;
import static org.apache.hadoop.yarn.service.component.ComponentState.*;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.*;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.*;
@ -126,9 +126,8 @@ public class Component implements EventHandler<ComponentEvent> {
new ConcurrentHashMap<>();
private boolean healthThresholdMonitorEnabled = false;
private AtomicBoolean upgradeInProgress = new AtomicBoolean(false);
private ComponentEvent upgradeEvent;
private AtomicLong numContainersThatNeedUpgrade = new AtomicLong(0);
private UpgradeStatus upgradeStatus = new UpgradeStatus();
private UpgradeStatus cancelUpgradeStatus = new UpgradeStatus();
private StateMachine<ComponentState, ComponentEventType, ComponentEvent>
stateMachine;
@ -160,6 +159,8 @@ public class Component implements EventHandler<ComponentEvent> {
// Flex while previous flex is still in progress
.addTransition(FLEXING, EnumSet.of(FLEXING, STABLE), FLEX,
new FlexComponentTransition())
.addTransition(FLEXING, EnumSet.of(UPGRADING, FLEXING, STABLE),
CHECK_STABLE, new CheckStableTransition())
// container failed while stable
.addTransition(STABLE, FLEXING, CONTAINER_COMPLETED,
@ -172,19 +173,28 @@ public class Component implements EventHandler<ComponentEvent> {
// For flex down, go to STABLE state
.addTransition(STABLE, EnumSet.of(STABLE, FLEXING),
FLEX, new FlexComponentTransition())
.addTransition(STABLE, UPGRADING, ComponentEventType.UPGRADE,
new ComponentNeedsUpgradeTransition())
//Upgrade while previous upgrade is still in progress
.addTransition(UPGRADING, UPGRADING, ComponentEventType.UPGRADE,
new ComponentNeedsUpgradeTransition())
.addTransition(UPGRADING, EnumSet.of(UPGRADING, FLEXING, STABLE),
CHECK_STABLE, new CheckStableTransition())
.addTransition(FLEXING, EnumSet.of(UPGRADING, FLEXING, STABLE),
CHECK_STABLE, new CheckStableTransition())
.addTransition(STABLE, UPGRADING, UPGRADE,
new NeedsUpgradeTransition())
.addTransition(STABLE, CANCEL_UPGRADING, CANCEL_UPGRADE,
new NeedsUpgradeTransition())
.addTransition(STABLE, EnumSet.of(STABLE), CHECK_STABLE,
new CheckStableTransition())
.addTransition(UPGRADING, FLEXING, CONTAINER_COMPLETED,
new ContainerCompletedTransition())
// Cancel upgrade while previous upgrade is still in progress
.addTransition(UPGRADING, CANCEL_UPGRADING,
CANCEL_UPGRADE, new NeedsUpgradeTransition())
.addTransition(UPGRADING, EnumSet.of(UPGRADING, STABLE),
CHECK_STABLE, new CheckStableTransition())
.addTransition(UPGRADING, UPGRADING, CONTAINER_COMPLETED,
new CompletedAfterUpgradeTransition())
.addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, FLEXING,
STABLE), CHECK_STABLE, new CheckStableTransition())
.addTransition(CANCEL_UPGRADING, CANCEL_UPGRADING,
CONTAINER_COMPLETED, new CompletedAfterUpgradeTransition())
.addTransition(CANCEL_UPGRADING, FLEXING, CONTAINER_ALLOCATED,
new ContainerAllocatedTransition())
.installTopology();
public Component(
@ -332,7 +342,7 @@ public class Component implements EventHandler<ComponentEvent> {
+ before + " to " + event.getDesired());
component.requestContainers(delta);
component.createNumCompInstances(delta);
component.componentSpec.setState(
component.setComponentState(
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
component.getScheduler().getApp().setState(ServiceState.STARTED);
return FLEXING;
@ -430,11 +440,11 @@ public class Component implements EventHandler<ComponentEvent> {
if (component.getNumRunningInstances() + component
.getNumSucceededInstances() + component.getNumFailedInstances()
< component.getComponentSpec().getNumberOfContainers()) {
component.componentSpec.setState(
component.setComponentState(
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
return FLEXING;
} else{
component.componentSpec.setState(
component.setComponentState(
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
return STABLE;
}
@ -444,22 +454,22 @@ public class Component implements EventHandler<ComponentEvent> {
Component component) {
// if desired == running
if (component.componentMetrics.containersReady.value() == component
.getComponentSpec().getNumberOfContainers()
&& component.numContainersThatNeedUpgrade.get() == 0) {
component.componentSpec.setState(
.getComponentSpec().getNumberOfContainers() &&
!component.doesNeedUpgrade()) {
component.setComponentState(
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
return STABLE;
} else if (component.doesNeedUpgrade()) {
component.setComponentState(org.apache.hadoop.yarn.service.api.records.
ComponentState.NEEDS_UPGRADE);
return component.getState();
} else if (component.componentMetrics.containersReady.value() != component
.getComponentSpec().getNumberOfContainers()) {
component.componentSpec.setState(
component.setComponentState(
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
return FLEXING;
} else {
// component.numContainersThatNeedUpgrade.get() > 0
component.componentSpec.setState(org.apache.hadoop.yarn.service.api.
records.ComponentState.NEEDS_UPGRADE);
return UPGRADING;
}
return component.getState();
}
// This method should be called whenever there is an increment or decrement
@ -467,22 +477,16 @@ public class Component implements EventHandler<ComponentEvent> {
//This should not matter for terminating components
private static synchronized void checkAndUpdateComponentState(
Component component, boolean isIncrement) {
org.apache.hadoop.yarn.service.api.records.ComponentState curState =
component.componentSpec.getState();
if (component.getRestartPolicyHandler().isLongLived()) {
if (isIncrement) {
// check if all containers are in READY state
if (component.numContainersThatNeedUpgrade.get() == 0
&& component.componentMetrics.containersReady.value()
== component.componentMetrics.containersDesired.value()) {
component.componentSpec.setState(
if (!component.upgradeStatus.areContainersUpgrading() &&
!component.cancelUpgradeStatus.areContainersUpgrading() &&
component.componentMetrics.containersReady.value() ==
component.componentMetrics.containersDesired.value()) {
component.setComponentState(
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
if (curState != component.componentSpec.getState()) {
LOG.info("[COMPONENT {}] state changed from {} -> {}",
component.componentSpec.getName(), curState,
component.componentSpec.getState());
}
// component state change will trigger re-check of service state
component.context.getServiceManager().checkAndUpdateServiceState();
}
@ -491,19 +495,14 @@ public class Component implements EventHandler<ComponentEvent> {
// still need to verify the count before changing the component state
if (component.componentMetrics.containersReady.value()
< component.componentMetrics.containersDesired.value()) {
component.componentSpec.setState(
component.setComponentState(
org.apache.hadoop.yarn.service.api.records.ComponentState
.FLEXING);
} else if (component.componentMetrics.containersReady.value()
== component.componentMetrics.containersDesired.value()) {
component.componentSpec.setState(
component.setComponentState(
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
}
if (curState != component.componentSpec.getState()) {
LOG.info("[COMPONENT {}] state changed from {} -> {}",
component.componentSpec.getName(), curState,
component.componentSpec.getState());
}
// component state change will trigger re-check of service state
component.context.getServiceManager().checkAndUpdateServiceState();
}
@ -511,8 +510,8 @@ public class Component implements EventHandler<ComponentEvent> {
// component state change will trigger re-check of service state
component.context.getServiceManager().checkAndUpdateServiceState();
}
// when the service is stable then the state of component needs to
// transition to stable
// triggers the state machine in component to reach appropriate state
// once the state in spec is changed.
component.dispatcher.getEventHandler().handle(
new ComponentEvent(component.getName(),
ComponentEventType.CHECK_STABLE));
@ -544,25 +543,43 @@ public class Component implements EventHandler<ComponentEvent> {
}
}
private static class ComponentNeedsUpgradeTransition extends BaseTransition {
private static class CompletedAfterUpgradeTransition extends BaseTransition {
@Override
public void transition(Component component, ComponentEvent event) {
component.upgradeInProgress.set(true);
component.upgradeEvent = event;
component.componentSpec.setState(org.apache.hadoop.yarn.service.api.
records.ComponentState.NEEDS_UPGRADE);
component.numContainersThatNeedUpgrade.set(
Preconditions.checkNotNull(event.getContainerId());
component.updateMetrics(event.getStatus());
component.dispatcher.getEventHandler().handle(
new ComponentInstanceEvent(event.getContainerId(), STOP)
.setStatus(event.getStatus()));
}
}
private static class NeedsUpgradeTransition extends BaseTransition {
@Override
public void transition(Component component, ComponentEvent event) {
boolean isCancel = event.getType().equals(CANCEL_UPGRADE);
UpgradeStatus status = !isCancel ? component.upgradeStatus :
component.cancelUpgradeStatus;
status.inProgress.set(true);
status.targetSpec = event.getTargetSpec();
status.targetVersion = event.getUpgradeVersion();
LOG.info("[COMPONENT {}]: need upgrade to {}",
component.getName(), status.targetVersion);
status.containersNeedUpgrade.set(
component.componentSpec.getNumberOfContainers());
component.componentSpec.getContainers().forEach(container -> {
container.setState(ContainerState.NEEDS_UPGRADE);
if (event.isExpressUpgrade()) {
ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent(
ContainerId.fromString(container.getId()),
ComponentInstanceEventType.UPGRADE);
LOG.info("Upgrade container {}", container.getId());
component.dispatcher.getEventHandler().handle(upgradeEvent);
}
component.setComponentState(org.apache.hadoop.yarn.service.api.
records.ComponentState.NEEDS_UPGRADE);
component.getAllComponentInstances().forEach(instance -> {
instance.setContainerState(ContainerState.NEEDS_UPGRADE);
});
if (event.getType().equals(CANCEL_UPGRADE)) {
component.upgradeStatus.reset();
}
}
}
@ -572,22 +589,22 @@ public class Component implements EventHandler<ComponentEvent> {
@Override
public ComponentState transition(Component component,
ComponentEvent componentEvent) {
org.apache.hadoop.yarn.service.api.records.ComponentState currState =
component.componentSpec.getState();
if (currState.equals(org.apache.hadoop.yarn.service.api.records
.ComponentState.STABLE)) {
return ComponentState.STABLE;
}
// checkIfStable also updates the state in definition when STABLE
ComponentState targetState = checkIfStable(component);
if (targetState.equals(STABLE) && component.upgradeInProgress.get()) {
component.componentSpec.overwrite(
component.upgradeEvent.getTargetSpec());
component.upgradeEvent = null;
if (targetState.equals(STABLE) &&
!(component.upgradeStatus.isCompleted() &&
component.cancelUpgradeStatus.isCompleted())) {
// Component stable after upgrade or cancel upgrade
UpgradeStatus status = !component.cancelUpgradeStatus.isCompleted() ?
component.cancelUpgradeStatus : component.upgradeStatus;
component.componentSpec.overwrite(status.getTargetSpec());
status.reset();
ServiceEvent checkStable = new ServiceEvent(ServiceEventType.
CHECK_STABLE);
component.dispatcher.getEventHandler().handle(checkStable);
component.upgradeInProgress.set(false);
}
return targetState;
}
@ -625,11 +642,14 @@ public class Component implements EventHandler<ComponentEvent> {
"[COMPONENT {}]: Assigned {} to component instance {} and launch on host {} ",
getName(), container.getId(), instance.getCompInstanceName(),
container.getNodeId());
if (upgradeInProgress.get()) {
if (!(upgradeStatus.isCompleted() && cancelUpgradeStatus.isCompleted())) {
UpgradeStatus status = !cancelUpgradeStatus.isCompleted() ?
cancelUpgradeStatus : upgradeStatus;
scheduler.getContainerLaunchService()
.launchCompInstance(scheduler.getApp(), instance, container,
createLaunchContext(upgradeEvent.getTargetSpec(),
upgradeEvent.getUpgradeVersion()));
createLaunchContext(status.getTargetSpec(),
status.getTargetVersion()));
} else {
scheduler.getContainerLaunchService().launchCompInstance(
scheduler.getApp(), instance, container,
@ -830,6 +850,12 @@ public class Component implements EventHandler<ComponentEvent> {
}
}
/**
 * Tells whether this component still has upgrade work outstanding.
 *
 * @return true when the cancel-upgrade status or the upgrade status still
 *         has containers pending, or when the upgrade status has recorded a
 *         failure; false otherwise
 */
private boolean doesNeedUpgrade() {
  // Checks are made in the same order as a short-circuit OR would make them.
  if (cancelUpgradeStatus.areContainersUpgrading()) {
    return true;
  }
  if (upgradeStatus.areContainersUpgrading()) {
    return true;
  }
  return upgradeStatus.failed.get();
}
public boolean areDependenciesReady() {
List<String> dependencies = componentSpec.getDependencies();
if (ServiceUtils.isEmpty(dependencies)) {
@ -911,10 +937,6 @@ public class Component implements EventHandler<ComponentEvent> {
}
}
public void decContainersThatNeedUpgrade() {
numContainersThatNeedUpgrade.decrementAndGet();
}
public int getNumReadyInstances() {
return componentMetrics.containersReady.value();
}
@ -972,10 +994,33 @@ public class Component implements EventHandler<ComponentEvent> {
}
}
public ComponentEvent getUpgradeEvent() {
/**
 * Reports whether an upgrade or a cancel-upgrade is still in progress for
 * this component. The check is made while holding the read lock.
 *
 * @return false only when both the upgrade status and the cancel-upgrade
 *         status report completion
 */
public boolean isUpgrading() {
  this.readLock.lock();
  try {
    return !upgradeStatus.isCompleted()
        || !cancelUpgradeStatus.isCompleted();
  } finally {
    this.readLock.unlock();
  }
}
public UpgradeStatus getUpgradeStatus() {
this.readLock.lock();
try {
return upgradeEvent;
return upgradeStatus;
} finally {
this.readLock.unlock();
}
}
public UpgradeStatus getCancelUpgradeStatus() {
this.readLock.lock();
try {
return cancelUpgradeStatus;
} finally {
this.readLock.unlock();
}
@ -1013,6 +1058,70 @@ public class Component implements EventHandler<ComponentEvent> {
}
}
/**
 * Records a new state in the component spec and logs the change. Nothing is
 * written or logged when the spec already holds the requested state.
 *
 * @param state component state to store in the spec
 */
private void setComponentState(
    org.apache.hadoop.yarn.service.api.records.ComponentState state) {
  final org.apache.hadoop.yarn.service.api.records.ComponentState previous =
      componentSpec.getState();
  if (previous.equals(state)) {
    // already in the requested state
    return;
  }
  componentSpec.setState(state);
  LOG.info("[COMPONENT {}] spec state changed from {} -> {}",
      componentSpec.getName(), previous, state);
}
/**
 * Status of an upgrade (also used for the cancellation of an upgrade) of a
 * component. It holds the target spec and version, an in-progress flag, the
 * number of containers that still need to be upgraded and a failure flag.
 */
public static class UpgradeStatus {
  private org.apache.hadoop.yarn.service.api.records.Component targetSpec;
  private String targetVersion;
  // The holders below are never reassigned; only their contents change.
  private final AtomicBoolean inProgress = new AtomicBoolean(false);
  private final AtomicLong containersNeedUpgrade = new AtomicLong(0);
  private final AtomicBoolean failed = new AtomicBoolean(false);

  /**
   * @return the component spec being upgraded to, or null after
   *         {@link #reset()}
   */
  public org.apache.hadoop.yarn.service.api.records.
      Component getTargetSpec() {
    return targetSpec;
  }

  /**
   * @return the version being upgraded to, or null after {@link #reset()}
   */
  public String getTargetVersion() {
    return targetVersion;
  }

  /**
   * @return whether the upgrade is completed or not; this is simply the
   *         negation of the in-progress flag, so a status that was never
   *         started or has been reset also reports true
   */
  public boolean isCompleted() {
    return !inProgress.get();
  }

  /**
   * Decrements the number of containers that still need an upgrade. The call
   * is ignored when the status is not in progress.
   */
  public void decContainersThatNeedUpgrade() {
    if (inProgress.get()) {
      containersNeedUpgrade.decrementAndGet();
    }
  }

  /**
   * Marks that a container failed to upgrade. The flag stays set until
   * {@link #reset()} is called.
   */
  public void containerFailedUpgrade() {
    failed.set(true);
  }

  /**
   * Clears the target, the pending container count and both flags.
   */
  void reset() {
    containersNeedUpgrade.set(0);
    targetSpec = null;
    targetVersion = null;
    inProgress.set(false);
    failed.set(false);
  }

  /**
   * @return true while the pending container count is non-zero
   */
  boolean areContainersUpgrading() {
    return containersNeedUpgrade.get() != 0;
  }
}
public ServiceContext getContext() {
return context;
}

View File

@ -35,7 +35,6 @@ public class ComponentEvent extends AbstractEvent<ComponentEventType> {
private ContainerId containerId;
private org.apache.hadoop.yarn.service.api.records.Component targetSpec;
private String upgradeVersion;
private boolean expressUpgrade;
public ContainerId getContainerId() {
return containerId;
@ -114,13 +113,4 @@ public class ComponentEvent extends AbstractEvent<ComponentEventType> {
this.upgradeVersion = upgradeVersion;
return this;
}
public boolean isExpressUpgrade() {
return expressUpgrade;
}
public ComponentEvent setExpressUpgrade(boolean expressUpgrade) {
this.expressUpgrade = expressUpgrade;
return this;
}
}

View File

@ -24,6 +24,7 @@ public enum ComponentEventType {
CONTAINER_RECOVERED,
CONTAINER_STARTED,
CONTAINER_COMPLETED,
CANCEL_UPGRADE,
UPGRADE,
CHECK_STABLE
}

View File

@ -22,5 +22,6 @@ public enum ComponentState {
INIT,
FLEXING,
STABLE,
UPGRADING
UPGRADING,
CANCEL_UPGRADING
}

View File

@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
@ -64,8 +65,10 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.Date;
import java.util.EnumSet;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@ -84,8 +87,9 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
LoggerFactory.getLogger(ComponentInstance.class);
private static final String FAILED_BEFORE_LAUNCH_DIAG =
"failed before launch";
private static final String UPGRADE_FAILED = "upgrade failed";
private StateMachine<ComponentInstanceState, ComponentInstanceEventType,
private StateMachine<ComponentInstanceState, ComponentInstanceEventType,
ComponentInstanceEvent> stateMachine;
private Component component;
private final ReadLock readLock;
@ -106,7 +110,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
// This container object is used for rest API query
private org.apache.hadoop.yarn.service.api.records.Container containerSpec;
private String serviceVersion;
private AtomicBoolean upgradeInProgress = new AtomicBoolean(false);
private boolean pendingCancelUpgrade = false;
private static final StateMachineFactory<ComponentInstance,
ComponentInstanceState, ComponentInstanceEventType,
@ -132,13 +137,23 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
.addTransition(READY, STARTED, BECOME_NOT_READY,
new ContainerBecomeNotReadyTransition())
.addTransition(READY, INIT, STOP, new ContainerStoppedTransition())
.addTransition(READY, UPGRADING, UPGRADE,
new ContainerUpgradeTransition())
.addTransition(UPGRADING, UPGRADING, UPGRADE,
new ContainerUpgradeTransition())
.addTransition(UPGRADING, READY, BECOME_READY,
new ContainerBecomeReadyTransition())
.addTransition(UPGRADING, INIT, STOP, new ContainerStoppedTransition())
.addTransition(READY, UPGRADING, UPGRADE, new UpgradeTransition())
.addTransition(READY, EnumSet.of(READY, CANCEL_UPGRADING), CANCEL_UPGRADE,
new CancelUpgradeTransition())
// FROM UPGRADING
.addTransition(UPGRADING, EnumSet.of(READY, CANCEL_UPGRADING),
CANCEL_UPGRADE, new CancelUpgradeTransition())
.addTransition(UPGRADING, EnumSet.of(READY), BECOME_READY,
new ReadyAfterUpgradeTransition())
.addTransition(UPGRADING, UPGRADING, STOP,
new StoppedAfterUpgradeTransition())
// FROM CANCEL_UPGRADING
.addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, READY),
BECOME_READY, new ReadyAfterUpgradeTransition())
.addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, INIT),
STOP, new StoppedAfterCancelUpgradeTransition())
.installTopology();
public ComponentInstance(Component component,
@ -217,24 +232,53 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
@Override
public void transition(ComponentInstance compInstance,
ComponentInstanceEvent event) {
compInstance.containerSpec.setState(ContainerState.READY);
if (compInstance.getState().equals(ComponentInstanceState.UPGRADING)) {
compInstance.component.incContainersReady(false);
compInstance.component.decContainersThatNeedUpgrade();
compInstance.serviceVersion = compInstance.component.getUpgradeEvent()
.getUpgradeVersion();
ComponentEvent checkState = new ComponentEvent(
compInstance.component.getName(), ComponentEventType.CHECK_STABLE);
compInstance.scheduler.getDispatcher().getEventHandler().handle(
checkState);
compInstance.setContainerState(ContainerState.READY);
compInstance.component.incContainersReady(true);
compInstance.postContainerReady();
}
}
} else {
compInstance.component.incContainersReady(true);
}
if (compInstance.timelineServiceEnabled) {
compInstance.serviceTimelinePublisher
.componentInstanceBecomeReady(compInstance.containerSpec);
/**
 * Transition run when an instance receives BECOME_READY while it is in the
 * UPGRADING or CANCEL_UPGRADING state (see the state machine registration).
 * If a cancellation was requested while the previous re-initialization was
 * still running, the cancellation is started now and the instance keeps its
 * current state; otherwise the instance is marked READY and the matching
 * upgrade status is updated.
 */
private static class ReadyAfterUpgradeTransition implements
MultipleArcTransition<ComponentInstance, ComponentInstanceEvent,
ComponentInstanceState> {
@Override
public ComponentInstanceState transition(ComponentInstance instance,
ComponentInstanceEvent event) {
if (instance.pendingCancelUpgrade) {
// cancellation of upgrade was triggered before the upgrade was
// finished.
LOG.info("{} received ready but cancellation pending",
event.getContainerId());
// keep the flag set: cancelUpgrade() starts another re-initialization
instance.upgradeInProgress.set(true);
instance.cancelUpgrade();
instance.pendingCancelUpgrade = false;
// stay in the current state until the re-initialization reports back
return instance.getState();
}
instance.upgradeInProgress.set(false);
instance.setContainerState(ContainerState.READY);
instance.component.incContainersReady(false);
// the state is still the pre-transition one here: UPGRADING selects the
// upgrade status, anything else selects the cancel-upgrade status
Component.UpgradeStatus status = instance.getState().equals(UPGRADING) ?
instance.component.getUpgradeStatus() :
instance.component.getCancelUpgradeStatus();
status.decContainersThatNeedUpgrade();
// the instance now runs the version targeted by that status
instance.serviceVersion = status.getTargetVersion();
// ask the component to re-check whether it has become stable
ComponentEvent checkState = new ComponentEvent(
instance.component.getName(),
ComponentEventType.CHECK_STABLE);
instance.scheduler.getDispatcher().getEventHandler().handle(checkState);
instance.postContainerReady();
return ComponentInstanceState.READY;
}
}
/**
 * Publishes the "instance became ready" record for this instance's container
 * spec. Does nothing when the timeline service is not enabled.
 */
private void postContainerReady() {
  if (!timelineServiceEnabled) {
    return;
  }
  serviceTimelinePublisher.componentInstanceBecomeReady(containerSpec);
}
@ -242,7 +286,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
@Override
public void transition(ComponentInstance compInstance,
ComponentInstanceEvent event) {
compInstance.containerSpec.setState(ContainerState.RUNNING_BUT_UNREADY);
compInstance.setContainerState(ContainerState.RUNNING_BUT_UNREADY);
compInstance.component.decContainersReady(true);
}
}
@ -276,11 +320,13 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
" completed. Reinsert back to pending list and requested ");
builder.append("a new container.").append(System.lineSeparator());
builder.append(" exitStatus=").append(
failureBeforeLaunch ? null : event.getStatus().getExitStatus());
failureBeforeLaunch || event.getStatus() == null ? null :
event.getStatus().getExitStatus());
builder.append(", diagnostics=");
builder.append(failureBeforeLaunch ?
FAILED_BEFORE_LAUNCH_DIAG :
event.getStatus().getDiagnostics());
(event.getStatus() != null ? event.getStatus().getDiagnostics() :
UPGRADE_FAILED));
if (event.getStatus() != null && event.getStatus().getExitStatus() != 0) {
LOG.error(builder.toString());
@ -342,15 +388,14 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
ComponentInstanceEvent event) {
Component comp = compInstance.component;
ContainerStatus status = event.getStatus();
// status is not available when upgrade fails
String containerDiag = compInstance.getCompInstanceId() + ": " + (
failedBeforeLaunching ?
FAILED_BEFORE_LAUNCH_DIAG :
event.getStatus().getDiagnostics());
failedBeforeLaunching ? FAILED_BEFORE_LAUNCH_DIAG :
(status != null ? status.getDiagnostics() : UPGRADE_FAILED));
compInstance.diagnostics.append(containerDiag + System.lineSeparator());
compInstance.cancelContainerStatusRetriever();
if (compInstance.getState().equals(ComponentInstanceState.UPGRADING)) {
compInstance.component.decContainersThatNeedUpgrade();
}
if (compInstance.getState().equals(READY)) {
compInstance.component.decContainersReady(true);
}
@ -387,10 +432,9 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
// record in ATS
compInstance.scheduler.getServiceTimelinePublisher()
.componentInstanceFinished(compInstance.getContainer().getId(),
failedBeforeLaunching ?
-1 :
event.getStatus().getExitStatus(), ContainerState.FAILED,
containerDiag);
failedBeforeLaunching || status == null ? -1 :
status.getExitStatus(),
ContainerState.FAILED, containerDiag);
// mark other component-instances/containers as STOPPED
for (ContainerId containerId : scheduler.getLiveInstances()
@ -449,28 +493,129 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
.equals(state) || ContainerState.SUCCEEDED.equals(state);
}
private static class ContainerUpgradeTransition extends BaseTransition {
private static class StoppedAfterUpgradeTransition extends
BaseTransition {
@Override
public void transition(ComponentInstance compInstance,
public void transition(ComponentInstance instance,
ComponentInstanceEvent event) {
if (!compInstance.containerSpec.getState().equals(
ContainerState.NEEDS_UPGRADE)) {
//nothing to upgrade. this may happen with express upgrade.
instance.component.getUpgradeStatus().decContainersThatNeedUpgrade();
instance.component.decRunningContainers();
final ServiceScheduler scheduler = instance.component.getScheduler();
scheduler.getAmRMClient().releaseAssignedContainer(
event.getContainerId());
instance.scheduler.executorService.submit(
() -> instance.cleanupRegistry(event.getContainerId()));
scheduler.removeLiveCompInstance(event.getContainerId());
instance.component.getUpgradeStatus().containerFailedUpgrade();
instance.setContainerState(ContainerState.FAILED_UPGRADE);
instance.upgradeInProgress.set(false);
}
}
/**
 * Transition run when an instance in the CANCEL_UPGRADING state receives
 * STOP (see the state machine registration). If a cancellation is still
 * pending it is started now and the instance keeps its current state;
 * otherwise the stop is handled by the regular ContainerStoppedTransition
 * and the instance goes back to INIT.
 */
private static class StoppedAfterCancelUpgradeTransition implements
MultipleArcTransition<ComponentInstance, ComponentInstanceEvent,
ComponentInstanceState> {
// regular stop handling, reused once the cancel bookkeeping is done
private ContainerStoppedTransition stoppedTransition =
new ContainerStoppedTransition();
@Override
public ComponentInstanceState transition(ComponentInstance instance,
ComponentInstanceEvent event) {
if (instance.pendingCancelUpgrade) {
// cancellation of upgrade was triggered before the upgrade was
// finished.
LOG.info("{} received stopped but cancellation pending",
event.getContainerId());
// keep the flag set: cancelUpgrade() starts another re-initialization
instance.upgradeInProgress.set(true);
instance.cancelUpgrade();
instance.pendingCancelUpgrade = false;
// stay in the current state until the re-initialization reports back
return instance.getState();
}
// When upgrade is cancelled, and container re-init fails
instance.component.getCancelUpgradeStatus()
.decContainersThatNeedUpgrade();
instance.upgradeInProgress.set(false);
stoppedTransition.transition(instance, event);
return ComponentInstanceState.INIT;
}
}
private static class UpgradeTransition extends BaseTransition {
@Override
public void transition(ComponentInstance instance,
ComponentInstanceEvent event) {
if (!instance.component.getCancelUpgradeStatus().isCompleted()) {
// last check to see if cancellation was triggered. The component may
// have processed the cancel upgrade event but the instance doesn't know
// it yet. If cancellation has been triggered then no point in
// upgrading.
return;
}
compInstance.containerSpec.setState(ContainerState.UPGRADING);
compInstance.component.decContainersReady(false);
ComponentEvent upgradeEvent = compInstance.component.getUpgradeEvent();
compInstance.scheduler.getContainerLaunchService()
.reInitCompInstance(compInstance.scheduler.getApp(), compInstance,
compInstance.container,
compInstance.component.createLaunchContext(
upgradeEvent.getTargetSpec(),
upgradeEvent.getUpgradeVersion()));
instance.upgradeInProgress.set(true);
instance.setContainerState(ContainerState.UPGRADING);
instance.component.decContainersReady(false);
Component.UpgradeStatus status = instance.component.getUpgradeStatus();
instance.scheduler.getContainerLaunchService()
.reInitCompInstance(instance.scheduler.getApp(), instance,
instance.container,
instance.component.createLaunchContext(
status.getTargetSpec(),
status.getTargetVersion()));
}
}
/**
 * Transition run when an instance receives CANCEL_UPGRADE; it is registered
 * for the READY and UPGRADING states. Depending on whether a
 * re-initialization is already in flight and on the version the instance is
 * running, the instance either returns to READY or moves to
 * CANCEL_UPGRADING.
 */
private static class CancelUpgradeTransition implements
MultipleArcTransition<ComponentInstance, ComponentInstanceEvent,
ComponentInstanceState> {
@Override
public ComponentInstanceState transition(ComponentInstance instance,
ComponentInstanceEvent event) {
// the flag is claimed only when no upgrade is currently in progress on
// this instance
if (instance.upgradeInProgress.compareAndSet(false, true)) {
Component.UpgradeStatus cancelStatus = instance.component
.getCancelUpgradeStatus();
if (instance.getServiceVersion().equals(
cancelStatus.getTargetVersion())) {
// previous upgrade didn't happen so just go back to READY
LOG.info("{} nothing to cancel", event.getContainerId());
cancelStatus.decContainersThatNeedUpgrade();
instance.setContainerState(ContainerState.READY);
// NOTE(review): upgradeInProgress was set to true by the compareAndSet
// above and is not cleared on this path - confirm this is intended
ComponentEvent checkState = new ComponentEvent(
instance.component.getName(), ComponentEventType.CHECK_STABLE);
instance.scheduler.getDispatcher().getEventHandler()
.handle(checkState);
return ComponentInstanceState.READY;
} else {
// the instance runs a different version: re-initialize it with the
// cancel-upgrade target
instance.component.decContainersReady(false);
instance.cancelUpgrade();
}
} else {
// an upgrade is already in progress; remember the request so that the
// ready/stopped transitions can start the cancellation afterwards
LOG.info("{} pending cancellation", event.getContainerId());
instance.pendingCancelUpgrade = true;
}
return ComponentInstanceState.CANCEL_UPGRADING;
}
}
/**
 * Re-initializes this instance's container with a launch context built from
 * the target spec and target version of the component's cancel-upgrade
 * status. The container spec is put in the UPGRADING state first.
 */
private void cancelUpgrade() {
  LOG.info("{} cancelling upgrade", container.getId());
  setContainerState(ContainerState.UPGRADING);
  final Component.UpgradeStatus rollbackTarget =
      component.getCancelUpgradeStatus();
  scheduler.getContainerLaunchService().reInitCompInstance(
      scheduler.getApp(),
      this,
      container,
      component.createLaunchContext(
          rollbackTarget.getTargetSpec(),
          rollbackTarget.getTargetVersion()));
}
public ComponentInstanceState getState() {
this.readLock.lock();
@ -505,6 +650,26 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
}
}
/**
 * Sets the state of the container in the container spec. It is write
 * protected. The spec is only written, and the change only logged, when the
 * new state differs from the current one.
 *
 * @param state container state
 */
public void setContainerState(ContainerState state) {
  this.writeLock.lock();
  try {
    ContainerState curState = containerSpec.getState();
    // Identity comparison is exact for enum constants and, unlike
    // curState.equals(state), does not throw when the spec has no state yet.
    if (curState != state) {
      containerSpec.setState(state);
      LOG.info("{} spec state changed from {} -> {}",
          getCompInstanceId(), curState, state);
    }
  } finally {
    this.writeLock.unlock();
  }
}
@Override
public void handle(ComponentInstanceEvent event) {
try {

View File

@ -23,5 +23,6 @@ public enum ComponentInstanceEventType {
STOP,
BECOME_READY,
BECOME_NOT_READY,
UPGRADE
UPGRADE,
CANCEL_UPGRADE
}

View File

@ -22,5 +22,6 @@ public enum ComponentInstanceState {
INIT,
STARTED,
READY,
UPGRADING
UPGRADING,
CANCEL_UPGRADING
}

View File

@ -22,7 +22,6 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.api.records.Artifact;
import org.apache.hadoop.yarn.service.component.ComponentEvent;
@ -113,7 +112,8 @@ public class ContainerLaunchService extends AbstractService{
.startContainerAsync(container,
launcher.completeContainerLaunch());
} else {
LOG.info("reInitializing container {}", container.getId());
LOG.info("reInitializing container {} with version {}",
container.getId(), componentLaunchContext.getServiceVersion());
instance.getComponent().getScheduler().getNmClient()
.reInitializeContainerAsync(container.getId(),
launcher.completeContainerLaunch(), true);

View File

@ -30,6 +30,8 @@ import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
@ -141,4 +143,15 @@ public class ClientAMProtocolPBClientImpl
}
return null;
}
/**
 * Forwards a cancel-upgrade request through the protobuf proxy.
 *
 * @param request cancel-upgrade request
 * @return the proxy's response, or null if the exception helper returns
 *         without throwing after a ServiceException
 * @throws IOException as declared; raised via RPCUtil when the proxy call
 *         fails
 * @throws YarnException as declared; raised via RPCUtil when the proxy call
 *         fails
 */
@Override
public CancelUpgradeResponseProto cancelUpgrade(
    CancelUpgradeRequestProto request) throws IOException, YarnException {
  CancelUpgradeResponseProto response = null;
  try {
    response = proxy.cancelUpgrade(null, request);
  } catch (ServiceException se) {
    RPCUtil.unwrapAndThrowException(se);
  }
  return response;
}
}

View File

@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.service.impl.pb.service;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
@ -116,4 +118,15 @@ public class ClientAMProtocolPBServiceImpl implements ClientAMProtocolPB {
throw new ServiceException(e);
}
}
/**
 * Delegates a cancel-upgrade request to the wrapped protocol implementation.
 *
 * @param controller RPC controller; not used by this method
 * @param request cancel-upgrade request
 * @return the response produced by the wrapped implementation
 * @throws ServiceException wrapping any IOException or YarnException thrown
 *         by the wrapped implementation
 */
@Override
public CancelUpgradeResponseProto cancelUpgrade(
    RpcController controller, CancelUpgradeRequestProto request)
    throws ServiceException {
  final CancelUpgradeResponseProto response;
  try {
    response = real.cancelUpgrade(request);
  } catch (IOException | YarnException cause) {
    throw new ServiceException(cause);
  }
  return response;
}
}

View File

@ -167,9 +167,8 @@ public class ProviderUtils implements YarnServiceConstants {
public static Path initCompInstanceDir(SliderFileSystem fs,
ContainerLaunchService.ComponentLaunchContext compLaunchContext,
ComponentInstance instance) {
Path compDir = new Path(new Path(fs.getAppDir(), "components"),
compLaunchContext.getServiceVersion() + "/" +
compLaunchContext.getName());
Path compDir = fs.getComponentDir(compLaunchContext.getServiceVersion(),
compLaunchContext.getName());
Path compInstanceDir = new Path(compDir, instance.getCompInstanceName());
instance.setCompInstanceDir(compInstanceDir);
return compInstanceDir;
@ -184,7 +183,9 @@ public class ProviderUtils implements YarnServiceConstants {
ServiceContext context) throws IOException {
Path compInstanceDir = initCompInstanceDir(fs, compLaunchContext, instance);
if (!fs.getFileSystem().exists(compInstanceDir)) {
log.info(instance.getCompInstanceId() + ": Creating dir on hdfs: " + compInstanceDir);
log.info("{} version {} : Creating dir on hdfs: {}",
instance.getCompInstanceId(), compLaunchContext.getServiceVersion(),
compInstanceDir);
fs.getFileSystem().mkdirs(compInstanceDir,
new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
} else {

View File

@ -603,7 +603,7 @@ public class ServiceApiUtil {
public static void validateInstancesUpgrade(List<Container>
liveContainers) throws YarnException {
for (Container liveContainer : liveContainers) {
if (!liveContainer.getState().equals(ContainerState.NEEDS_UPGRADE)) {
if (!isUpgradable(liveContainer)) {
// Nothing to upgrade
throw new YarnException(String.format(
ERROR_COMP_INSTANCE_DOES_NOT_NEED_UPGRADE,
@ -612,6 +612,16 @@ public class ServiceApiUtil {
}
}
/**
 * Returns whether the container can be upgraded in the current state.
 *
 * @param container container to inspect
 * @return true when the container's state is NEEDS_UPGRADE or
 *         FAILED_UPGRADE; false otherwise, including when it has no state
 */
public static boolean isUpgradable(Container container) {
  ContainerState current = container.getState();
  if (current == null) {
    return false;
  }
  return current.equals(ContainerState.NEEDS_UPGRADE)
      || current.equals(ContainerState.FAILED_UPGRADE);
}
/**
* Validates the components that are requested to upgrade require an upgrade.
* It returns the instances of the components which need upgrade.
@ -629,7 +639,7 @@ public class ServiceApiUtil {
ERROR_COMP_DOES_NOT_NEED_UPGRADE, liveComp.getName()));
}
liveComp.getContainers().forEach(liveContainer -> {
if (liveContainer.getState().equals(ContainerState.NEEDS_UPGRADE)) {
if (isUpgradable(liveContainer)) {
containerNeedUpgrade.add(liveContainer);
}
});

View File

@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.service.utils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
@ -48,4 +50,51 @@ public class SliderFileSystem extends CoreFileSystem {
public Path getAppDir() {
return this.appDir;
}
/**
 * Builds the path of a component's directory:
 * {@code <appDir>/components/<serviceVersion>/<compName>}.
 *
 * @param serviceVersion service version
 * @param compName component name
 * @return component directory
 */
public Path getComponentDir(String serviceVersion, String compName) {
  Path componentsRoot = new Path(getAppDir(), "components");
  String relative = serviceVersion + "/" + compName;
  return new Path(componentsRoot, relative);
}
/**
 * Recursively deletes a component's directory if it exists.
 *
 * @param serviceVersion service version the component directory belongs to
 * @param compName component name
 * @throws IOException if the file system check or delete fails
 */
public void deleteComponentDir(String serviceVersion, String compName)
    throws IOException {
  Path componentDir = getComponentDir(serviceVersion, compName);
  if (!fileSystem.exists(componentDir)) {
    // nothing to remove
    return;
  }
  fileSystem.delete(componentDir, true);
  LOG.debug("deleted dir {}", componentDir);
}
/**
 * Deletes the per-version directory under "components" when it exists and
 * has no entries left in it.
 *
 * @param serviceVersion service version whose directory should be removed
 * @throws IOException if checking, listing or deleting the directory fails
 */
public void deleteComponentsVersionDirIfEmpty(String serviceVersion)
    throws IOException {
  Path componentsRoot = new Path(getAppDir(), "components");
  Path versionDir = new Path(componentsRoot, serviceVersion);
  if (!fileSystem.exists(versionDir)) {
    return;
  }
  if (fileSystem.listStatus(versionDir).length != 0) {
    // still holds component directories; leave it alone
    return;
  }
  fileSystem.delete(versionDir, true);
  LOG.info("deleted dir {}", versionDir);
}
private static final Logger LOG = LoggerFactory.getLogger(
SliderFileSystem.class);
}

View File

@ -28,6 +28,8 @@ service ClientAMProtocolService {
rpc stop(StopRequestProto) returns (StopResponseProto);
rpc upgradeService(UpgradeServiceRequestProto)
returns (UpgradeServiceResponseProto);
rpc cancelUpgrade(CancelUpgradeRequestProto)
returns (CancelUpgradeResponseProto);
rpc restartService(RestartServiceRequestProto)
returns (RestartServiceResponseProto);
rpc upgrade(CompInstancesUpgradeRequestProto) returns
@ -73,6 +75,12 @@ message UpgradeServiceResponseProto {
optional string error = 1;
}
// Request message of the cancelUpgrade rpc; it declares no fields.
message CancelUpgradeRequestProto {
}
// Response message of the cancelUpgrade rpc; it declares no fields.
message CancelUpgradeResponseProto {
}
message RestartServiceRequestProto {
}

View File

@ -94,21 +94,25 @@ public class MockRunningServiceContext extends ServiceContext {
return mockLaunchService;
}
@Override public ServiceUtils.ProcessTerminationHandler
getTerminationHandler() {
@Override
public ServiceUtils.ProcessTerminationHandler getTerminationHandler() {
return new
ServiceUtils.ProcessTerminationHandler() {
public void terminate(int exitCode) {
}
};
ServiceUtils.ProcessTerminationHandler() {
public void terminate(int exitCode) {
}
};
}
// Creates the service manager through the shared test utility, passing the
// enclosing MockRunningServiceContext as the context.
@Override
protected ServiceManager createServiceManager() {
return ServiceTestUtils.createServiceManager(
MockRunningServiceContext.this);
}
};
this.scheduler.init(fsWatcher.getConf());
ServiceTestUtils.createServiceManager(this);
doNothing().when(mockLaunchService).
reInitCompInstance(anyObject(), anyObject(), anyObject(), anyObject());
stabilizeComponents(this);

View File

@ -384,6 +384,7 @@ public class ServiceTestUtils {
conf.set(YARN_SERVICE_BASE_PATH, serviceBasePath.toString());
try {
fs = new SliderFileSystem(conf);
fs.setAppDir(new Path(serviceBasePath.toString()));
} catch (IOException e) {
Throwables.propagate(e);
}
@ -532,7 +533,6 @@ public class ServiceTestUtils {
GenericTestUtils.waitFor(() -> {
try {
Service retrievedApp = client.getStatus(exampleApp.getName());
System.out.println(retrievedApp);
return retrievedApp.getState() == desiredState;
} catch (Exception e) {
e.printStackTrace();

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.service.api.records.ComponentState;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.component.Component;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
@ -65,7 +66,7 @@ public class TestServiceManager {
initUpgrade(context, "v2", false, false, false);
ServiceManager manager = context.getServiceManager();
//make components stable by upgrading all instances
upgradeAllInstances(context);
upgradeAndReadyAllInstances(context);
context.scheduler.getDispatcher().getEventHandler().handle(
new ServiceEvent(ServiceEventType.START));
@ -83,7 +84,7 @@ public class TestServiceManager {
initUpgrade(context, "v2", false, true, false);
ServiceManager manager = context.getServiceManager();
//make components stable by upgrading all instances
upgradeAllInstances(context);
upgradeAndReadyAllInstances(context);
GenericTestUtils.waitFor(()->
context.service.getState().equals(ServiceState.STABLE),
@ -115,7 +116,7 @@ public class TestServiceManager {
manager.getServiceSpec().getState());
//make components stable by upgrading all instances
upgradeAllInstances(context);
upgradeAndReadyAllInstances(context);
// finalize service
context.scheduler.getDispatcher().getEventHandler().handle(
@ -138,7 +139,7 @@ public class TestServiceManager {
initUpgrade(context, "v2", true, true, false);
// make components stable
upgradeAllInstances(context);
upgradeAndReadyAllInstances(context);
GenericTestUtils.waitFor(() ->
context.service.getState().equals(ServiceState.STABLE),
@ -174,18 +175,17 @@ public class TestServiceManager {
public void testExpressUpgrade() throws Exception {
ServiceContext context = createServiceContext("testExpressUpgrade");
ServiceManager manager = context.getServiceManager();
manager.getServiceSpec().setState(
ServiceState.EXPRESS_UPGRADING);
manager.getServiceSpec().setState(ServiceState.EXPRESS_UPGRADING);
initUpgrade(context, "v2", true, true, true);
List<String> comps = ServiceApiUtil.resolveCompsDependency(context.service);
// wait till instances of first component are in upgrade
String comp1 = comps.get(0);
upgradeInstancesOf(context, comp1);
// wait till instances of first component are upgraded and ready
String compA = comps.get(0);
makeInstancesReadyAfterUpgrade(context, compA);
// wait till instances of second component are in upgrade
String comp2 = comps.get(1);
upgradeInstancesOf(context, comp2);
// wait till instances of second component are upgraded and ready
String compB = comps.get(1);
makeInstancesReadyAfterUpgrade(context, compB);
GenericTestUtils.waitFor(() ->
context.service.getState().equals(ServiceState.STABLE),
@ -196,6 +196,57 @@ public class TestServiceManager {
validateUpgradeFinalization(manager.getName(), "v2");
}
/**
 * Cancels a service upgrade after the instances of the first component have
 * been upgraded. The service is expected to reach STABLE again and the
 * persisted spec is validated against version "v1".
 */
@Test(timeout = TIMEOUT)
public void testCancelUpgrade() throws Exception {
ServiceContext context = createServiceContext("testCancelUpgrade");
// persist the initial service definition before initiating the upgrade
writeInitialDef(context.service);
initUpgrade(context, "v2", true, false, false);
ServiceManager manager = context.getServiceManager();
Assert.assertEquals("service not upgrading", ServiceState.UPGRADING,
manager.getServiceSpec().getState());
List<String> comps = ServiceApiUtil.resolveCompsDependency(context.service);
// pick the first component in dependency order
String compA = comps.get(0);
// upgrade the instances and wait until they are ready
upgradeInstances(context, compA);
makeInstancesReadyAfterUpgrade(context, compA);
// cancel upgrade
context.scheduler.getDispatcher().getEventHandler().handle(
new ServiceEvent(ServiceEventType.CANCEL_UPGRADE));
// wait for the same instances to become ready again after the cancel
makeInstancesReadyAfterUpgrade(context, compA);
GenericTestUtils.waitFor(()->
context.service.getState().equals(ServiceState.STABLE),
CHECK_EVERY_MILLIS, TIMEOUT);
Assert.assertEquals("service upgrade not cancelled", ServiceState.STABLE,
manager.getServiceSpec().getState());
validateUpgradeFinalization(manager.getName(), "v1");
}
/**
 * Cancels a service upgrade right after it was initiated, before any
 * instance is upgraded. The service is expected to reach STABLE and the
 * persisted spec is validated against version "v1".
 */
@Test(timeout = TIMEOUT)
public void testCancelUpgradeAfterInitiate() throws Exception {
  // Use a service name unique to this test so that the definitions it
  // persists cannot collide with those written by testCancelUpgrade.
  ServiceContext context =
      createServiceContext("testCancelUpgradeAfterInitiate");
  // persist the initial service definition before initiating the upgrade
  writeInitialDef(context.service);
  initUpgrade(context, "v2", true, false, false);
  ServiceManager manager = context.getServiceManager();
  Assert.assertEquals("service not upgrading", ServiceState.UPGRADING,
      manager.getServiceSpec().getState());

  // cancel upgrade
  context.scheduler.getDispatcher().getEventHandler().handle(
      new ServiceEvent(ServiceEventType.CANCEL_UPGRADE));
  GenericTestUtils.waitFor(() ->
      context.service.getState().equals(ServiceState.STABLE),
      CHECK_EVERY_MILLIS, TIMEOUT);
  Assert.assertEquals("service upgrade not cancelled", ServiceState.STABLE,
      manager.getServiceSpec().getState());

  validateUpgradeFinalization(manager.getName(), "v1");
}
private void validateUpgradeFinalization(String serviceName,
String expectedVersion) throws IOException {
Service savedSpec = ServiceApiUtil.loadService(rule.getFs(), serviceName);
@ -225,21 +276,23 @@ public class TestServiceManager {
}
writeUpgradedDef(upgradedDef);
serviceManager.processUpgradeRequest(version, autoFinalize, expressUpgrade);
ServiceEvent upgradeEvent = new ServiceEvent(ServiceEventType.UPGRADE);
upgradeEvent.setVersion(version).setExpressUpgrade(expressUpgrade)
.setAutoFinalize(autoFinalize);
GenericTestUtils.waitFor(()-> {
ServiceState serviceState = context.service.getState();
if (serviceState.equals(ServiceState.UPGRADING) ||
serviceState.equals(ServiceState.UPGRADING_AUTO_FINALIZE) ||
serviceState.equals(ServiceState.EXPRESS_UPGRADING)) {
return true;
GenericTestUtils.waitFor(() -> {
for (Component comp : context.scheduler.getAllComponents().values()) {
if (!comp.getComponentSpec().getState().equals(
ComponentState.NEEDS_UPGRADE)) {
return false;
}
}
return false;
return true;
}, CHECK_EVERY_MILLIS, TIMEOUT);
}
/**
 * Convenience helper that runs {@code upgradeAllInstances} followed by
 * {@code makeAllInstancesReady} on the given context.
 *
 * @param context service context whose instances are driven
 * @throws TimeoutException propagated from the helpers it calls
 * @throws InterruptedException propagated from the helpers it calls
 */
private void upgradeAndReadyAllInstances(ServiceContext context) throws
TimeoutException, InterruptedException {
upgradeAllInstances(context);
makeAllInstancesReady(context);
}
private void upgradeAllInstances(ServiceContext context) throws
TimeoutException, InterruptedException {
// upgrade the instances
@ -248,8 +301,10 @@ public class TestServiceManager {
ComponentInstanceEventType.UPGRADE);
context.scheduler.getDispatcher().getEventHandler().handle(event);
}));
}
// become ready
private void makeAllInstancesReady(ServiceContext context)
throws TimeoutException, InterruptedException {
context.scheduler.getLiveInstances().forEach(((containerId, instance) -> {
ComponentInstanceEvent event = new ComponentInstanceEvent(containerId,
ComponentInstanceEventType.BECOME_READY);
@ -267,7 +322,19 @@ public class TestServiceManager {
}, CHECK_EVERY_MILLIS, TIMEOUT);
}
private void upgradeInstancesOf(ServiceContext context, String compName)
/**
 * Dispatches an UPGRADE event for every instance of the named component.
 *
 * @param context service context providing the scheduler and dispatcher
 * @param compName name of the component whose instances are upgraded
 */
private void upgradeInstances(ServiceContext context, String compName) {
  Collection<ComponentInstance> instancesOfComp = context.scheduler
      .getAllComponents().get(compName).getAllComponentInstances();
  for (ComponentInstance compInstance : instancesOfComp) {
    context.scheduler.getDispatcher().getEventHandler().handle(
        new ComponentInstanceEvent(compInstance.getContainer().getId(),
            ComponentInstanceEventType.UPGRADE));
  }
}
private void makeInstancesReadyAfterUpgrade(ServiceContext context,
String compName)
throws TimeoutException, InterruptedException {
Collection<ComponentInstance> compInstances = context.scheduler
.getAllComponents().get(compName).getAllComponentInstances();
@ -289,6 +356,15 @@ public class TestServiceManager {
context.scheduler.getDispatcher().getEventHandler().handle(event);
});
GenericTestUtils.waitFor(() -> {
for (ComponentInstance instance : compInstances) {
if (!instance.getContainerState().equals(ContainerState.READY)) {
return false;
}
}
return true;
}, CHECK_EVERY_MILLIS, TIMEOUT);
}
private ServiceContext createServiceContext(String name)
@ -324,6 +400,14 @@ public class TestServiceManager {
return artifact;
}
/**
 * Persists the given service definition under the cluster directory that
 * the test file system builds for the service name.
 *
 * @param service service definition to persist
 */
private void writeInitialDef(Service service)
    throws IOException, SliderException {
  String serviceName = service.getName();
  Path initialDefDir = rule.getFs().buildClusterDirPath(serviceName);
  ServiceApiUtil.createDirAndPersistApp(rule.getFs(), initialDefDir, service);
}
private void writeUpgradedDef(Service upgradedDef)
throws IOException, SliderException {
Path upgradePath = rule.getFs().buildClusterUpgradeDirPath(
@ -332,6 +416,6 @@ public class TestServiceManager {
upgradedDef);
}
private static final int TIMEOUT = 200000;
private static final int TIMEOUT = 10000;
private static final int CHECK_EVERY_MILLIS = 100;
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.service;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
@ -450,6 +451,49 @@ public class TestYarnNativeServices extends ServiceTestUtils {
client.actionDestroy(service.getName());
}
/**
 * End-to-end check of cancelling an upgrade: the service is created with
 * env key1=val0, an upgrade to "v2" with key1=val1 is initiated, one
 * container is upgraded, and then the upgrade is cancelled. Afterwards the
 * component must be STABLE and still carry the original env value.
 */
@Test(timeout = 200000)
public void testCancelUpgrade() throws Exception {
  setupInternal(NUM_NMS);
  getConf().setBoolean(YARN_SERVICE_UPGRADE_ENABLED, true);
  ServiceClient client = createClient(getConf());

  Service service = createExampleApplication();
  Component component = service.getComponents().iterator().next();
  component.getConfiguration().getEnv().put("key1", "val0");

  client.actionCreate(service);
  waitForServiceToBeStable(client, service);

  // upgrade the service
  service.setState(ServiceState.UPGRADING);
  service.setVersion("v2");
  component.getConfiguration().getEnv().put("key1", "val1");
  client.initiateUpgrade(service);

  // wait for service to be in upgrade state
  waitForServiceToBeInState(client, service, ServiceState.UPGRADING);

  // upgrade 1 container
  Service liveService = client.getStatus(service.getName());
  Container container = liveService.getComponent(component.getName())
      .getContainers().iterator().next();
  client.actionUpgrade(service, Lists.newArrayList(container));

  // NOTE(review): fixed pause before cancelling; presumably gives the
  // container upgrade time to start - confirm whether a state wait is
  // possible instead.
  Thread.sleep(500);

  // cancel the upgrade
  client.actionCancelUpgrade(service.getName());
  waitForServiceToBeStable(client, service);
  Service active = client.getStatus(service.getName());
  Assert.assertEquals("component not stable", ComponentState.STABLE,
      active.getComponent(component.getName()).getState());
  // The cancelled upgrade must leave the original env value in place, so
  // the failure message describes a missing rollback rather than a missing
  // new value.
  Assert.assertEquals("comp env not restored after cancel", "val0",
      active.getComponent(component.getName()).getConfiguration()
          .getEnv("key1"));
  LOG.info("Stop/destroy service {}", service);
  client.actionStop(service.getName(), true);
  client.actionDestroy(service.getName());
}
// Test to verify ANTI_AFFINITY placement policy
// 1. Start mini cluster with 3 NMs and scheduler placement-constraint handler
// 2. Create an example service with 3 containers

View File

@ -221,6 +221,17 @@ public class TestServiceCLI {
Assert.assertEquals(result, 0);
}
/**
 * Runs the CLI with "-upgrade app-1 -cancel" against the dummy service
 * client and expects exit code 0.
 */
@Test
public void testCancelUpgrade() throws Exception {
  String clientKey = YARN_APP_ADMIN_CLIENT_PREFIX + DUMMY_APP_TYPE;
  conf.set(clientKey, DummyServiceClient.class.getName());
  cli.setConf(conf);
  String[] cancelArgs = {"app", "-upgrade", "app-1",
      "-cancel", "-appTypes", DUMMY_APP_TYPE};
  int exitCode = cli.run(ApplicationCLI.preProcessArgs(cancelArgs));
  Assert.assertEquals(exitCode, 0);
}
@Test (timeout = 180000)
public void testEnableFastLaunch() throws Exception {
fs.getFileSystem().create(new Path(basedir.getAbsolutePath(), "test.jar"))
@ -332,5 +343,11 @@ public class TestServiceCLI {
throws IOException, YarnException {
return "";
}
// Stub for the CLI tests: performs no work and always returns 0.
@Override
public int actionCancelUpgrade(String appName) throws IOException,
YarnException {
return 0;
}
}
}

View File

@ -19,8 +19,8 @@
package org.apache.hadoop.yarn.service.component;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.service.MockRunningServiceContext;
import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.ServiceTestUtils;
import org.apache.hadoop.yarn.service.TestServiceManager;
@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
import org.apache.hadoop.yarn.service.MockRunningServiceContext;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Rule;
@ -38,9 +37,8 @@ import org.junit.Test;
import java.util.Iterator;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.BECOME_READY;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Tests for {@link Component}.
@ -78,14 +76,14 @@ public class TestComponent {
"val1")).setUpgradeVersion("v2"));
// one instance finished upgrading
comp.decContainersThatNeedUpgrade();
comp.getUpgradeStatus().decContainersThatNeedUpgrade();
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CHECK_STABLE));
Assert.assertEquals("component not in need upgrade state",
ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState());
// second instance finished upgrading
comp.decContainersThatNeedUpgrade();
comp.getUpgradeStatus().decContainersThatNeedUpgrade();
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CHECK_STABLE));
@ -97,7 +95,7 @@ public class TestComponent {
@Test
public void testContainerCompletedWhenUpgrading() throws Exception {
String serviceName = "testContainerComplete";
String serviceName = "testContainerCompletedWhenUpgrading";
MockRunningServiceContext context = createTestContext(rule, serviceName);
Component comp = context.scheduler.getAllComponents().entrySet().iterator()
.next().getValue();
@ -105,48 +103,233 @@ public class TestComponent {
comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.UPGRADE)
.setTargetSpec(createSpecWithEnv(serviceName, comp.getName(), "key1",
"val1")).setUpgradeVersion("v2"));
comp.getAllComponentInstances().forEach(instance -> {
instance.handle(new ComponentInstanceEvent(
instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE));
});
Iterator<ComponentInstance> instanceIter = comp.
getAllComponentInstances().iterator();
comp.getAllComponentInstances().forEach(instance ->
instance.handle(new ComponentInstanceEvent(
instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE)));
// reinitialization of a container failed
ContainerStatus status = mock(ContainerStatus.class);
when(status.getExitStatus()).thenReturn(ContainerExitStatus.ABORTED);
ComponentInstance instance = instanceIter.next();
for(ComponentInstance instance : comp.getAllComponentInstances()) {
ComponentEvent stopEvent = new ComponentEvent(comp.getName(),
ComponentEventType.CONTAINER_COMPLETED)
.setInstance(instance)
.setContainerId(instance.getContainer().getId());
comp.handle(stopEvent);
instance.handle(new ComponentInstanceEvent(
instance.getContainer().getId(), STOP));
}
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CHECK_STABLE));
Assert.assertEquals("component not in needs upgrade state",
ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState());
}
/**
 * Sends CANCEL_UPGRADE to a component and checks that its spec state is
 * NEEDS_UPGRADE while the component itself is in CANCEL_UPGRADING.
 */
@Test
public void testCancelUpgrade() throws Exception {
  ServiceContext context = createTestContext(rule, "testCancelUpgrade");
  Component comp = context.scheduler.getAllComponents().entrySet().iterator()
      .next().getValue();

  comp.handle(new ComponentEvent(comp.getName(),
      ComponentEventType.CANCEL_UPGRADE));

  Assert.assertEquals("component not in need upgrade state",
      ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState());
  Assert.assertEquals(
      org.apache.hadoop.yarn.service.component.ComponentState
          .CANCEL_UPGRADING, comp.getState());
}
@Test
public void testContainerCompletedCancelUpgrade() throws Exception {
String serviceName = "testContainerCompletedCancelUpgrade";
MockRunningServiceContext context = createTestContext(rule, serviceName);
Component comp = context.scheduler.getAllComponents().entrySet().iterator()
.next().getValue();
// upgrade completes
comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.UPGRADE)
.setTargetSpec(createSpecWithEnv(serviceName, comp.getName(), "key1",
"val1")).setUpgradeVersion("v2"));
comp.getAllComponentInstances().forEach(instance ->
instance.handle(new ComponentInstanceEvent(
instance.getContainer().getId(),
ComponentInstanceEventType.UPGRADE)));
// reinitialization of a container done
for(ComponentInstance instance : comp.getAllComponentInstances()) {
instance.handle(new ComponentInstanceEvent(
instance.getContainer().getId(), BECOME_READY));
}
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CANCEL_UPGRADE)
.setTargetSpec(createSpecWithEnv(serviceName, comp.getName(), "key1",
"val0")).setUpgradeVersion("v1"));
comp.getAllComponentInstances().forEach(instance ->
instance.handle(new ComponentInstanceEvent(
instance.getContainer().getId(),
ComponentInstanceEventType.CANCEL_UPGRADE)));
Iterator<ComponentInstance> iter = comp.getAllComponentInstances()
.iterator();
// cancel upgrade failed of a container
ComponentInstance instance1 = iter.next();
ComponentEvent stopEvent = new ComponentEvent(comp.getName(),
ComponentEventType.CONTAINER_COMPLETED)
.setInstance(instance).setContainerId(instance.getContainer().getId())
.setStatus(status);
.setInstance(instance1)
.setContainerId(instance1.getContainer().getId());
comp.handle(stopEvent);
instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(),
STOP).setStatus(status));
instance1.handle(new ComponentInstanceEvent(
instance1.getContainer().getId(), STOP));
Assert.assertEquals(
org.apache.hadoop.yarn.service.component.ComponentState
.CANCEL_UPGRADING, comp.getState());
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CHECK_STABLE));
Assert.assertEquals("component not in needs upgrade state",
ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState());
Assert.assertEquals(
org.apache.hadoop.yarn.service.component.ComponentState
.CANCEL_UPGRADING, comp.getState());
// second instance finished upgrading
ComponentInstance instance2 = iter.next();
instance2.handle(new ComponentInstanceEvent(
instance2.getContainer().getId(),
ComponentInstanceEventType.BECOME_READY));
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CHECK_STABLE));
Assert.assertEquals("component not in flexing state",
ComponentState.FLEXING, comp.getComponentSpec().getState());
// new container get allocated
context.assignNewContainer(context.attemptId, 10, comp);
// second instance finished upgrading
ComponentInstance instance2 = instanceIter.next();
instance2.handle(new ComponentInstanceEvent(
instance2.getContainer().getId(),
ComponentInstanceEventType.BECOME_READY));
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CHECK_STABLE));
Assert.assertEquals("component not in stable state",
ComponentState.STABLE, comp.getComponentSpec().getState());
Assert.assertEquals("cancel upgrade failed", "val0",
comp.getComponentSpec().getConfiguration().getEnv("key1"));
}
@Test
public void testCancelUpgradeSuccessWhileUpgrading() throws Exception {
String serviceName = "testCancelUpgradeWhileUpgrading";
MockRunningServiceContext context = createTestContext(rule, serviceName);
Component comp = context.scheduler.getAllComponents().entrySet().iterator()
.next().getValue();
cancelUpgradeWhileUpgrading(context, comp);
// cancel upgrade successful for both instances
for(ComponentInstance instance : comp.getAllComponentInstances()) {
instance.handle(new ComponentInstanceEvent(
instance.getContainer().getId(),
ComponentInstanceEventType.BECOME_READY));
}
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CHECK_STABLE));
Assert.assertEquals("component not in stable state",
ComponentState.STABLE, comp.getComponentSpec().getState());
Assert.assertEquals("component did not upgrade successfully", "val1",
Assert.assertEquals("cancel upgrade failed", "val0",
comp.getComponentSpec().getConfiguration().getEnv("key1"));
}
/**
 * Cancels an upgrade while it is in progress and then stops every instance.
 * The component spec is expected to go to FLEXING, and after new containers
 * are assigned to STABLE with env key1 equal to "val0".
 */
@Test
public void testCancelUpgradeFailureWhileUpgrading() throws Exception {
String serviceName = "testCancelUpgradeFailureWhileUpgrading";
MockRunningServiceContext context = createTestContext(rule, serviceName);
Component comp = context.scheduler.getAllComponents().entrySet().iterator()
.next().getValue();
// shared setup: upgrade one instance, then cancel the component upgrade
cancelUpgradeWhileUpgrading(context, comp);
// cancel upgrade failed for both instances
for(ComponentInstance instance : comp.getAllComponentInstances()) {
instance.handle(new ComponentInstanceEvent(
instance.getContainer().getId(),
ComponentInstanceEventType.STOP));
}
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CHECK_STABLE));
Assert.assertEquals("component not in flexing state",
ComponentState.FLEXING, comp.getComponentSpec().getState());
// the loop variable is unused; the loop only repeats the assignment once
// per current instance
for (ComponentInstance instance : comp.getAllComponentInstances()) {
// new container get allocated
context.assignNewContainer(context.attemptId, 10, comp);
}
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CHECK_STABLE));
Assert.assertEquals("component not in stable state",
ComponentState.STABLE, comp.getComponentSpec().getState());
Assert.assertEquals("cancel upgrade failed", "val0",
comp.getComponentSpec().getConfiguration().getEnv("key1"));
}
/**
 * Shared scenario: starts a component upgrade (env key1=val1), triggers the
 * upgrade of the first instance, cancels the component upgrade (target env
 * key1=val0), sends CANCEL_UPGRADE to all instances and then reports the
 * first instance's container as completed and stopped. It asserts that the
 * component stays in CANCEL_UPGRADING, before and after CHECK_STABLE, and
 * that its spec state is NEEDS_UPGRADE.
 *
 * @param context running mock service context that owns the component
 * @param comp component under test
 */
private void cancelUpgradeWhileUpgrading(
MockRunningServiceContext context, Component comp)
throws Exception {
// NOTE(review): both the upgrade and the cancel event below carry upgrade
// version "v0" - confirm this is intended.
comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.UPGRADE)
.setTargetSpec(createSpecWithEnv(context.service.getName(),
comp.getName(), "key1", "val1")).setUpgradeVersion("v0"));
Iterator<ComponentInstance> iter = comp.getAllComponentInstances()
.iterator();
ComponentInstance instance1 = iter.next();
// instance1 is triggered to upgrade
instance1.handle(new ComponentInstanceEvent(
instance1.getContainer().getId(), ComponentInstanceEventType.UPGRADE));
// component upgrade is cancelled
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CANCEL_UPGRADE)
.setTargetSpec(createSpecWithEnv(context.service.getName(),
comp.getName(), "key1",
"val0")).setUpgradeVersion("v0"));
// all instances upgrade is cancelled.
comp.getAllComponentInstances().forEach(instance ->
instance.handle(new ComponentInstanceEvent(
instance.getContainer().getId(),
ComponentInstanceEventType.CANCEL_UPGRADE)));
// regular upgrade failed for instance 1
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CONTAINER_COMPLETED).setInstance(instance1)
.setContainerId(instance1.getContainer().getId()));
instance1.handle(new ComponentInstanceEvent(
instance1.getContainer().getId(), STOP));
// component should be in cancel upgrade
Assert.assertEquals(
org.apache.hadoop.yarn.service.component.ComponentState
.CANCEL_UPGRADING, comp.getState());
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CHECK_STABLE));
// the stability check must not move the component out of cancel upgrade
Assert.assertEquals("component not in needs upgrade state",
ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState());
Assert.assertEquals(
org.apache.hadoop.yarn.service.component.ComponentState
.CANCEL_UPGRADING, comp.getState());
}
@Test
public void testComponentStateReachesStableStateWithTerminatingComponents()
throws
@ -249,8 +432,6 @@ public class TestComponent {
serviceState);
}
private static org.apache.hadoop.yarn.service.api.records.Component
createSpecWithEnv(String serviceName, String compName, String key,
String val) {

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.service.component.instance;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@ -86,7 +87,7 @@ public class TestComponentInstance {
@Test
public void testContainerReadyAfterUpgrade() throws Exception {
ServiceContext context = TestComponent.createTestContext(rule,
"testContainerStarted");
"testContainerReadyAfterUpgrade");
Component component = context.scheduler.getAllComponents().entrySet()
.iterator().next().getValue();
upgradeComponent(component);
@ -105,12 +106,186 @@ public class TestComponentInstance {
.getId().toString()).getState());
}
/**
 * Upgrades an instance and then delivers a STOP event whose container
 * status reports ABORTED; the container is expected to end up in
 * FAILED_UPGRADE.
 */
@Test
public void testContainerUpgradeFailed() throws Exception {
  ServiceContext context = TestComponent.createTestContext(rule,
      "testContainerUpgradeFailed");
  Component component = context.scheduler.getAllComponents().entrySet()
      .iterator().next().getValue();
  upgradeComponent(component);

  ComponentInstance instance = component.getAllComponentInstances().iterator()
      .next();
  instance.handle(new ComponentInstanceEvent(
      instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE));

  ContainerStatus abortedStatus = mock(ContainerStatus.class);
  when(abortedStatus.getExitStatus()).thenReturn(
      ContainerExitStatus.ABORTED);

  // this is the call back from NM for the upgrade
  instance.handle(new ComponentInstanceEvent(
      instance.getContainer().getId(), ComponentInstanceEventType.STOP)
      .setStatus(abortedStatus));

  String containerId = instance.getContainer().getId().toString();
  Assert.assertEquals("instance did not fail", ContainerState.FAILED_UPGRADE,
      component.getComponentSpec().getContainer(containerId).getState());
}
/**
 * Cancels an upgrade on a component that was never upgraded and checks that
 * the instance's container stays READY after the CANCEL_UPGRADE event.
 */
@Test
public void testCancelNothingToUpgrade() throws Exception {
  ServiceContext context = TestComponent.createTestContext(rule,
      "testCancelUpgradeWhenContainerReady");
  Component component = context.scheduler.getAllComponents().entrySet()
      .iterator().next().getValue();
  cancelCompUpgrade(component);

  ComponentInstance instance = component.getAllComponentInstances().iterator()
      .next();
  instance.handle(new ComponentInstanceEvent(
      instance.getContainer().getId(),
      ComponentInstanceEventType.CANCEL_UPGRADE));

  String containerId = instance.getContainer().getId().toString();
  Assert.assertEquals("instance not ready", ContainerState.READY,
      component.getComponentSpec().getContainer(containerId).getState());
}
/**
 * Cancels an upgrade and then stops the instance; the instance is expected
 * to be back in the INIT state.
 */
@Test
public void testCancelUpgradeFailed() throws Exception {
  ServiceContext context = TestComponent.createTestContext(rule,
      "testCancelUpgradeFailed");
  Component component = context.scheduler.getAllComponents().entrySet()
      .iterator().next().getValue();
  cancelCompUpgrade(component);

  ComponentInstance instance = component.getAllComponentInstances().iterator()
      .next();
  instance.handle(new ComponentInstanceEvent(
      instance.getContainer().getId(),
      ComponentInstanceEventType.CANCEL_UPGRADE));

  // the cancel is answered with a STOP
  ComponentInstanceEvent stopEvent = new ComponentInstanceEvent(
      instance.getContainer().getId(), ComponentInstanceEventType.STOP);
  instance.handle(stopEvent);

  Assert.assertEquals("instance not init", ComponentInstanceState.INIT,
      instance.getState());
}
/**
 * The component processes an upgrade followed by a cancel before the
 * instance receives its UPGRADE event. The test expects the container to
 * remain in NEEDS_UPGRADE after that event, i.e. the instance must not
 * enter UPGRADING.
 */
@Test
public void testCancelAfterCompProcessedCancel() throws Exception {
  ServiceContext context = TestComponent.createTestContext(rule,
      "testCancelAfterCompProcessedCancel");
  Component component = context.scheduler.getAllComponents().entrySet()
      .iterator().next().getValue();
  upgradeComponent(component);
  cancelCompUpgrade(component);

  ComponentInstance instance = component.getAllComponentInstances().iterator()
      .next();
  ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent(
      instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE);
  instance.handle(upgradeEvent);

  // The expected state is NEEDS_UPGRADE, so the failure message says the
  // instance must not have started upgrading.
  Assert.assertEquals("instance should not start upgrading",
      ContainerState.NEEDS_UPGRADE,
      component.getComponentSpec().getContainer(instance.getContainer()
          .getId().toString()).getState());
}
/** Cancel during upgrade: upgradeSuccessful=true, cancelUpgradeSuccessful=true. */
@Test
public void testCancelWhileUpgradeWithSuccess() throws Exception {
validateCancelWhileUpgrading(true, true);
}
/** Cancel during upgrade: upgradeSuccessful=false, cancelUpgradeSuccessful=true. */
@Test
public void testCancelWhileUpgradeWithFailure() throws Exception {
validateCancelWhileUpgrading(false, true);
}
/** Cancel during upgrade: upgradeSuccessful=true, cancelUpgradeSuccessful=false. */
@Test
public void testCancelFailedWhileUpgradeWithSuccess() throws Exception {
validateCancelWhileUpgrading(true, false);
}
/** Cancel during upgrade: upgradeSuccessful=false, cancelUpgradeSuccessful=false. */
@Test
public void testCancelFailedWhileUpgradeWithFailure() throws Exception {
validateCancelWhileUpgrading(false, false);
}
/**
 * Drives one instance through an upgrade that is cancelled while still in
 * progress, then checks the resulting state.
 *
 * @param upgradeSuccessful whether the in-flight upgrade is answered with
 *     BECOME_READY ({@code true}) or STOP ({@code false})
 * @param cancelUpgradeSuccessful whether the cancel is answered with
 *     BECOME_READY ({@code true}) or STOP ({@code false})
 */
private void validateCancelWhileUpgrading(boolean upgradeSuccessful,
    boolean cancelUpgradeSuccessful)
    throws Exception {
  ServiceContext context = TestComponent.createTestContext(rule,
      "testCancelWhileUpgrading");
  Component component = context.scheduler.getAllComponents().entrySet()
      .iterator().next().getValue();
  upgradeComponent(component);

  ComponentInstance instance = component.getAllComponentInstances().iterator()
      .next();
  instance.handle(new ComponentInstanceEvent(
      instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE));

  Assert.assertEquals("instance should be upgrading",
      ContainerState.UPGRADING,
      component.getComponentSpec().getContainer(instance.getContainer()
          .getId().toString()).getState());

  cancelCompUpgrade(component);
  instance.handle(new ComponentInstanceEvent(
      instance.getContainer().getId(),
      ComponentInstanceEventType.CANCEL_UPGRADE));

  // either upgrade failed or successful
  ComponentInstanceEventType upgradeOutcome;
  if (upgradeSuccessful) {
    upgradeOutcome = ComponentInstanceEventType.BECOME_READY;
  } else {
    upgradeOutcome = ComponentInstanceEventType.STOP;
  }
  instance.handle(new ComponentInstanceEvent(
      instance.getContainer().getId(), upgradeOutcome));

  Assert.assertEquals("instance not upgrading", ContainerState.UPGRADING,
      component.getComponentSpec().getContainer(instance.getContainer()
          .getId().toString()).getState());

  // response for cancel received
  ComponentInstanceEventType cancelOutcome;
  if (cancelUpgradeSuccessful) {
    cancelOutcome = ComponentInstanceEventType.BECOME_READY;
  } else {
    cancelOutcome = ComponentInstanceEventType.STOP;
  }
  instance.handle(new ComponentInstanceEvent(
      instance.getContainer().getId(), cancelOutcome));

  if (!cancelUpgradeSuccessful) {
    Assert.assertEquals("instance not init", ComponentInstanceState.INIT,
        instance.getState());
  } else {
    Assert.assertEquals("instance not ready", ContainerState.READY,
        component.getComponentSpec().getContainer(instance.getContainer()
            .getId().toString()).getState());
  }
}
/**
 * Sends the component an UPGRADE event that targets its own current spec
 * and carries upgrade version "v2".
 *
 * @param component the component to put into upgrade
 */
private void upgradeComponent(Component component) {
  ComponentEvent upgradeRequest = new ComponentEvent(component.getName(),
      ComponentEventType.UPGRADE)
      .setTargetSpec(component.getComponentSpec())
      .setUpgradeVersion("v2");
  component.handle(upgradeRequest);
}
/**
 * Sends the component a CANCEL_UPGRADE event that targets its own current
 * spec and carries upgrade version "v1".
 *
 * @param component the component whose upgrade is being cancelled
 */
private void cancelCompUpgrade(Component component) {
  ComponentEvent cancelRequest = new ComponentEvent(component.getName(),
      ComponentEventType.CANCEL_UPGRADE)
      .setTargetSpec(component.getComponentSpec())
      .setUpgradeVersion("v1");
  component.handle(cancelRequest);
}
private Component createComponent(ServiceScheduler scheduler,
org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum
restartPolicy, int nSucceededInstances, int nFailedInstances,

View File

@ -0,0 +1,19 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# log4j configuration used during build and unit tests
log4j.rootLogger=info,stdout
log4j.threshold=ALL
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n

View File

@ -102,6 +102,7 @@ public class ApplicationCLI extends YarnCLI {
public static final String ENABLE_FAST_LAUNCH = "enableFastLaunch";
public static final String UPGRADE_CMD = "upgrade";
public static final String UPGRADE_EXPRESS = "express";
public static final String UPGRADE_CANCEL = "cancel";
public static final String UPGRADE_INITIATE = "initiate";
public static final String UPGRADE_AUTO_FINALIZE = "autoFinalize";
public static final String UPGRADE_FINALIZE = "finalize";
@ -265,6 +266,8 @@ public class ApplicationCLI extends YarnCLI {
opts.addOption(UPGRADE_AUTO_FINALIZE, false, "Works with -upgrade and " +
"-initiate options to initiate the upgrade of the application with " +
"the ability to finalize the upgrade automatically.");
opts.addOption(UPGRADE_CANCEL, false, "Works with -upgrade option to " +
"cancel current upgrade.");
opts.getOption(LAUNCH_CMD).setArgName("Application Name> <File Name");
opts.getOption(LAUNCH_CMD).setArgs(2);
opts.getOption(START_CMD).setArgName("Application Name");
@ -646,7 +649,7 @@ public class ApplicationCLI extends YarnCLI {
} else if (cliParser.hasOption(UPGRADE_CMD)) {
if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD, UPGRADE_EXPRESS,
UPGRADE_INITIATE, UPGRADE_AUTO_FINALIZE, UPGRADE_FINALIZE,
COMPONENT_INSTS, COMPONENTS, APP_TYPE_CMD)) {
UPGRADE_CANCEL, COMPONENT_INSTS, COMPONENTS, APP_TYPE_CMD)) {
printUsage(title, opts);
return exitCode;
}
@ -697,6 +700,13 @@ public class ApplicationCLI extends YarnCLI {
return exitCode;
}
return client.actionStart(appName);
} else if (cliParser.hasOption(UPGRADE_CANCEL)) {
if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD,
UPGRADE_CANCEL, APP_TYPE_CMD)) {
printUsage(title, opts);
return exitCode;
}
return client.actionCancelUpgrade(appName);
}
} else {
syserr.println("Invalid Command Usage : ");

View File

@ -2143,6 +2143,8 @@ public class TestYarnCLI {
pw.println(" the upgrade of the application");
pw.println(" with the ability to finalize the");
pw.println(" upgrade automatically.");
pw.println(" -cancel Works with -upgrade option to");
pw.println(" cancel current upgrade.");
pw.println(" -changeQueue <Queue Name> Moves application to a new");
pw.println(" queue. ApplicationId can be");
pw.println(" passed using 'appId' option.");

View File

@ -300,4 +300,17 @@ public abstract class AppAdminClient extends CompositeService {
@Unstable
public abstract int actionUpgradeExpress(String appName, File fileName)
throws IOException, YarnException;
/**
 * Cancels the upgrade of the service.
 *
 * @param appName the name of the application
 * @return exit code
 * @throws IOException if the implementation hits an I/O failure while
 *                     cancelling the upgrade
 * @throws YarnException if the implementation reports a YARN-level failure
 *                       while cancelling the upgrade
 */
@Public
@Unstable
public abstract int actionCancelUpgrade(String appName) throws IOException,
YarnException;
}

View File

@ -1836,6 +1836,7 @@ public class ContainerManagerImpl extends CompositeService implements
public void reInitializeContainer(ContainerId containerId,
ContainerLaunchContext reInitLaunchContext, boolean autoCommit)
throws YarnException {
LOG.debug("{} requested reinit", containerId);
Container container = preReInitializeOrLocalizeCheck(containerId,
ReInitOp.RE_INIT);
ResourceSet resourceSet = new ResourceSet();