YARN-8080. Add restart policy for YARN services.

Contributed by Suma Shivaprasad
This commit is contained in:
Eric Yang 2018-05-17 17:16:50 -04:00
parent 7802af6e9a
commit 7f083ed869
19 changed files with 1447 additions and 122 deletions

View File

@ -424,6 +424,14 @@ definitions:
items:
type: string
description: A list of quicklink keys defined at the service level, and to be resolved by this component.
restartPolicy:
type: string
description: Policy of restart component. Including ALWAYS (Always restart component even if instance exit code = 0); ON_FAILURE (Only restart component if instance exit code != 0); NEVER (Do not restart in any cases)
enum:
- ALWAYS
- ON_FAILURE
- NEVER
default: ALWAYS
ReadinessCheck:
description: A check to be performed to determine the readiness of a component instance (a container). If no readiness check is specified, the default readiness check will be used unless the yarn.service.default-readiness-check.enabled configuration property is set to false at the component, service, or system level. The artifact field is currently unsupported but may be implemented in the future, enabling a pluggable helper container to support advanced use cases.
required:

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
@ -130,7 +131,7 @@ public class ClientAMService extends AbstractService
LOG.info("Stop the service by {}", UserGroupInformation.getCurrentUser());
context.scheduler.getDiagnostics()
.append("Stopped by user " + UserGroupInformation.getCurrentUser());
context.scheduler.setGracefulStop();
context.scheduler.setGracefulStop(FinalApplicationStatus.ENDED);
// Stop the service in 2 seconds delay to make sure this rpc call is completed.
// shutdown hook will be executed which will stop AM gracefully.
@ -157,10 +158,10 @@ public class ClientAMService extends AbstractService
public UpgradeServiceResponseProto upgrade(
UpgradeServiceRequestProto request) throws IOException {
try {
context.getServiceManager().processUpgradeRequest(request.getVersion(),
request.getAutoFinalize());
LOG.info("Upgrading service to version {} by {}", request.getVersion(),
UserGroupInformation.getCurrentUser());
context.getServiceManager().processUpgradeRequest(request.getVersion(),
request.getAutoFinalize());
return UpgradeServiceResponseProto.newBuilder().build();
} catch (Exception ex) {
return UpgradeServiceResponseProto.newBuilder().setError(ex.getMessage())

View File

@ -56,4 +56,8 @@ public class ServiceContext {
void setServiceManager(ServiceManager serviceManager) {
this.serviceManager = Preconditions.checkNotNull(serviceManager);
}
public Service getService() {
return service;
}
}

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.component.Component;
import org.apache.hadoop.yarn.service.component.ComponentEvent;
import org.apache.hadoop.yarn.service.component.ComponentEventType;
import org.apache.hadoop.yarn.service.component.ComponentRestartPolicy;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
@ -266,12 +267,24 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
event.setAutoFinalize(true);
}
compsThatNeedUpgrade.forEach(component -> {
ComponentEvent needUpgradeEvent = new ComponentEvent(
component.getName(), ComponentEventType.UPGRADE)
.setTargetSpec(component)
.setUpgradeVersion(event.getVersion());
context.scheduler.getDispatcher().getEventHandler().handle(
needUpgradeEvent);
org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum
restartPolicy = component.getRestartPolicy();
final ComponentRestartPolicy restartPolicyHandler =
Component.getRestartPolicyHandler(restartPolicy);
// Do not allow upgrades for components which have NEVER/ON_FAILURE
// restart policy
if (restartPolicyHandler.allowUpgrades()) {
ComponentEvent needUpgradeEvent = new ComponentEvent(
component.getName(), ComponentEventType.UPGRADE).setTargetSpec(
component).setUpgradeVersion(event.getVersion());
context.scheduler.getDispatcher().getEventHandler().handle(
needUpgradeEvent);
} else {
LOG.info("The component {} has a restart "
+ "policy that doesnt allow upgrades {} ", component.getName(),
component.getRestartPolicy().toString());
}
});
} else {
// nothing to upgrade if upgrade auto finalize is requested, trigger a

View File

@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.service.api.ServiceApiConstants;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.api.records.ConfigFile;
import org.apache.hadoop.yarn.service.component.ComponentRestartPolicy;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
@ -77,6 +78,7 @@ import org.apache.hadoop.yarn.service.timelineservice.ServiceMetricsSink;
import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.service.utils.ServiceRegistryUtils;
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
import org.apache.hadoop.yarn.util.BoundedAppender;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.slf4j.Logger;
@ -89,8 +91,10 @@ import java.nio.ByteBuffer;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@ -101,6 +105,10 @@ import static org.apache.hadoop.registry.client.api.RegistryConstants.*;
import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.KILLED_AFTER_APP_COMPLETION;
import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*;
import static org.apache.hadoop.yarn.service.component.ComponentEventType.*;
import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes
.EXIT_FALSE;
import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes
.EXIT_SUCCESS;
/**
*
@ -158,8 +166,15 @@ public class ServiceScheduler extends CompositeService {
private boolean gracefulStop = false;
private volatile FinalApplicationStatus finalApplicationStatus =
FinalApplicationStatus.ENDED;
// For unit test override since we don't want to terminate UT process.
private ServiceUtils.ProcessTerminationHandler
terminationHandler = new ServiceUtils.ProcessTerminationHandler();
public ServiceScheduler(ServiceContext context) {
super(context.service.getName());
super(context.getService().getName());
this.context = context;
}
@ -256,8 +271,9 @@ public class ServiceScheduler extends CompositeService {
.createAMRMClientAsync(1000, new AMRMClientCallback());
}
protected void setGracefulStop() {
public void setGracefulStop(FinalApplicationStatus applicationStatus) {
this.gracefulStop = true;
this.finalApplicationStatus = applicationStatus;
nmClient.getClient().cleanupRunningContainersOnStop(true);
}
@ -877,4 +893,57 @@ public class ServiceScheduler extends CompositeService {
public boolean hasAtLeastOnePlacementConstraint() {
return hasAtLeastOnePlacementConstraint;
}
/*
* Check if all components of the scheduler finished.
* If all components finished
* (which #failed-instances + #suceeded-instances = #total-n-containers)
* The service will be terminated.
*/
public synchronized void terminateServiceIfAllComponentsFinished() {
boolean shouldTerminate = true;
// Succeeded comps and failed comps, for logging purposes.
Set<String> succeededComponents = new HashSet<>();
Set<String> failedComponents = new HashSet<>();
for (Component comp : getAllComponents().values()) {
ComponentRestartPolicy restartPolicy = comp.getRestartPolicyHandler();
if (!restartPolicy.shouldTerminate(comp)) {
shouldTerminate = false;
break;
}
long nFailed = comp.getNumFailedInstances();
if (nFailed > 0) {
failedComponents.add(comp.getName());
} else{
succeededComponents.add(comp.getName());
}
}
if (shouldTerminate) {
LOG.info("All component finished, exiting Service Master... "
+ ", final status=" + (failedComponents.isEmpty() ?
"Succeeded" :
"Failed"));
LOG.info("Succeeded components: [" + org.apache.commons.lang3.StringUtils
.join(succeededComponents, ",") + "]");
LOG.info("Failed components: [" + org.apache.commons.lang3.StringUtils
.join(failedComponents, ",") + "]");
if (failedComponents.isEmpty()) {
setGracefulStop(FinalApplicationStatus.SUCCEEDED);
getTerminationHandler().terminate(EXIT_SUCCESS);
} else{
setGracefulStop(FinalApplicationStatus.FAILED);
getTerminationHandler().terminate(EXIT_FALSE);
}
}
}
public ServiceUtils.ProcessTerminationHandler getTerminationHandler() {
return terminationHandler;
}
}

View File

@ -17,6 +17,7 @@
package org.apache.hadoop.yarn.service.api.records;
import com.fasterxml.jackson.annotation.JsonValue;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
@ -29,7 +30,9 @@ import java.util.Objects;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlEnum;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlType;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
@ -98,6 +101,74 @@ public class Component implements Serializable {
private List<Container> containers =
Collections.synchronizedList(new ArrayList<Container>());
@JsonProperty("restart_policy")
@XmlElement(name = "restart_policy")
private RestartPolicyEnum restartPolicy = RestartPolicyEnum.ALWAYS;
/**
* Policy of restart component. Including ALWAYS - Long lived components
* (Always restart component instance even if instance exit code &#x3D; 0.);
*
* ON_FAILURE (Only restart component instance if instance exit code !&#x3D;
* 0);
* NEVER (Do not restart in any cases)
*
* @return restartPolicy
**/
@XmlType(name = "restart_policy")
@XmlEnum
public enum RestartPolicyEnum {
ALWAYS("ALWAYS"),
ON_FAILURE("ON_FAILURE"),
NEVER("NEVER");
private String value;
RestartPolicyEnum(String value) {
this.value = value;
}
@Override
@JsonValue
public String toString() {
return value;
}
}
public Component restartPolicy(RestartPolicyEnum restartPolicyEnumVal) {
this.restartPolicy = restartPolicyEnumVal;
return this;
}
/**
* Policy of restart component.
*
* Including
* ALWAYS (Always restart component instance even if instance exit
* code &#x3D; 0);
*
* ON_FAILURE (Only restart component instance if instance exit code !&#x3D;
* 0);
*
* NEVER (Do not restart in any cases)
*
* @return restartPolicy
**/
@ApiModelProperty(value = "Policy of restart component. Including ALWAYS "
+ "(Always restart component even if instance exit code = 0); "
+ "ON_FAILURE (Only restart component if instance exit code != 0); "
+ "NEVER (Do not restart in any cases)")
public RestartPolicyEnum getRestartPolicy() {
return restartPolicy;
}
public void setRestartPolicy(RestartPolicyEnum restartPolicy) {
this.restartPolicy = restartPolicy;
}
/**
* Name of the service component (mandatory).
**/

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.component;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
/**
* Always restart policy allows for restarts for long live components which
* never terminate.
*/
public final class AlwaysRestartPolicy implements ComponentRestartPolicy {
private static AlwaysRestartPolicy INSTANCE = new AlwaysRestartPolicy();
private AlwaysRestartPolicy() {
}
public static AlwaysRestartPolicy getInstance() {
return INSTANCE;
}
@Override public boolean isLongLived() {
return true;
}
/**
* This is always false since these components never terminate
*
* @param component
* @return
*/
@Override public boolean hasCompleted(Component component) {
return false;
}
/**
* This is always false since these components never terminate
*
* @param component
* @return
*/
@Override public boolean hasCompletedSuccessfully(Component component) {
return false;
}
@Override public boolean shouldRelaunchInstance(
ComponentInstance componentInstance, ContainerStatus containerStatus) {
return true;
}
@Override public boolean isReadyForDownStream(Component dependentComponent) {
if (dependentComponent.getNumReadyInstances() < dependentComponent
.getNumDesiredInstances()) {
return false;
}
return true;
}
@Override public boolean allowUpgrades() {
return true;
}
@Override public boolean shouldTerminate(Component component) {
return false;
}
}

View File

@ -18,9 +18,12 @@
package org.apache.hadoop.yarn.service.component;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import static org.apache.hadoop.yarn.service.api.records.Component
.RestartPolicyEnum;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@ -111,6 +114,13 @@ public class Component implements EventHandler<ComponentEvent> {
// The number of containers failed since last reset. This excludes preempted,
// disk_failed containers etc. This will be reset to 0 periodically.
public AtomicInteger currentContainerFailure = new AtomicInteger(0);
//succeeded and Failed instances are Populated only for RestartPolicyEnum
//.ON_FAILURE/NEVER
private Map<String, ComponentInstance> succeededInstances =
new ConcurrentHashMap<>();
private Map<String, ComponentInstance> failedInstances =
new ConcurrentHashMap<>();
private boolean healthThresholdMonitorEnabled = false;
private AtomicBoolean upgradeInProgress = new AtomicBoolean(false);
@ -297,7 +307,7 @@ public class Component implements EventHandler<ComponentEvent> {
@Override
public ComponentState transition(Component component,
ComponentEvent event) {
component.setDesiredContainers((int)event.getDesired());
component.setDesiredContainers((int) event.getDesired());
if (!component.areDependenciesReady()) {
LOG.info("[FLEX COMPONENT {}]: Flex deferred because dependencies not"
+ " satisfied.", component.getName());
@ -402,11 +412,37 @@ public class Component implements EventHandler<ComponentEvent> {
}
}
private static ComponentState checkIfStable(Component component) {
@VisibleForTesting
static ComponentState checkIfStable(Component component) {
if (component.getRestartPolicyHandler().isLongLived()) {
return updateStateForLongRunningComponents(component);
} else{
//NEVER/ON_FAILURE
return updateStateForTerminatingComponents(component);
}
}
private static ComponentState updateStateForTerminatingComponents(
Component component) {
if (component.getNumRunningInstances() + component
.getNumSucceededInstances() + component.getNumFailedInstances()
< component.getComponentSpec().getNumberOfContainers()) {
component.componentSpec.setState(
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
return FLEXING;
} else{
component.componentSpec.setState(
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
return STABLE;
}
}
private static ComponentState updateStateForLongRunningComponents(
Component component) {
// if desired == running
if (component.componentMetrics.containersReady.value() == component
.getComponentSpec().getNumberOfContainers() &&
component.numContainersThatNeedUpgrade.get() == 0) {
.getComponentSpec().getNumberOfContainers()
&& component.numContainersThatNeedUpgrade.get() == 0) {
component.componentSpec.setState(
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
return STABLE;
@ -425,17 +461,41 @@ public class Component implements EventHandler<ComponentEvent> {
// This method should be called whenever there is an increment or decrement
// of a READY state container of a component
public static synchronized void checkAndUpdateComponentState(
//This should not matter for terminating components
private static synchronized void checkAndUpdateComponentState(
Component component, boolean isIncrement) {
org.apache.hadoop.yarn.service.api.records.ComponentState curState =
component.componentSpec.getState();
if (isIncrement) {
// check if all containers are in READY state
if (component.numContainersThatNeedUpgrade.get() == 0 &&
component.componentMetrics.containersReady.value() ==
component.componentMetrics.containersDesired.value()) {
component.componentSpec.setState(
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
if (component.getRestartPolicyHandler().isLongLived()) {
if (isIncrement) {
// check if all containers are in READY state
if (component.numContainersThatNeedUpgrade.get() == 0
&& component.componentMetrics.containersReady.value()
== component.componentMetrics.containersDesired.value()) {
component.componentSpec.setState(
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
if (curState != component.componentSpec.getState()) {
LOG.info("[COMPONENT {}] state changed from {} -> {}",
component.componentSpec.getName(), curState,
component.componentSpec.getState());
}
// component state change will trigger re-check of service state
component.context.getServiceManager().checkAndUpdateServiceState();
}
} else{
// container moving out of READY state could be because of FLEX down so
// still need to verify the count before changing the component state
if (component.componentMetrics.containersReady.value()
< component.componentMetrics.containersDesired.value()) {
component.componentSpec.setState(
org.apache.hadoop.yarn.service.api.records.ComponentState
.FLEXING);
} else if (component.componentMetrics.containersReady.value()
== component.componentMetrics.containersDesired.value()) {
component.componentSpec.setState(
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
}
if (curState != component.componentSpec.getState()) {
LOG.info("[COMPONENT {}] state changed from {} -> {}",
component.componentSpec.getName(), curState,
@ -445,44 +505,38 @@ public class Component implements EventHandler<ComponentEvent> {
component.context.getServiceManager().checkAndUpdateServiceState();
}
} else {
// container moving out of READY state could be because of FLEX down so
// still need to verify the count before changing the component state
if (component.componentMetrics.containersReady
.value() < component.componentMetrics.containersDesired.value()) {
component.componentSpec.setState(
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
} else if (component.componentMetrics.containersReady
.value() == component.componentMetrics.containersDesired.value()) {
component.componentSpec.setState(
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
}
if (curState != component.componentSpec.getState()) {
LOG.info("[COMPONENT {}] state changed from {} -> {}",
component.componentSpec.getName(), curState,
component.componentSpec.getState());
}
// component state change will trigger re-check of service state
component.context.getServiceManager().checkAndUpdateServiceState();
}
// when the service is stable then the state of component needs to
// transition to stable
component.dispatcher.getEventHandler().handle(new ComponentEvent(
component.getName(), ComponentEventType.CHECK_STABLE));
component.dispatcher.getEventHandler().handle(
new ComponentEvent(component.getName(),
ComponentEventType.CHECK_STABLE));
}
private static class ContainerCompletedTransition extends BaseTransition {
@Override
public void transition(Component component, ComponentEvent event) {
component.updateMetrics(event.getStatus());
component.dispatcher.getEventHandler().handle(
new ComponentInstanceEvent(event.getStatus().getContainerId(),
STOP).setStatus(event.getStatus()));
component.componentSpec.setState(
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
if (component.context.service.getState().equals(ServiceState.STABLE)) {
component.getScheduler().getApp().setState(ServiceState.STARTED);
LOG.info("Service def state changed from {} -> {}",
ServiceState.STABLE, ServiceState.STARTED);
new ComponentInstanceEvent(event.getStatus().getContainerId(), STOP)
.setStatus(event.getStatus()));
ComponentRestartPolicy restartPolicy =
component.getRestartPolicyHandler();
if (restartPolicy.shouldRelaunchInstance(event.getInstance(),
event.getStatus())) {
component.componentSpec.setState(
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
if (component.context.service.getState().equals(ServiceState.STABLE)) {
component.getScheduler().getApp().setState(ServiceState.STARTED);
LOG.info("Service def state changed from {} -> {}",
ServiceState.STABLE, ServiceState.STARTED);
}
}
}
}
@ -725,8 +779,6 @@ public class Component implements EventHandler<ComponentEvent> {
componentMetrics.containersDesired.set(n);
}
private void updateMetrics(ContainerStatus status) {
switch (status.getExitStatus()) {
case SUCCESS:
@ -753,7 +805,7 @@ public class Component implements EventHandler<ComponentEvent> {
String host = scheduler.getLiveInstances().get(status.getContainerId())
.getNodeId().getHost();
failureTracker.incNodeFailure(host);
currentContainerFailure.getAndIncrement() ;
currentContainerFailure.getAndIncrement();
}
}
@ -763,17 +815,18 @@ public class Component implements EventHandler<ComponentEvent> {
return true;
}
for (String dependency : dependencies) {
Component dependentComponent =
scheduler.getAllComponents().get(dependency);
Component dependentComponent = scheduler.getAllComponents().get(
dependency);
if (dependentComponent == null) {
LOG.error("Couldn't find dependency {} for {} (should never happen)",
dependency, getName());
continue;
}
if (dependentComponent.getNumReadyInstances() < dependentComponent
.getNumDesiredInstances()) {
if (!dependentComponent.isReadyForDownstream()) {
LOG.info("[COMPONENT {}]: Dependency {} not satisfied, only {} of {}"
+ " instances are ready.", getName(), dependency,
+ " instances are ready or the dependent component has not "
+ "completed ", getName(), dependency,
dependentComponent.getNumReadyInstances(),
dependentComponent.getNumDesiredInstances());
return false;
@ -782,6 +835,7 @@ public class Component implements EventHandler<ComponentEvent> {
return true;
}
public Map<String, String> getDependencyHostIpTokens() {
Map<String, String> tokens = new HashMap<>();
List<String> dependencies = componentSpec.getDependencies();
@ -955,4 +1009,67 @@ public class Component implements EventHandler<ComponentEvent> {
boolean healthThresholdMonitorEnabled) {
this.healthThresholdMonitorEnabled = healthThresholdMonitorEnabled;
}
public Collection<ComponentInstance> getSucceededInstances() {
return succeededInstances.values();
}
public long getNumSucceededInstances() {
return succeededInstances.size();
}
public long getNumFailedInstances() {
return failedInstances.size();
}
public Collection<ComponentInstance> getFailedInstances() {
return failedInstances.values();
}
public synchronized void markAsSucceeded(ComponentInstance instance) {
removeFailedInstanceIfExists(instance);
succeededInstances.put(instance.getCompInstanceName(), instance);
}
public synchronized void markAsFailed(ComponentInstance instance) {
removeSuccessfulInstanceIfExists(instance);
failedInstances.put(instance.getCompInstanceName(), instance);
}
public boolean removeFailedInstanceIfExists(ComponentInstance instance) {
if (failedInstances.containsKey(instance.getCompInstanceName())) {
failedInstances.remove(instance.getCompInstanceName());
return true;
}
return false;
}
public boolean removeSuccessfulInstanceIfExists(ComponentInstance instance) {
if (succeededInstances.containsKey(instance.getCompInstanceName())) {
succeededInstances.remove(instance.getCompInstanceName());
return true;
}
return false;
}
public boolean isReadyForDownstream() {
return getRestartPolicyHandler().isReadyForDownStream(this);
}
public static ComponentRestartPolicy getRestartPolicyHandler(
RestartPolicyEnum restartPolicyEnum) {
if (RestartPolicyEnum.NEVER == restartPolicyEnum) {
return NeverRestartPolicy.getInstance();
} else if (RestartPolicyEnum.ON_FAILURE == restartPolicyEnum) {
return OnFailureRestartPolicy.getInstance();
} else{
return AlwaysRestartPolicy.getInstance();
}
}
public ComponentRestartPolicy getRestartPolicyHandler() {
RestartPolicyEnum restartPolicyEnum = getComponentSpec().getRestartPolicy();
return getRestartPolicyHandler(restartPolicyEnum);
}
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.component;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
/**
* Interface for Component Restart policies.
* Which is used to make decisions on termination/restart of components and
* their instances.
*/
public interface ComponentRestartPolicy {
boolean isLongLived();
boolean hasCompleted(Component component);
boolean hasCompletedSuccessfully(Component component);
boolean shouldRelaunchInstance(ComponentInstance componentInstance,
ContainerStatus containerStatus);
boolean isReadyForDownStream(Component component);
boolean allowUpgrades();
boolean shouldTerminate(Component component);
}

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.component;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
/**
* Policy for components with instances that do not require/support a restart.
*/
public final class NeverRestartPolicy implements ComponentRestartPolicy {
private static NeverRestartPolicy INSTANCE = new NeverRestartPolicy();
private NeverRestartPolicy() {
}
public static NeverRestartPolicy getInstance() {
return INSTANCE;
}
@Override public boolean isLongLived() {
return false;
}
@Override public boolean hasCompleted(Component component) {
if (component.getNumSucceededInstances() + component.getNumFailedInstances()
< component.getNumDesiredInstances()) {
return false;
}
return true;
}
@Override public boolean hasCompletedSuccessfully(Component component) {
if (component.getNumSucceededInstances() == component
.getNumDesiredInstances()) {
return true;
}
return false;
}
@Override public boolean shouldRelaunchInstance(
ComponentInstance componentInstance, ContainerStatus containerStatus) {
return false;
}
@Override public boolean isReadyForDownStream(Component component) {
if (hasCompleted(component)) {
return true;
}
return false;
}
@Override public boolean allowUpgrades() {
return false;
}
@Override public boolean shouldTerminate(Component component) {
long nSucceeded = component.getNumSucceededInstances();
long nFailed = component.getNumFailedInstances();
if (nSucceeded + nFailed < component.getComponentSpec()
.getNumberOfContainers()) {
return false;
}
return true;
}
}

View File

@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.component;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
/**
* Policy for components that require restarts for instances on failure.
*/
public final class OnFailureRestartPolicy implements ComponentRestartPolicy {
private static OnFailureRestartPolicy INSTANCE = new OnFailureRestartPolicy();
private OnFailureRestartPolicy() {
}
public static OnFailureRestartPolicy getInstance() {
return INSTANCE;
}
@Override public boolean isLongLived() {
return false;
}
@Override public boolean hasCompleted(Component component) {
if (hasCompletedSuccessfully(component)) {
return true;
}
return false;
}
@Override public boolean hasCompletedSuccessfully(Component component) {
if (component.getNumSucceededInstances() == component
.getNumDesiredInstances()) {
return true;
}
return false;
}
@Override public boolean shouldRelaunchInstance(
ComponentInstance componentInstance, ContainerStatus containerStatus) {
if (ComponentInstance.hasContainerFailed(containerStatus)) {
return true;
}
return false;
}
@Override public boolean isReadyForDownStream(Component component) {
if (hasCompletedSuccessfully(component)) {
return true;
}
return false;
}
@Override public boolean allowUpgrades() {
return false;
}
@Override public boolean shouldTerminate(Component component) {
long nSucceeded = component.getNumSucceededInstances();
if (nSucceeded < component.getComponentSpec().getNumberOfContainers()) {
return false;
}
return true;
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.service.component.instance;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.registry.client.api.RegistryConstants;
@ -25,9 +26,9 @@ import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -44,6 +45,7 @@ import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.component.Component;
import org.apache.hadoop.yarn.service.component.ComponentEvent;
import org.apache.hadoop.yarn.service.component.ComponentEventType;
import org.apache.hadoop.yarn.service.component.ComponentRestartPolicy;
import org.apache.hadoop.yarn.service.monitor.probe.ProbeStatus;
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
@ -96,8 +98,10 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
// This container object is used for rest API query
private org.apache.hadoop.yarn.service.api.records.Container containerSpec;
private static final StateMachineFactory<ComponentInstance,
ComponentInstanceState, ComponentInstanceEventType, ComponentInstanceEvent>
ComponentInstanceState, ComponentInstanceEventType,
ComponentInstanceEvent>
stateMachineFactory =
new StateMachineFactory<ComponentInstance, ComponentInstanceState,
ComponentInstanceEventType, ComponentInstanceEvent>(INIT)
@ -230,6 +234,47 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
}
}
@VisibleForTesting
static void handleComponentInstanceRelaunch(
ComponentInstance compInstance, ComponentInstanceEvent event) {
Component comp = compInstance.getComponent();
// Do we need to relaunch the service?
boolean hasContainerFailed = hasContainerFailed(event.getStatus());
ComponentRestartPolicy restartPolicy = comp.getRestartPolicyHandler();
if (restartPolicy.shouldRelaunchInstance(compInstance, event.getStatus())) {
// re-ask the failed container.
comp.requestContainers(1);
comp.reInsertPendingInstance(compInstance);
LOG.info(compInstance.getCompInstanceId()
+ ": {} completed. Reinsert back to pending list and requested " +
"a new container." + System.lineSeparator() +
" exitStatus={}, diagnostics={}.",
event.getContainerId(), event.getStatus().getExitStatus(),
event.getStatus().getDiagnostics());
} else {
// When no relaunch, update component's #succeeded/#failed
// instances.
if (hasContainerFailed) {
comp.markAsFailed(compInstance);
} else {
comp.markAsSucceeded(compInstance);
}
LOG.info(compInstance.getCompInstanceId() + (!hasContainerFailed ?
" succeeded" :
" failed") + " without retry, exitStatus=" + event.getStatus());
comp.getScheduler().terminateServiceIfAllComponentsFinished();
}
}
public static boolean hasContainerFailed(ContainerStatus containerStatus) {
//Mark conainer as failed if we cant get its exit status i.e null?
return containerStatus == null || containerStatus.getExitStatus() !=
ContainerExitStatus.SUCCESS;
}
private static class ContainerStoppedTransition extends BaseTransition {
// whether the container failed before launched by AM or not.
boolean failedBeforeLaunching = false;
@ -244,9 +289,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
@Override
public void transition(ComponentInstance compInstance,
ComponentInstanceEvent event) {
// re-ask the failed container.
Component comp = compInstance.component;
comp.requestContainers(1);
String containerDiag =
compInstance.getCompInstanceId() + ": " + event.getStatus()
.getDiagnostics();
@ -259,7 +303,10 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
compInstance.component.decContainersReady(true);
}
compInstance.component.decRunningContainers();
boolean shouldExit = false;
// Should we fail (terminate) the service?
boolean shouldFailService = false;
final ServiceScheduler scheduler = comp.getScheduler();
// Check if it exceeds the failure threshold, but only if health threshold
// monitor is not enabled
if (!comp.isHealthThresholdMonitorEnabled()
@ -271,10 +318,10 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
comp.getName(), comp.currentContainerFailure.get(), comp.maxContainerFailurePerComp);
compInstance.diagnostics.append(exitDiag);
// append to global diagnostics that will be reported to RM.
comp.getScheduler().getDiagnostics().append(containerDiag);
comp.getScheduler().getDiagnostics().append(exitDiag);
scheduler.getDiagnostics().append(containerDiag);
scheduler.getDiagnostics().append(exitDiag);
LOG.warn(exitDiag);
shouldExit = true;
shouldFailService = true;
}
if (!failedBeforeLaunching) {
@ -296,25 +343,14 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
}
// remove the failed ContainerId -> CompInstance mapping
comp.getScheduler().removeLiveCompInstance(event.getContainerId());
scheduler.removeLiveCompInstance(event.getContainerId());
comp.reInsertPendingInstance(compInstance);
// According to component restart policy, handle container restart
// or finish the service (if all components finished)
handleComponentInstanceRelaunch(compInstance, event);
LOG.info(compInstance.getCompInstanceId()
+ ": {} completed. Reinsert back to pending list and requested " +
"a new container." + System.lineSeparator() +
" exitStatus={}, diagnostics={}.",
event.getContainerId(), event.getStatus().getExitStatus(),
event.getStatus().getDiagnostics());
if (shouldExit) {
// Sleep for 5 seconds in hope that the state can be recorded in ATS.
// in case there's a client polling the comp state, it can be notified.
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
LOG.error("Interrupted on sleep while exiting.", e);
}
ExitUtil.terminate(-1);
if (shouldFailService) {
scheduler.getTerminationHandler().terminate(-1);
}
}
}
@ -630,4 +666,9 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
>>> 32));
return result;
}
@VisibleForTesting public org.apache.hadoop.yarn.service.api.records
.Container getContainerSpec() {
return containerSpec;
}
}

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
@ -571,4 +572,21 @@ public final class ServiceUtils {
// Fallback to querying the default hostname as we did before.
return InetAddress.getLocalHost().getCanonicalHostName();
}
/**
* Process termination handler - exist with specified exit code after
* waiting a while for ATS state to be in sync.
*/
public static class ProcessTerminationHandler {
public void terminate(int exitCode) {
// Sleep for 5 seconds in hope that the state can be recorded in ATS.
// in case there's a client polling the comp state, it can be notified.
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
log.info("Interrupted on sleep while exiting.", e);
}
ExitUtil.terminate(exitCode);
}
}
}

View File

@ -57,6 +57,8 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM;
@ -99,8 +101,32 @@ public class ServiceTestUtils {
return exampleApp;
}
// Example service definition
// 2 components, each of which has 2 containers.
public static Service createTerminatingJobExample(String serviceName) {
Service exampleApp = new Service();
exampleApp.setName(serviceName);
exampleApp.setVersion("v1");
exampleApp.addComponent(
createComponent("terminating-comp1", 2, "sleep " + "1000",
Component.RestartPolicyEnum.NEVER, null));
exampleApp.addComponent(
createComponent("terminating-comp2", 2, "sleep 1000",
Component.RestartPolicyEnum.ON_FAILURE, new ArrayList<String>() {{
add("terminating-comp1");
}}));
exampleApp.addComponent(
createComponent("terminating-comp3", 2, "sleep 1000",
Component.RestartPolicyEnum.ON_FAILURE, new ArrayList<String>() {{
add("terminating-comp2");
}}));
return exampleApp;
}
public static Component createComponent(String name) {
return createComponent(name, 2L, "sleep 1000");
return createComponent(name, 2L, "sleep 1000",
Component.RestartPolicyEnum.ALWAYS, null);
}
protected static Component createComponent(String name, long numContainers,
@ -116,6 +142,18 @@ public class ServiceTestUtils {
return comp1;
}
protected static Component createComponent(String name, long numContainers,
String command, Component.RestartPolicyEnum restartPolicyEnum,
List<String> dependencies) {
Component comp = createComponent(name, numContainers, command);
comp.setRestartPolicy(restartPolicyEnum);
if (dependencies != null) {
comp.dependencies(dependencies);
}
return comp;
}
public static SliderFileSystem initMockFs() throws IOException {
return initMockFs(null);
}
@ -306,6 +344,12 @@ public class ServiceTestUtils {
return client;
}
public static ServiceManager createServiceManager(ServiceContext context) {
ServiceManager serviceManager = new ServiceManager(context);
context.setServiceManager(serviceManager);
return serviceManager;
}
/**
* Creates a YarnClient for test purposes.
*/

View File

@ -227,14 +227,16 @@ public class TestServiceManager {
}
public static Service createBaseDef(String name) {
return createDef(name, ServiceTestUtils.createExampleApplication());
}
public static Service createDef(String name, Service serviceDef) {
ApplicationId applicationId = ApplicationId.newInstance(
System.currentTimeMillis(), 1);
Service serviceDef = ServiceTestUtils.createExampleApplication();
serviceDef.setId(applicationId.toString());
serviceDef.setName(name);
serviceDef.setState(ServiceState.STARTED);
Artifact artifact = createTestArtifact("1");
serviceDef.getComponents().forEach(component ->
component.setArtifact(artifact));
return serviceDef;

View File

@ -38,8 +38,10 @@ import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@ -50,6 +52,7 @@ import java.util.Iterator;
import java.util.Map;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
@ -60,6 +63,9 @@ import static org.mockito.Mockito.when;
*/
public class TestComponent {
private static final int WAIT_MS_PER_LOOP = 1000;
static final Logger LOG = Logger.getLogger(TestComponent.class);
@Rule
public ServiceTestUtils.ServiceFSWatcher rule =
new ServiceTestUtils.ServiceFSWatcher();
@ -158,6 +164,57 @@ public class TestComponent {
comp.getComponentSpec().getConfiguration().getEnv("key1"));
}
@Test
public void testComponentStateUpdatesWithTerminatingComponents() throws
Exception {
final String serviceName =
"testComponentStateUpdatesWithTerminatingComponents";
Service testService = ServiceTestUtils.createTerminatingJobExample(
serviceName);
TestServiceManager.createDef(serviceName, testService);
ServiceContext context = createTestContext(rule, testService);
for (Component comp : context.scheduler.getAllComponents().values()) {
Iterator<ComponentInstance> instanceIter = comp.
getAllComponentInstances().iterator();
ComponentInstance componentInstance = instanceIter.next();
Container instanceContainer = componentInstance.getContainer();
Assert.assertEquals(0, comp.getNumSucceededInstances());
Assert.assertEquals(0, comp.getNumFailedInstances());
Assert.assertEquals(2, comp.getNumRunningInstances());
Assert.assertEquals(2, comp.getNumReadyInstances());
Assert.assertEquals(0, comp.getPendingInstances().size());
//stop 1 container
ContainerStatus containerStatus = ContainerStatus.newInstance(
instanceContainer.getId(),
org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE,
"successful", 0);
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CONTAINER_COMPLETED).setStatus(containerStatus));
componentInstance.handle(
new ComponentInstanceEvent(componentInstance.getContainer().getId(),
ComponentInstanceEventType.STOP).setStatus(containerStatus));
Assert.assertEquals(1, comp.getNumSucceededInstances());
Assert.assertEquals(0, comp.getNumFailedInstances());
Assert.assertEquals(1, comp.getNumRunningInstances());
Assert.assertEquals(1, comp.getNumReadyInstances());
Assert.assertEquals(0, comp.getPendingInstances().size());
org.apache.hadoop.yarn.service.component.ComponentState componentState =
Component.checkIfStable(comp);
Assert.assertEquals(
org.apache.hadoop.yarn.service.component.ComponentState.STABLE,
componentState);
}
}
private static org.apache.hadoop.yarn.service.api.records.Component
createSpecWithEnv(String serviceName, String compName, String key,
String val) {
@ -171,31 +228,38 @@ public class TestComponent {
public static ServiceContext createTestContext(
ServiceTestUtils.ServiceFSWatcher fsWatcher, String serviceName)
throws Exception {
return createTestContext(fsWatcher,
TestServiceManager.createBaseDef(serviceName));
}
public static ServiceContext createTestContext(
ServiceTestUtils.ServiceFSWatcher fsWatcher, Service serviceDef)
throws Exception {
ServiceContext context = new ServiceContext();
context.service = TestServiceManager.createBaseDef(serviceName);
context.service = serviceDef;
context.fs = fsWatcher.getFs();
ContainerLaunchService mockLaunchService = mock(
ContainerLaunchService.class);
context.scheduler = new ServiceScheduler(context) {
@Override
protected YarnRegistryViewForProviders createYarnRegistryOperations(
@Override protected YarnRegistryViewForProviders
createYarnRegistryOperations(
ServiceContext context, RegistryOperations registryClient) {
return mock(YarnRegistryViewForProviders.class);
}
@Override
public NMClientAsync createNMClient() {
@Override public NMClientAsync createNMClient() {
NMClientAsync nmClientAsync = super.createNMClient();
NMClient nmClient = mock(NMClient.class);
try {
when(nmClient.getContainerStatus(anyObject(), anyObject()))
.thenAnswer((Answer<ContainerStatus>) invocation ->
ContainerStatus.newInstance(
(ContainerId) invocation.getArguments()[0],
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
"", 0));
.thenAnswer(
(Answer<ContainerStatus>) invocation -> ContainerStatus
.newInstance((ContainerId) invocation.getArguments()[0],
org.apache.hadoop.yarn.api.records.ContainerState
.RUNNING,
"", 0));
} catch (YarnException | IOException e) {
throw new RuntimeException(e);
}
@ -203,16 +267,18 @@ public class TestComponent {
return nmClientAsync;
}
@Override
public ContainerLaunchService getContainerLaunchService() {
@Override public ContainerLaunchService getContainerLaunchService() {
return mockLaunchService;
}
};
context.scheduler.init(fsWatcher.getConf());
ServiceTestUtils.createServiceManager(context);
doNothing().when(mockLaunchService).
reInitCompInstance(anyObject(), anyObject(), anyObject(), anyObject());
stabilizeComponents(context);
return context;
}
@ -223,6 +289,8 @@ public class TestComponent {
context.attemptId = attemptId;
Map<String, Component>
componentState = context.scheduler.getAllComponents();
int counter = 0;
for (org.apache.hadoop.yarn.service.api.records.Component componentSpec :
context.service.getComponents()) {
Component component = new org.apache.hadoop.yarn.service.component.
@ -230,9 +298,12 @@ public class TestComponent {
componentState.put(component.getName(), component);
component.handle(new ComponentEvent(component.getName(),
ComponentEventType.FLEX));
for (int i = 0; i < componentSpec.getNumberOfContainers(); i++) {
assignNewContainer(attemptId, i + 1, context, component);
counter++;
assignNewContainer(attemptId, counter, context, component);
}
component.handle(new ComponentEvent(component.getName(),
ComponentEventType.CHECK_STABLE));
}
@ -241,6 +312,8 @@ public class TestComponent {
private static void assignNewContainer(
ApplicationAttemptId attemptId, long containerNum,
ServiceContext context, Component component) {
Container container = org.apache.hadoop.yarn.api.records.Container
.newInstance(ContainerId.newContainerId(attemptId, containerNum),
NODE_ID, "localhost", null, null,

View File

@ -0,0 +1,130 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.component;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Tests for ComponentRestartPolicy implementations.
*/
public class TestComponentRestartPolicy {
@Test
public void testAlwaysRestartPolicy() throws Exception {
AlwaysRestartPolicy alwaysRestartPolicy = AlwaysRestartPolicy.getInstance();
Component component = mock(Component.class);
when(component.getNumReadyInstances()).thenReturn(1);
when(component.getNumDesiredInstances()).thenReturn(2);
ComponentInstance instance = mock(ComponentInstance.class);
when(instance.getComponent()).thenReturn(component);
ContainerStatus containerStatus = mock(ContainerStatus.class);
assertEquals(true, alwaysRestartPolicy.isLongLived());
assertEquals(true, alwaysRestartPolicy.allowUpgrades());
assertEquals(false, alwaysRestartPolicy.hasCompleted(component));
assertEquals(false,
alwaysRestartPolicy.hasCompletedSuccessfully(component));
assertEquals(true,
alwaysRestartPolicy.shouldRelaunchInstance(instance, containerStatus));
assertEquals(false, alwaysRestartPolicy.isReadyForDownStream(component));
}
@Test
public void testNeverRestartPolicy() throws Exception {
NeverRestartPolicy restartPolicy = NeverRestartPolicy.getInstance();
Component component = mock(Component.class);
when(component.getNumSucceededInstances()).thenReturn(new Long(1));
when(component.getNumFailedInstances()).thenReturn(new Long(2));
when(component.getNumDesiredInstances()).thenReturn(3);
ComponentInstance instance = mock(ComponentInstance.class);
when(instance.getComponent()).thenReturn(component);
ContainerStatus containerStatus = mock(ContainerStatus.class);
assertEquals(false, restartPolicy.isLongLived());
assertEquals(false, restartPolicy.allowUpgrades());
assertEquals(true, restartPolicy.hasCompleted(component));
assertEquals(false,
restartPolicy.hasCompletedSuccessfully(component));
assertEquals(false,
restartPolicy.shouldRelaunchInstance(instance, containerStatus));
assertEquals(true, restartPolicy.isReadyForDownStream(component));
}
@Test
public void testOnFailureRestartPolicy() throws Exception {
OnFailureRestartPolicy restartPolicy = OnFailureRestartPolicy.getInstance();
Component component = mock(Component.class);
when(component.getNumSucceededInstances()).thenReturn(new Long(3));
when(component.getNumFailedInstances()).thenReturn(new Long(0));
when(component.getNumDesiredInstances()).thenReturn(3);
ComponentInstance instance = mock(ComponentInstance.class);
when(instance.getComponent()).thenReturn(component);
ContainerStatus containerStatus = mock(ContainerStatus.class);
when(containerStatus.getExitStatus()).thenReturn(0);
assertEquals(false, restartPolicy.isLongLived());
assertEquals(false, restartPolicy.allowUpgrades());
assertEquals(true, restartPolicy.hasCompleted(component));
assertEquals(true,
restartPolicy.hasCompletedSuccessfully(component));
assertEquals(false,
restartPolicy.shouldRelaunchInstance(instance, containerStatus));
assertEquals(true, restartPolicy.isReadyForDownStream(component));
when(component.getNumSucceededInstances()).thenReturn(new Long(2));
when(component.getNumFailedInstances()).thenReturn(new Long(1));
when(component.getNumDesiredInstances()).thenReturn(3);
assertEquals(false, restartPolicy.hasCompleted(component));
assertEquals(false,
restartPolicy.hasCompletedSuccessfully(component));
when(containerStatus.getExitStatus()).thenReturn(-1000);
assertEquals(true,
restartPolicy.shouldRelaunchInstance(instance, containerStatus));
assertEquals(false, restartPolicy.isReadyForDownStream(component));
}
}

View File

@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -18,56 +18,80 @@
package org.apache.hadoop.yarn.service.component.instance;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.ServiceScheduler;
import org.apache.hadoop.yarn.service.ServiceTestUtils;
import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.component.Component;
import org.apache.hadoop.yarn.service.component.ComponentEvent;
import org.apache.hadoop.yarn.service.component.ComponentEventType;
import org.apache.hadoop.yarn.service.component.TestComponent;
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Tests for {@link ComponentInstance}.
*/
public class TestComponentInstance {
@Rule
public ServiceTestUtils.ServiceFSWatcher rule =
@Rule public ServiceTestUtils.ServiceFSWatcher rule =
new ServiceTestUtils.ServiceFSWatcher();
@Test
public void testContainerUpgrade() throws Exception {
@Test public void testContainerUpgrade() throws Exception {
ServiceContext context = TestComponent.createTestContext(rule,
"testContainerUpgrade");
Component component = context.scheduler.getAllComponents().entrySet()
.iterator().next().getValue();
Component component =
context.scheduler.getAllComponents().entrySet().iterator().next()
.getValue();
upgradeComponent(component);
ComponentInstance instance = component.getAllComponentInstances()
.iterator().next();
ComponentInstance instance =
component.getAllComponentInstances().iterator().next();
ComponentInstanceEvent instanceEvent = new ComponentInstanceEvent(
instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE);
instance.handle(instanceEvent);
Container containerSpec = component.getComponentSpec().getContainer(
instance.getContainer().getId().toString());
Assert.assertEquals("instance not upgrading",
ContainerState.UPGRADING, containerSpec.getState());
Assert.assertEquals("instance not upgrading", ContainerState.UPGRADING,
containerSpec.getState());
}
@Test
public void testContainerReadyAfterUpgrade() throws Exception {
@Test public void testContainerReadyAfterUpgrade() throws Exception {
ServiceContext context = TestComponent.createTestContext(rule,
"testContainerStarted");
Component component = context.scheduler.getAllComponents().entrySet()
.iterator().next().getValue();
Component component =
context.scheduler.getAllComponents().entrySet().iterator().next()
.getValue();
upgradeComponent(component);
ComponentInstance instance = component.getAllComponentInstances()
.iterator().next();
ComponentInstance instance =
component.getAllComponentInstances().iterator().next();
ComponentInstanceEvent instanceEvent = new ComponentInstanceEvent(
instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE);
@ -75,14 +99,426 @@ public class TestComponentInstance {
instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(),
ComponentInstanceEventType.BECOME_READY));
Assert.assertEquals("instance not ready",
ContainerState.READY, instance.getCompSpec().getContainer(
instance.getContainer().getId().toString()).getState());
Assert.assertEquals("instance not ready", ContainerState.READY,
instance.getCompSpec()
.getContainer(instance.getContainer().getId().toString())
.getState());
}
private void upgradeComponent(Component component) {
component.handle(new ComponentEvent(component.getName(),
ComponentEventType.UPGRADE)
.setTargetSpec(component.getComponentSpec()).setUpgradeVersion("v2"));
ComponentEventType.UPGRADE).setTargetSpec(component.getComponentSpec())
.setUpgradeVersion("v2"));
}
private Component createComponent(ServiceScheduler scheduler,
org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum
restartPolicy,
int nSucceededInstances, int nFailedInstances, int totalAsk,
int componentId) {
assert (nSucceededInstances + nFailedInstances) <= totalAsk;
Component comp = mock(Component.class);
org.apache.hadoop.yarn.service.api.records.Component componentSpec = mock(
org.apache.hadoop.yarn.service.api.records.Component.class);
when(componentSpec.getRestartPolicy()).thenReturn(restartPolicy);
when(comp.getRestartPolicyHandler()).thenReturn(
Component.getRestartPolicyHandler(restartPolicy));
when(componentSpec.getNumberOfContainers()).thenReturn(
Long.valueOf(totalAsk));
when(comp.getComponentSpec()).thenReturn(componentSpec);
when(comp.getScheduler()).thenReturn(scheduler);
Map<String, ComponentInstance> succeeded = new ConcurrentHashMap<>();
Map<String, ComponentInstance> failed = new ConcurrentHashMap<>();
scheduler.getAllComponents().put("comp" + componentId, comp);
Map<String, ComponentInstance> componentInstances = new HashMap<>();
for (int i = 0; i < nSucceededInstances; i++) {
ComponentInstance componentInstance = createComponentInstance(comp, i);
componentInstances.put(componentInstance.getCompInstanceName(),
componentInstance);
succeeded.put(componentInstance.getCompInstanceName(), componentInstance);
}
for (int i = 0; i < nFailedInstances; i++) {
ComponentInstance componentInstance = createComponentInstance(comp,
i + nSucceededInstances);
componentInstances.put(componentInstance.getCompInstanceName(),
componentInstance);
failed.put(componentInstance.getCompInstanceName(), componentInstance);
}
int delta = totalAsk - nFailedInstances - nSucceededInstances;
for (int i = 0; i < delta; i++) {
ComponentInstance componentInstance = createComponentInstance(comp,
i + nSucceededInstances + nFailedInstances);
componentInstances.put(componentInstance.getCompInstanceName(),
componentInstance);
}
when(comp.getAllComponentInstances()).thenReturn(
componentInstances.values());
when(comp.getSucceededInstances()).thenReturn(succeeded.values());
when(comp.getFailedInstances()).thenReturn(failed.values());
return comp;
}
private Component createComponent(ServiceScheduler scheduler,
org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum
restartPolicy,
int totalAsk, int componentId) {
Component comp = mock(Component.class);
org.apache.hadoop.yarn.service.api.records.Component componentSpec = mock(
org.apache.hadoop.yarn.service.api.records.Component.class);
when(componentSpec.getRestartPolicy()).thenReturn(restartPolicy);
when(comp.getRestartPolicyHandler()).thenReturn(
Component.getRestartPolicyHandler(restartPolicy));
when(componentSpec.getNumberOfContainers()).thenReturn(
Long.valueOf(totalAsk));
when(comp.getComponentSpec()).thenReturn(componentSpec);
when(comp.getScheduler()).thenReturn(scheduler);
scheduler.getAllComponents().put("comp" + componentId, comp);
Map<String, ComponentInstance> componentInstances = new HashMap<>();
for (int i = 0; i < totalAsk; i++) {
ComponentInstance componentInstance = createComponentInstance(comp, i);
componentInstances.put(componentInstance.getCompInstanceName(),
componentInstance);
}
when(comp.getAllComponentInstances()).thenReturn(
componentInstances.values());
return comp;
}
private ComponentInstance createComponentInstance(Component component,
int instanceId) {
ComponentInstance componentInstance = mock(ComponentInstance.class);
when(componentInstance.getComponent()).thenReturn(component);
when(componentInstance.getCompInstanceName()).thenReturn(
"compInstance" + instanceId);
ServiceUtils.ProcessTerminationHandler terminationHandler = mock(
ServiceUtils.ProcessTerminationHandler.class);
when(component.getScheduler().getTerminationHandler()).thenReturn(
terminationHandler);
return componentInstance;
}
@Test public void testComponentRestartPolicy() {
Map<String, Component> allComponents = new HashMap<>();
Service mockService = mock(Service.class);
ServiceContext serviceContext = mock(ServiceContext.class);
when(serviceContext.getService()).thenReturn(mockService);
ServiceScheduler serviceSchedulerInstance = new ServiceScheduler(
serviceContext);
ServiceScheduler serviceScheduler = spy(serviceSchedulerInstance);
when(serviceScheduler.getAllComponents()).thenReturn(allComponents);
Mockito.doNothing().when(serviceScheduler).setGracefulStop(
any(FinalApplicationStatus.class));
ComponentInstanceEvent componentInstanceEvent = mock(
ComponentInstanceEvent.class);
ContainerId containerId = ContainerId.newContainerId(ApplicationAttemptId
.newInstance(ApplicationId.newInstance(1234L, 1), 1), 1);
ContainerStatus containerStatus = ContainerStatus.newInstance(containerId,
org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE, "hello", 0);
when(componentInstanceEvent.getStatus()).thenReturn(containerStatus);
// Test case1: one component, one instance, restart policy = ALWAYS, exit=0
Component comp = createComponent(serviceScheduler,
org.apache.hadoop.yarn.service.api.records.Component
.RestartPolicyEnum.ALWAYS,
1, 0, 1, 0);
ComponentInstance componentInstance =
comp.getAllComponentInstances().iterator().next();
ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
componentInstanceEvent);
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
verify(comp, never()).markAsFailed(any(ComponentInstance.class));
verify(comp, times(1)).reInsertPendingInstance(
any(ComponentInstance.class));
verify(serviceScheduler.getTerminationHandler(), never()).terminate(
anyInt());
// Test case2: one component, one instance, restart policy = ALWAYS, exit=1
comp = createComponent(serviceScheduler,
org.apache.hadoop.yarn.service.api.records.Component
.RestartPolicyEnum.ALWAYS,
0, 1, 1, 0);
componentInstance = comp.getAllComponentInstances().iterator().next();
containerStatus.setExitStatus(1);
ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
componentInstanceEvent);
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
verify(comp, never()).markAsFailed(any(ComponentInstance.class));
verify(comp, times(1)).reInsertPendingInstance(
any(ComponentInstance.class));
verify(serviceScheduler.getTerminationHandler(), never()).terminate(
anyInt());
// Test case3: one component, one instance, restart policy = NEVER, exit=0
// Should exit with code=0
comp = createComponent(serviceScheduler,
org.apache.hadoop.yarn.service.api.records.Component
.RestartPolicyEnum.NEVER,
1, 0, 1, 0);
componentInstance = comp.getAllComponentInstances().iterator().next();
containerStatus.setExitStatus(0);
Map<String, ComponentInstance> succeededInstances = new HashMap<>();
succeededInstances.put(componentInstance.getCompInstanceName(),
componentInstance);
when(comp.getSucceededInstances()).thenReturn(succeededInstances.values());
when(comp.getNumSucceededInstances()).thenReturn(new Long(1));
ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
componentInstanceEvent);
verify(comp, times(1)).markAsSucceeded(any(ComponentInstance.class));
verify(comp, never()).markAsFailed(any(ComponentInstance.class));
verify(comp, times(0)).reInsertPendingInstance(
any(ComponentInstance.class));
verify(serviceScheduler.getTerminationHandler(), times(1)).terminate(eq(0));
// Test case4: one component, one instance, restart policy = NEVER, exit=1
// Should exit with code=-1
comp = createComponent(serviceScheduler,
org.apache.hadoop.yarn.service.api.records.Component
.RestartPolicyEnum.NEVER,
0, 1, 1, 0);
componentInstance = comp.getAllComponentInstances().iterator().next();
containerStatus.setExitStatus(-1);
when(comp.getNumFailedInstances()).thenReturn(new Long(1));
ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
componentInstanceEvent);
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
verify(comp, times(1)).markAsFailed(any(ComponentInstance.class));
verify(comp, times(0)).reInsertPendingInstance(
any(ComponentInstance.class));
verify(serviceScheduler.getTerminationHandler(), times(1)).terminate(
eq(-1));
// Test case5: one component, one instance, restart policy = ON_FAILURE,
// exit=1
// Should continue run.
comp = createComponent(serviceScheduler,
org.apache.hadoop.yarn.service.api.records.Component
.RestartPolicyEnum.ON_FAILURE,
0, 1, 1, 0);
componentInstance = comp.getAllComponentInstances().iterator().next();
containerStatus.setExitStatus(1);
ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
componentInstanceEvent);
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
verify(comp, never()).markAsFailed(any(ComponentInstance.class));
verify(comp, times(1)).reInsertPendingInstance(
any(ComponentInstance.class));
verify(serviceScheduler.getTerminationHandler(), times(0)).terminate(
anyInt());
// Test case6: one component, 3 instances, restart policy = NEVER, exit=1
// 2 of the instances not completed, it should continue run.
comp = createComponent(serviceScheduler,
org.apache.hadoop.yarn.service.api.records.Component
.RestartPolicyEnum.NEVER,
0, 1, 3, 0);
componentInstance = comp.getAllComponentInstances().iterator().next();
containerStatus.setExitStatus(1);
ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
componentInstanceEvent);
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
verify(comp, times(1)).markAsFailed(any(ComponentInstance.class));
verify(comp, times(0)).reInsertPendingInstance(
any(ComponentInstance.class));
verify(serviceScheduler.getTerminationHandler(), times(0)).terminate(
anyInt());
// Test case7: one component, 3 instances, restart policy = ON_FAILURE,
// exit=1
// 2 of the instances completed, it should continue run.
comp = createComponent(serviceScheduler,
org.apache.hadoop.yarn.service.api.records.Component
.RestartPolicyEnum.ON_FAILURE,
0, 1, 3, 0);
Iterator<ComponentInstance> iter =
comp.getAllComponentInstances().iterator();
containerStatus.setExitStatus(1);
ComponentInstance commponentInstance = iter.next();
ComponentInstance.handleComponentInstanceRelaunch(commponentInstance,
componentInstanceEvent);
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
verify(comp, never()).markAsFailed(any(ComponentInstance.class));
verify(comp, times(1)).reInsertPendingInstance(
any(ComponentInstance.class));
verify(serviceScheduler.getTerminationHandler(), times(0)).terminate(
anyInt());
// Test case8: 2 components, 2 instances for each
// comp2 already finished.
// comp1 has a new instance finish, we should terminate the service
comp = createComponent(serviceScheduler,
org.apache.hadoop.yarn.service.api.records.Component
.RestartPolicyEnum.NEVER,
2, 0);
Collection<ComponentInstance> component1Instances =
comp.getAllComponentInstances();
containerStatus.setExitStatus(-1);
Component comp2 = createComponent(
componentInstance.getComponent().getScheduler(),
org.apache.hadoop.yarn.service.api.records.Component
.RestartPolicyEnum.NEVER,
2, 1);
Collection<ComponentInstance> component2Instances =
comp2.getAllComponentInstances();
Map<String, ComponentInstance> failed2Instances = new HashMap<>();
for (ComponentInstance component2Instance : component2Instances) {
failed2Instances.put(component2Instance.getCompInstanceName(),
component2Instance);
when(component2Instance.getComponent().getFailedInstances()).thenReturn(
failed2Instances.values());
when(component2Instance.getComponent().getNumFailedInstances())
.thenReturn(new Long(failed2Instances.size()));
ComponentInstance.handleComponentInstanceRelaunch(component2Instance,
componentInstanceEvent);
}
Map<String, ComponentInstance> failed1Instances = new HashMap<>();
// 2nd component, already finished.
for (ComponentInstance component1Instance : component1Instances) {
failed1Instances.put(component1Instance.getCompInstanceName(),
component1Instance);
when(component1Instance.getComponent().getFailedInstances()).thenReturn(
failed1Instances.values());
when(component1Instance.getComponent().getNumFailedInstances())
.thenReturn(new Long(failed1Instances.size()));
ComponentInstance.handleComponentInstanceRelaunch(component1Instance,
componentInstanceEvent);
}
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
verify(comp, times(2)).markAsFailed(any(ComponentInstance.class));
verify(comp, times(0)).reInsertPendingInstance(
any(ComponentInstance.class));
verify(serviceScheduler.getTerminationHandler(), times(1)).terminate(
eq(-1));
// Test case9: 2 components, 2 instances for each
// comp2 already finished.
// comp1 has a new instance finish, we should terminate the service
// All instance finish with 0, service should exit with 0 as well.
containerStatus.setExitStatus(0);
comp = createComponent(serviceScheduler,
org.apache.hadoop.yarn.service.api.records.Component
.RestartPolicyEnum.ON_FAILURE,
2, 0);
component1Instances = comp.getAllComponentInstances();
comp2 = createComponent(componentInstance.getComponent().getScheduler(),
org.apache.hadoop.yarn.service.api.records.Component
.RestartPolicyEnum.ON_FAILURE,
2, 1);
component2Instances = comp2.getAllComponentInstances();
Map<String, ComponentInstance> succeeded2Instances = new HashMap<>();
for (ComponentInstance component2Instance : component2Instances) {
succeeded2Instances.put(component2Instance.getCompInstanceName(),
component2Instance);
when(component2Instance.getComponent().getSucceededInstances())
.thenReturn(succeeded2Instances.values());
when(component2Instance.getComponent().getNumSucceededInstances())
.thenReturn(new Long(succeeded2Instances.size()));
ComponentInstance.handleComponentInstanceRelaunch(component2Instance,
componentInstanceEvent);
}
Map<String, ComponentInstance> succeeded1Instances = new HashMap<>();
// 2nd component, already finished.
for (ComponentInstance component1Instance : component1Instances) {
succeeded1Instances.put(component1Instance.getCompInstanceName(),
component1Instance);
when(component1Instance.getComponent().getSucceededInstances())
.thenReturn(succeeded1Instances.values());
when(component1Instance.getComponent().getNumSucceededInstances())
.thenReturn(new Long(succeeded1Instances.size()));
ComponentInstance.handleComponentInstanceRelaunch(component1Instance,
componentInstanceEvent);
}
verify(comp, times(2)).markAsSucceeded(any(ComponentInstance.class));
verify(comp, never()).markAsFailed(any(ComponentInstance.class));
verify(componentInstance.getComponent(), times(0)).reInsertPendingInstance(
any(ComponentInstance.class));
verify(serviceScheduler.getTerminationHandler(), times(1)).terminate(eq(0));
// Test case10: 2 components, 2 instances for each
// comp2 hasn't finished
// comp1 finished.
// Service should continue run.
comp = createComponent(serviceScheduler,
org.apache.hadoop.yarn.service.api.records.Component
.RestartPolicyEnum.NEVER,
2, 0);
component1Instances = comp.getAllComponentInstances();
comp2 = createComponent(componentInstance.getComponent().getScheduler(),
org.apache.hadoop.yarn.service.api.records.Component
.RestartPolicyEnum.NEVER,
2, 1);
component2Instances = comp2.getAllComponentInstances();
for (ComponentInstance component2Instance : component2Instances) {
ComponentInstance.handleComponentInstanceRelaunch(component2Instance,
componentInstanceEvent);
}
succeeded1Instances = new HashMap<>();
// 2nd component, already finished.
for (ComponentInstance component1Instance : component1Instances) {
succeeded1Instances.put(component1Instance.getCompInstanceName(),
component1Instance);
when(component1Instance.getComponent().getSucceededInstances())
.thenReturn(succeeded1Instances.values());
ComponentInstance.handleComponentInstanceRelaunch(component1Instance,
componentInstanceEvent);
}
verify(comp, times(2)).markAsSucceeded(any(ComponentInstance.class));
verify(comp, never()).markAsFailed(any(ComponentInstance.class));
verify(componentInstance.getComponent(), times(0)).reInsertPendingInstance(
any(ComponentInstance.class));
verify(serviceScheduler.getTerminationHandler(), never()).terminate(eq(0));
}
}

View File

@ -233,6 +233,8 @@ One or more components of the service. If the service is HBase say, then the com
|placement_policy|Advanced scheduling and placement policies for all containers of this component.|false|PlacementPolicy||
|configuration|Config properties for this component.|false|Configuration||
|quicklinks|A list of quicklink keys defined at the service level, and to be resolved by this component.|false|string array||
|restart_policy|Policy of restart component. Including ALWAYS (Always restart
component even if instance exit code = 0); ON_FAILURE (Only restart component if instance exit code != 0); NEVER (Do not restart in any cases). Flexing is not supported for components which have restart_policy=ON_FAILURE/NEVER|false|string|ALWAYS|
### ComponentState