YARN-8080. Add restart policy for YARN services.
Contributed by Suma Shivaprasad
(cherry picked from commit 7f083ed869
)
This commit is contained in:
parent
c6c3b0afa0
commit
044573b628
|
@ -424,6 +424,14 @@ definitions:
|
||||||
items:
|
items:
|
||||||
type: string
|
type: string
|
||||||
description: A list of quicklink keys defined at the service level, and to be resolved by this component.
|
description: A list of quicklink keys defined at the service level, and to be resolved by this component.
|
||||||
|
restartPolicy:
|
||||||
|
type: string
|
||||||
|
description: Policy of restart component. Including ALWAYS (Always restart component even if instance exit code = 0); ON_FAILURE (Only restart component if instance exit code != 0); NEVER (Do not restart in any cases)
|
||||||
|
enum:
|
||||||
|
- ALWAYS
|
||||||
|
- ON_FAILURE
|
||||||
|
- NEVER
|
||||||
|
default: ALWAYS
|
||||||
ReadinessCheck:
|
ReadinessCheck:
|
||||||
description: A check to be performed to determine the readiness of a component instance (a container). If no readiness check is specified, the default readiness check will be used unless the yarn.service.default-readiness-check.enabled configuration property is set to false at the component, service, or system level. The artifact field is currently unsupported but may be implemented in the future, enabling a pluggable helper container to support advanced use cases.
|
description: A check to be performed to determine the readiness of a component instance (a container). If no readiness check is specified, the default readiness check will be used unless the yarn.service.default-readiness-check.enabled configuration property is set to false at the component, service, or system level. The artifact field is currently unsupported but may be implemented in the future, enabling a pluggable helper container to support advanced use cases.
|
||||||
required:
|
required:
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.hadoop.service.AbstractService;
|
||||||
import org.apache.hadoop.util.ExitUtil;
|
import org.apache.hadoop.util.ExitUtil;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
|
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
|
||||||
|
@ -130,7 +131,7 @@ public class ClientAMService extends AbstractService
|
||||||
LOG.info("Stop the service by {}", UserGroupInformation.getCurrentUser());
|
LOG.info("Stop the service by {}", UserGroupInformation.getCurrentUser());
|
||||||
context.scheduler.getDiagnostics()
|
context.scheduler.getDiagnostics()
|
||||||
.append("Stopped by user " + UserGroupInformation.getCurrentUser());
|
.append("Stopped by user " + UserGroupInformation.getCurrentUser());
|
||||||
context.scheduler.setGracefulStop();
|
context.scheduler.setGracefulStop(FinalApplicationStatus.ENDED);
|
||||||
|
|
||||||
// Stop the service in 2 seconds delay to make sure this rpc call is completed.
|
// Stop the service in 2 seconds delay to make sure this rpc call is completed.
|
||||||
// shutdown hook will be executed which will stop AM gracefully.
|
// shutdown hook will be executed which will stop AM gracefully.
|
||||||
|
@ -157,10 +158,10 @@ public class ClientAMService extends AbstractService
|
||||||
public UpgradeServiceResponseProto upgrade(
|
public UpgradeServiceResponseProto upgrade(
|
||||||
UpgradeServiceRequestProto request) throws IOException {
|
UpgradeServiceRequestProto request) throws IOException {
|
||||||
try {
|
try {
|
||||||
context.getServiceManager().processUpgradeRequest(request.getVersion(),
|
|
||||||
request.getAutoFinalize());
|
|
||||||
LOG.info("Upgrading service to version {} by {}", request.getVersion(),
|
LOG.info("Upgrading service to version {} by {}", request.getVersion(),
|
||||||
UserGroupInformation.getCurrentUser());
|
UserGroupInformation.getCurrentUser());
|
||||||
|
context.getServiceManager().processUpgradeRequest(request.getVersion(),
|
||||||
|
request.getAutoFinalize());
|
||||||
return UpgradeServiceResponseProto.newBuilder().build();
|
return UpgradeServiceResponseProto.newBuilder().build();
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
return UpgradeServiceResponseProto.newBuilder().setError(ex.getMessage())
|
return UpgradeServiceResponseProto.newBuilder().setError(ex.getMessage())
|
||||||
|
|
|
@ -56,4 +56,8 @@ public class ServiceContext {
|
||||||
void setServiceManager(ServiceManager serviceManager) {
|
void setServiceManager(ServiceManager serviceManager) {
|
||||||
this.serviceManager = Preconditions.checkNotNull(serviceManager);
|
this.serviceManager = Preconditions.checkNotNull(serviceManager);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Service getService() {
|
||||||
|
return service;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.service.api.records.ServiceState;
|
||||||
import org.apache.hadoop.yarn.service.component.Component;
|
import org.apache.hadoop.yarn.service.component.Component;
|
||||||
import org.apache.hadoop.yarn.service.component.ComponentEvent;
|
import org.apache.hadoop.yarn.service.component.ComponentEvent;
|
||||||
import org.apache.hadoop.yarn.service.component.ComponentEventType;
|
import org.apache.hadoop.yarn.service.component.ComponentEventType;
|
||||||
|
import org.apache.hadoop.yarn.service.component.ComponentRestartPolicy;
|
||||||
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
|
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
|
||||||
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
|
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
|
||||||
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
|
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
|
||||||
|
@ -266,12 +267,24 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
|
||||||
event.setAutoFinalize(true);
|
event.setAutoFinalize(true);
|
||||||
}
|
}
|
||||||
compsThatNeedUpgrade.forEach(component -> {
|
compsThatNeedUpgrade.forEach(component -> {
|
||||||
ComponentEvent needUpgradeEvent = new ComponentEvent(
|
org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum
|
||||||
component.getName(), ComponentEventType.UPGRADE)
|
restartPolicy = component.getRestartPolicy();
|
||||||
.setTargetSpec(component)
|
|
||||||
.setUpgradeVersion(event.getVersion());
|
final ComponentRestartPolicy restartPolicyHandler =
|
||||||
context.scheduler.getDispatcher().getEventHandler().handle(
|
Component.getRestartPolicyHandler(restartPolicy);
|
||||||
needUpgradeEvent);
|
// Do not allow upgrades for components which have NEVER/ON_FAILURE
|
||||||
|
// restart policy
|
||||||
|
if (restartPolicyHandler.allowUpgrades()) {
|
||||||
|
ComponentEvent needUpgradeEvent = new ComponentEvent(
|
||||||
|
component.getName(), ComponentEventType.UPGRADE).setTargetSpec(
|
||||||
|
component).setUpgradeVersion(event.getVersion());
|
||||||
|
context.scheduler.getDispatcher().getEventHandler().handle(
|
||||||
|
needUpgradeEvent);
|
||||||
|
} else {
|
||||||
|
LOG.info("The component {} has a restart "
|
||||||
|
+ "policy that doesnt allow upgrades {} ", component.getName(),
|
||||||
|
component.getRestartPolicy().toString());
|
||||||
|
}
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
// nothing to upgrade if upgrade auto finalize is requested, trigger a
|
// nothing to upgrade if upgrade auto finalize is requested, trigger a
|
||||||
|
|
|
@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.service.api.ServiceApiConstants;
|
||||||
import org.apache.hadoop.yarn.service.api.records.Service;
|
import org.apache.hadoop.yarn.service.api.records.Service;
|
||||||
import org.apache.hadoop.yarn.service.api.records.ServiceState;
|
import org.apache.hadoop.yarn.service.api.records.ServiceState;
|
||||||
import org.apache.hadoop.yarn.service.api.records.ConfigFile;
|
import org.apache.hadoop.yarn.service.api.records.ConfigFile;
|
||||||
|
import org.apache.hadoop.yarn.service.component.ComponentRestartPolicy;
|
||||||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
|
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
|
||||||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
|
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
|
||||||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
|
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
|
||||||
|
@ -77,6 +78,7 @@ import org.apache.hadoop.yarn.service.timelineservice.ServiceMetricsSink;
|
||||||
import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
|
import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
|
||||||
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
|
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
|
||||||
import org.apache.hadoop.yarn.service.utils.ServiceRegistryUtils;
|
import org.apache.hadoop.yarn.service.utils.ServiceRegistryUtils;
|
||||||
|
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
|
||||||
import org.apache.hadoop.yarn.util.BoundedAppender;
|
import org.apache.hadoop.yarn.util.BoundedAppender;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -89,8 +91,10 @@ import java.nio.ByteBuffer;
|
||||||
import java.text.MessageFormat;
|
import java.text.MessageFormat;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
@ -101,6 +105,10 @@ import static org.apache.hadoop.registry.client.api.RegistryConstants.*;
|
||||||
import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.KILLED_AFTER_APP_COMPLETION;
|
import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.KILLED_AFTER_APP_COMPLETION;
|
||||||
import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*;
|
import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*;
|
||||||
import static org.apache.hadoop.yarn.service.component.ComponentEventType.*;
|
import static org.apache.hadoop.yarn.service.component.ComponentEventType.*;
|
||||||
|
import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes
|
||||||
|
.EXIT_FALSE;
|
||||||
|
import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes
|
||||||
|
.EXIT_SUCCESS;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
@ -158,8 +166,15 @@ public class ServiceScheduler extends CompositeService {
|
||||||
|
|
||||||
private boolean gracefulStop = false;
|
private boolean gracefulStop = false;
|
||||||
|
|
||||||
|
private volatile FinalApplicationStatus finalApplicationStatus =
|
||||||
|
FinalApplicationStatus.ENDED;
|
||||||
|
|
||||||
|
// For unit test override since we don't want to terminate UT process.
|
||||||
|
private ServiceUtils.ProcessTerminationHandler
|
||||||
|
terminationHandler = new ServiceUtils.ProcessTerminationHandler();
|
||||||
|
|
||||||
public ServiceScheduler(ServiceContext context) {
|
public ServiceScheduler(ServiceContext context) {
|
||||||
super(context.service.getName());
|
super(context.getService().getName());
|
||||||
this.context = context;
|
this.context = context;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -256,8 +271,9 @@ public class ServiceScheduler extends CompositeService {
|
||||||
.createAMRMClientAsync(1000, new AMRMClientCallback());
|
.createAMRMClientAsync(1000, new AMRMClientCallback());
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void setGracefulStop() {
|
public void setGracefulStop(FinalApplicationStatus applicationStatus) {
|
||||||
this.gracefulStop = true;
|
this.gracefulStop = true;
|
||||||
|
this.finalApplicationStatus = applicationStatus;
|
||||||
nmClient.getClient().cleanupRunningContainersOnStop(true);
|
nmClient.getClient().cleanupRunningContainersOnStop(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -877,4 +893,57 @@ public class ServiceScheduler extends CompositeService {
|
||||||
public boolean hasAtLeastOnePlacementConstraint() {
|
public boolean hasAtLeastOnePlacementConstraint() {
|
||||||
return hasAtLeastOnePlacementConstraint;
|
return hasAtLeastOnePlacementConstraint;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Check if all components of the scheduler finished.
|
||||||
|
* If all components finished
|
||||||
|
* (which #failed-instances + #suceeded-instances = #total-n-containers)
|
||||||
|
* The service will be terminated.
|
||||||
|
*/
|
||||||
|
public synchronized void terminateServiceIfAllComponentsFinished() {
|
||||||
|
boolean shouldTerminate = true;
|
||||||
|
|
||||||
|
// Succeeded comps and failed comps, for logging purposes.
|
||||||
|
Set<String> succeededComponents = new HashSet<>();
|
||||||
|
Set<String> failedComponents = new HashSet<>();
|
||||||
|
|
||||||
|
for (Component comp : getAllComponents().values()) {
|
||||||
|
ComponentRestartPolicy restartPolicy = comp.getRestartPolicyHandler();
|
||||||
|
if (!restartPolicy.shouldTerminate(comp)) {
|
||||||
|
shouldTerminate = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
long nFailed = comp.getNumFailedInstances();
|
||||||
|
|
||||||
|
if (nFailed > 0) {
|
||||||
|
failedComponents.add(comp.getName());
|
||||||
|
} else{
|
||||||
|
succeededComponents.add(comp.getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (shouldTerminate) {
|
||||||
|
LOG.info("All component finished, exiting Service Master... "
|
||||||
|
+ ", final status=" + (failedComponents.isEmpty() ?
|
||||||
|
"Succeeded" :
|
||||||
|
"Failed"));
|
||||||
|
LOG.info("Succeeded components: [" + org.apache.commons.lang3.StringUtils
|
||||||
|
.join(succeededComponents, ",") + "]");
|
||||||
|
LOG.info("Failed components: [" + org.apache.commons.lang3.StringUtils
|
||||||
|
.join(failedComponents, ",") + "]");
|
||||||
|
|
||||||
|
if (failedComponents.isEmpty()) {
|
||||||
|
setGracefulStop(FinalApplicationStatus.SUCCEEDED);
|
||||||
|
getTerminationHandler().terminate(EXIT_SUCCESS);
|
||||||
|
} else{
|
||||||
|
setGracefulStop(FinalApplicationStatus.FAILED);
|
||||||
|
getTerminationHandler().terminate(EXIT_FALSE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public ServiceUtils.ProcessTerminationHandler getTerminationHandler() {
|
||||||
|
return terminationHandler;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.service.api.records;
|
package org.apache.hadoop.yarn.service.api.records;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonValue;
|
||||||
import io.swagger.annotations.ApiModel;
|
import io.swagger.annotations.ApiModel;
|
||||||
import io.swagger.annotations.ApiModelProperty;
|
import io.swagger.annotations.ApiModelProperty;
|
||||||
|
|
||||||
|
@ -29,7 +30,9 @@ import java.util.Objects;
|
||||||
import javax.xml.bind.annotation.XmlAccessType;
|
import javax.xml.bind.annotation.XmlAccessType;
|
||||||
import javax.xml.bind.annotation.XmlAccessorType;
|
import javax.xml.bind.annotation.XmlAccessorType;
|
||||||
import javax.xml.bind.annotation.XmlElement;
|
import javax.xml.bind.annotation.XmlElement;
|
||||||
|
import javax.xml.bind.annotation.XmlEnum;
|
||||||
import javax.xml.bind.annotation.XmlRootElement;
|
import javax.xml.bind.annotation.XmlRootElement;
|
||||||
|
import javax.xml.bind.annotation.XmlType;
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
@ -98,6 +101,74 @@ public class Component implements Serializable {
|
||||||
private List<Container> containers =
|
private List<Container> containers =
|
||||||
Collections.synchronizedList(new ArrayList<Container>());
|
Collections.synchronizedList(new ArrayList<Container>());
|
||||||
|
|
||||||
|
|
||||||
|
@JsonProperty("restart_policy")
|
||||||
|
@XmlElement(name = "restart_policy")
|
||||||
|
private RestartPolicyEnum restartPolicy = RestartPolicyEnum.ALWAYS;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Policy of restart component. Including ALWAYS - Long lived components
|
||||||
|
* (Always restart component instance even if instance exit code = 0.);
|
||||||
|
*
|
||||||
|
* ON_FAILURE (Only restart component instance if instance exit code !=
|
||||||
|
* 0);
|
||||||
|
* NEVER (Do not restart in any cases)
|
||||||
|
*
|
||||||
|
* @return restartPolicy
|
||||||
|
**/
|
||||||
|
@XmlType(name = "restart_policy")
|
||||||
|
@XmlEnum
|
||||||
|
public enum RestartPolicyEnum {
|
||||||
|
ALWAYS("ALWAYS"),
|
||||||
|
|
||||||
|
ON_FAILURE("ON_FAILURE"),
|
||||||
|
|
||||||
|
NEVER("NEVER");
|
||||||
|
private String value;
|
||||||
|
|
||||||
|
RestartPolicyEnum(String value) {
|
||||||
|
this.value = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@JsonValue
|
||||||
|
public String toString() {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Component restartPolicy(RestartPolicyEnum restartPolicyEnumVal) {
|
||||||
|
this.restartPolicy = restartPolicyEnumVal;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Policy of restart component.
|
||||||
|
*
|
||||||
|
* Including
|
||||||
|
* ALWAYS (Always restart component instance even if instance exit
|
||||||
|
* code = 0);
|
||||||
|
*
|
||||||
|
* ON_FAILURE (Only restart component instance if instance exit code !=
|
||||||
|
* 0);
|
||||||
|
*
|
||||||
|
* NEVER (Do not restart in any cases)
|
||||||
|
*
|
||||||
|
* @return restartPolicy
|
||||||
|
**/
|
||||||
|
@ApiModelProperty(value = "Policy of restart component. Including ALWAYS "
|
||||||
|
+ "(Always restart component even if instance exit code = 0); "
|
||||||
|
+ "ON_FAILURE (Only restart component if instance exit code != 0); "
|
||||||
|
+ "NEVER (Do not restart in any cases)")
|
||||||
|
public RestartPolicyEnum getRestartPolicy() {
|
||||||
|
return restartPolicy;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRestartPolicy(RestartPolicyEnum restartPolicy) {
|
||||||
|
this.restartPolicy = restartPolicy;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Name of the service component (mandatory).
|
* Name of the service component (mandatory).
|
||||||
**/
|
**/
|
||||||
|
|
|
@ -0,0 +1,82 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.service.component;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
|
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Always restart policy allows for restarts for long live components which
|
||||||
|
* never terminate.
|
||||||
|
*/
|
||||||
|
public final class AlwaysRestartPolicy implements ComponentRestartPolicy {
|
||||||
|
|
||||||
|
private static AlwaysRestartPolicy INSTANCE = new AlwaysRestartPolicy();
|
||||||
|
|
||||||
|
private AlwaysRestartPolicy() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public static AlwaysRestartPolicy getInstance() {
|
||||||
|
return INSTANCE;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean isLongLived() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is always false since these components never terminate
|
||||||
|
*
|
||||||
|
* @param component
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
@Override public boolean hasCompleted(Component component) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is always false since these components never terminate
|
||||||
|
*
|
||||||
|
* @param component
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
@Override public boolean hasCompletedSuccessfully(Component component) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean shouldRelaunchInstance(
|
||||||
|
ComponentInstance componentInstance, ContainerStatus containerStatus) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean isReadyForDownStream(Component dependentComponent) {
|
||||||
|
if (dependentComponent.getNumReadyInstances() < dependentComponent
|
||||||
|
.getNumDesiredInstances()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean allowUpgrades() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean shouldTerminate(Component component) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
|
@ -18,9 +18,12 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.service.component;
|
package org.apache.hadoop.yarn.service.component;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||||
|
import static org.apache.hadoop.yarn.service.api.records.Component
|
||||||
|
.RestartPolicyEnum;
|
||||||
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
|
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
@ -111,6 +114,13 @@ public class Component implements EventHandler<ComponentEvent> {
|
||||||
// The number of containers failed since last reset. This excludes preempted,
|
// The number of containers failed since last reset. This excludes preempted,
|
||||||
// disk_failed containers etc. This will be reset to 0 periodically.
|
// disk_failed containers etc. This will be reset to 0 periodically.
|
||||||
public AtomicInteger currentContainerFailure = new AtomicInteger(0);
|
public AtomicInteger currentContainerFailure = new AtomicInteger(0);
|
||||||
|
|
||||||
|
//succeeded and Failed instances are Populated only for RestartPolicyEnum
|
||||||
|
//.ON_FAILURE/NEVER
|
||||||
|
private Map<String, ComponentInstance> succeededInstances =
|
||||||
|
new ConcurrentHashMap<>();
|
||||||
|
private Map<String, ComponentInstance> failedInstances =
|
||||||
|
new ConcurrentHashMap<>();
|
||||||
private boolean healthThresholdMonitorEnabled = false;
|
private boolean healthThresholdMonitorEnabled = false;
|
||||||
|
|
||||||
private AtomicBoolean upgradeInProgress = new AtomicBoolean(false);
|
private AtomicBoolean upgradeInProgress = new AtomicBoolean(false);
|
||||||
|
@ -297,7 +307,7 @@ public class Component implements EventHandler<ComponentEvent> {
|
||||||
@Override
|
@Override
|
||||||
public ComponentState transition(Component component,
|
public ComponentState transition(Component component,
|
||||||
ComponentEvent event) {
|
ComponentEvent event) {
|
||||||
component.setDesiredContainers((int)event.getDesired());
|
component.setDesiredContainers((int) event.getDesired());
|
||||||
if (!component.areDependenciesReady()) {
|
if (!component.areDependenciesReady()) {
|
||||||
LOG.info("[FLEX COMPONENT {}]: Flex deferred because dependencies not"
|
LOG.info("[FLEX COMPONENT {}]: Flex deferred because dependencies not"
|
||||||
+ " satisfied.", component.getName());
|
+ " satisfied.", component.getName());
|
||||||
|
@ -402,11 +412,37 @@ public class Component implements EventHandler<ComponentEvent> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static ComponentState checkIfStable(Component component) {
|
@VisibleForTesting
|
||||||
|
static ComponentState checkIfStable(Component component) {
|
||||||
|
if (component.getRestartPolicyHandler().isLongLived()) {
|
||||||
|
return updateStateForLongRunningComponents(component);
|
||||||
|
} else{
|
||||||
|
//NEVER/ON_FAILURE
|
||||||
|
return updateStateForTerminatingComponents(component);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ComponentState updateStateForTerminatingComponents(
|
||||||
|
Component component) {
|
||||||
|
if (component.getNumRunningInstances() + component
|
||||||
|
.getNumSucceededInstances() + component.getNumFailedInstances()
|
||||||
|
< component.getComponentSpec().getNumberOfContainers()) {
|
||||||
|
component.componentSpec.setState(
|
||||||
|
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
|
||||||
|
return FLEXING;
|
||||||
|
} else{
|
||||||
|
component.componentSpec.setState(
|
||||||
|
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
|
||||||
|
return STABLE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ComponentState updateStateForLongRunningComponents(
|
||||||
|
Component component) {
|
||||||
// if desired == running
|
// if desired == running
|
||||||
if (component.componentMetrics.containersReady.value() == component
|
if (component.componentMetrics.containersReady.value() == component
|
||||||
.getComponentSpec().getNumberOfContainers() &&
|
.getComponentSpec().getNumberOfContainers()
|
||||||
component.numContainersThatNeedUpgrade.get() == 0) {
|
&& component.numContainersThatNeedUpgrade.get() == 0) {
|
||||||
component.componentSpec.setState(
|
component.componentSpec.setState(
|
||||||
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
|
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
|
||||||
return STABLE;
|
return STABLE;
|
||||||
|
@ -425,17 +461,41 @@ public class Component implements EventHandler<ComponentEvent> {
|
||||||
|
|
||||||
// This method should be called whenever there is an increment or decrement
|
// This method should be called whenever there is an increment or decrement
|
||||||
// of a READY state container of a component
|
// of a READY state container of a component
|
||||||
public static synchronized void checkAndUpdateComponentState(
|
//This should not matter for terminating components
|
||||||
|
private static synchronized void checkAndUpdateComponentState(
|
||||||
Component component, boolean isIncrement) {
|
Component component, boolean isIncrement) {
|
||||||
org.apache.hadoop.yarn.service.api.records.ComponentState curState =
|
org.apache.hadoop.yarn.service.api.records.ComponentState curState =
|
||||||
component.componentSpec.getState();
|
component.componentSpec.getState();
|
||||||
if (isIncrement) {
|
|
||||||
// check if all containers are in READY state
|
if (component.getRestartPolicyHandler().isLongLived()) {
|
||||||
if (component.numContainersThatNeedUpgrade.get() == 0 &&
|
if (isIncrement) {
|
||||||
component.componentMetrics.containersReady.value() ==
|
// check if all containers are in READY state
|
||||||
component.componentMetrics.containersDesired.value()) {
|
if (component.numContainersThatNeedUpgrade.get() == 0
|
||||||
component.componentSpec.setState(
|
&& component.componentMetrics.containersReady.value()
|
||||||
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
|
== component.componentMetrics.containersDesired.value()) {
|
||||||
|
component.componentSpec.setState(
|
||||||
|
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
|
||||||
|
if (curState != component.componentSpec.getState()) {
|
||||||
|
LOG.info("[COMPONENT {}] state changed from {} -> {}",
|
||||||
|
component.componentSpec.getName(), curState,
|
||||||
|
component.componentSpec.getState());
|
||||||
|
}
|
||||||
|
// component state change will trigger re-check of service state
|
||||||
|
component.context.getServiceManager().checkAndUpdateServiceState();
|
||||||
|
}
|
||||||
|
} else{
|
||||||
|
// container moving out of READY state could be because of FLEX down so
|
||||||
|
// still need to verify the count before changing the component state
|
||||||
|
if (component.componentMetrics.containersReady.value()
|
||||||
|
< component.componentMetrics.containersDesired.value()) {
|
||||||
|
component.componentSpec.setState(
|
||||||
|
org.apache.hadoop.yarn.service.api.records.ComponentState
|
||||||
|
.FLEXING);
|
||||||
|
} else if (component.componentMetrics.containersReady.value()
|
||||||
|
== component.componentMetrics.containersDesired.value()) {
|
||||||
|
component.componentSpec.setState(
|
||||||
|
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
|
||||||
|
}
|
||||||
if (curState != component.componentSpec.getState()) {
|
if (curState != component.componentSpec.getState()) {
|
||||||
LOG.info("[COMPONENT {}] state changed from {} -> {}",
|
LOG.info("[COMPONENT {}] state changed from {} -> {}",
|
||||||
component.componentSpec.getName(), curState,
|
component.componentSpec.getName(), curState,
|
||||||
|
@ -445,44 +505,38 @@ public class Component implements EventHandler<ComponentEvent> {
|
||||||
component.context.getServiceManager().checkAndUpdateServiceState();
|
component.context.getServiceManager().checkAndUpdateServiceState();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// container moving out of READY state could be because of FLEX down so
|
|
||||||
// still need to verify the count before changing the component state
|
|
||||||
if (component.componentMetrics.containersReady
|
|
||||||
.value() < component.componentMetrics.containersDesired.value()) {
|
|
||||||
component.componentSpec.setState(
|
|
||||||
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
|
|
||||||
} else if (component.componentMetrics.containersReady
|
|
||||||
.value() == component.componentMetrics.containersDesired.value()) {
|
|
||||||
component.componentSpec.setState(
|
|
||||||
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
|
|
||||||
}
|
|
||||||
if (curState != component.componentSpec.getState()) {
|
|
||||||
LOG.info("[COMPONENT {}] state changed from {} -> {}",
|
|
||||||
component.componentSpec.getName(), curState,
|
|
||||||
component.componentSpec.getState());
|
|
||||||
}
|
|
||||||
// component state change will trigger re-check of service state
|
// component state change will trigger re-check of service state
|
||||||
component.context.getServiceManager().checkAndUpdateServiceState();
|
component.context.getServiceManager().checkAndUpdateServiceState();
|
||||||
}
|
}
|
||||||
// when the service is stable then the state of component needs to
|
// when the service is stable then the state of component needs to
|
||||||
// transition to stable
|
// transition to stable
|
||||||
component.dispatcher.getEventHandler().handle(new ComponentEvent(
|
component.dispatcher.getEventHandler().handle(
|
||||||
component.getName(), ComponentEventType.CHECK_STABLE));
|
new ComponentEvent(component.getName(),
|
||||||
|
ComponentEventType.CHECK_STABLE));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class ContainerCompletedTransition extends BaseTransition {
|
private static class ContainerCompletedTransition extends BaseTransition {
|
||||||
@Override
|
@Override
|
||||||
public void transition(Component component, ComponentEvent event) {
|
public void transition(Component component, ComponentEvent event) {
|
||||||
|
|
||||||
component.updateMetrics(event.getStatus());
|
component.updateMetrics(event.getStatus());
|
||||||
component.dispatcher.getEventHandler().handle(
|
component.dispatcher.getEventHandler().handle(
|
||||||
new ComponentInstanceEvent(event.getStatus().getContainerId(),
|
new ComponentInstanceEvent(event.getStatus().getContainerId(), STOP)
|
||||||
STOP).setStatus(event.getStatus()));
|
.setStatus(event.getStatus()));
|
||||||
component.componentSpec.setState(
|
|
||||||
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
|
ComponentRestartPolicy restartPolicy =
|
||||||
if (component.context.service.getState().equals(ServiceState.STABLE)) {
|
component.getRestartPolicyHandler();
|
||||||
component.getScheduler().getApp().setState(ServiceState.STARTED);
|
|
||||||
LOG.info("Service def state changed from {} -> {}",
|
if (restartPolicy.shouldRelaunchInstance(event.getInstance(),
|
||||||
ServiceState.STABLE, ServiceState.STARTED);
|
event.getStatus())) {
|
||||||
|
component.componentSpec.setState(
|
||||||
|
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
|
||||||
|
|
||||||
|
if (component.context.service.getState().equals(ServiceState.STABLE)) {
|
||||||
|
component.getScheduler().getApp().setState(ServiceState.STARTED);
|
||||||
|
LOG.info("Service def state changed from {} -> {}",
|
||||||
|
ServiceState.STABLE, ServiceState.STARTED);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -725,8 +779,6 @@ public class Component implements EventHandler<ComponentEvent> {
|
||||||
componentMetrics.containersDesired.set(n);
|
componentMetrics.containersDesired.set(n);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
private void updateMetrics(ContainerStatus status) {
|
private void updateMetrics(ContainerStatus status) {
|
||||||
switch (status.getExitStatus()) {
|
switch (status.getExitStatus()) {
|
||||||
case SUCCESS:
|
case SUCCESS:
|
||||||
|
@ -753,7 +805,7 @@ public class Component implements EventHandler<ComponentEvent> {
|
||||||
String host = scheduler.getLiveInstances().get(status.getContainerId())
|
String host = scheduler.getLiveInstances().get(status.getContainerId())
|
||||||
.getNodeId().getHost();
|
.getNodeId().getHost();
|
||||||
failureTracker.incNodeFailure(host);
|
failureTracker.incNodeFailure(host);
|
||||||
currentContainerFailure.getAndIncrement() ;
|
currentContainerFailure.getAndIncrement();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -763,17 +815,18 @@ public class Component implements EventHandler<ComponentEvent> {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
for (String dependency : dependencies) {
|
for (String dependency : dependencies) {
|
||||||
Component dependentComponent =
|
Component dependentComponent = scheduler.getAllComponents().get(
|
||||||
scheduler.getAllComponents().get(dependency);
|
dependency);
|
||||||
if (dependentComponent == null) {
|
if (dependentComponent == null) {
|
||||||
LOG.error("Couldn't find dependency {} for {} (should never happen)",
|
LOG.error("Couldn't find dependency {} for {} (should never happen)",
|
||||||
dependency, getName());
|
dependency, getName());
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (dependentComponent.getNumReadyInstances() < dependentComponent
|
|
||||||
.getNumDesiredInstances()) {
|
if (!dependentComponent.isReadyForDownstream()) {
|
||||||
LOG.info("[COMPONENT {}]: Dependency {} not satisfied, only {} of {}"
|
LOG.info("[COMPONENT {}]: Dependency {} not satisfied, only {} of {}"
|
||||||
+ " instances are ready.", getName(), dependency,
|
+ " instances are ready or the dependent component has not "
|
||||||
|
+ "completed ", getName(), dependency,
|
||||||
dependentComponent.getNumReadyInstances(),
|
dependentComponent.getNumReadyInstances(),
|
||||||
dependentComponent.getNumDesiredInstances());
|
dependentComponent.getNumDesiredInstances());
|
||||||
return false;
|
return false;
|
||||||
|
@ -782,6 +835,7 @@ public class Component implements EventHandler<ComponentEvent> {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public Map<String, String> getDependencyHostIpTokens() {
|
public Map<String, String> getDependencyHostIpTokens() {
|
||||||
Map<String, String> tokens = new HashMap<>();
|
Map<String, String> tokens = new HashMap<>();
|
||||||
List<String> dependencies = componentSpec.getDependencies();
|
List<String> dependencies = componentSpec.getDependencies();
|
||||||
|
@ -955,4 +1009,67 @@ public class Component implements EventHandler<ComponentEvent> {
|
||||||
boolean healthThresholdMonitorEnabled) {
|
boolean healthThresholdMonitorEnabled) {
|
||||||
this.healthThresholdMonitorEnabled = healthThresholdMonitorEnabled;
|
this.healthThresholdMonitorEnabled = healthThresholdMonitorEnabled;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Collection<ComponentInstance> getSucceededInstances() {
|
||||||
|
return succeededInstances.values();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getNumSucceededInstances() {
|
||||||
|
return succeededInstances.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getNumFailedInstances() {
|
||||||
|
return failedInstances.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
public Collection<ComponentInstance> getFailedInstances() {
|
||||||
|
return failedInstances.values();
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void markAsSucceeded(ComponentInstance instance) {
|
||||||
|
removeFailedInstanceIfExists(instance);
|
||||||
|
succeededInstances.put(instance.getCompInstanceName(), instance);
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void markAsFailed(ComponentInstance instance) {
|
||||||
|
removeSuccessfulInstanceIfExists(instance);
|
||||||
|
failedInstances.put(instance.getCompInstanceName(), instance);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean removeFailedInstanceIfExists(ComponentInstance instance) {
|
||||||
|
if (failedInstances.containsKey(instance.getCompInstanceName())) {
|
||||||
|
failedInstances.remove(instance.getCompInstanceName());
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean removeSuccessfulInstanceIfExists(ComponentInstance instance) {
|
||||||
|
if (succeededInstances.containsKey(instance.getCompInstanceName())) {
|
||||||
|
succeededInstances.remove(instance.getCompInstanceName());
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isReadyForDownstream() {
|
||||||
|
return getRestartPolicyHandler().isReadyForDownStream(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ComponentRestartPolicy getRestartPolicyHandler(
|
||||||
|
RestartPolicyEnum restartPolicyEnum) {
|
||||||
|
|
||||||
|
if (RestartPolicyEnum.NEVER == restartPolicyEnum) {
|
||||||
|
return NeverRestartPolicy.getInstance();
|
||||||
|
} else if (RestartPolicyEnum.ON_FAILURE == restartPolicyEnum) {
|
||||||
|
return OnFailureRestartPolicy.getInstance();
|
||||||
|
} else{
|
||||||
|
return AlwaysRestartPolicy.getInstance();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public ComponentRestartPolicy getRestartPolicyHandler() {
|
||||||
|
RestartPolicyEnum restartPolicyEnum = getComponentSpec().getRestartPolicy();
|
||||||
|
return getRestartPolicyHandler(restartPolicyEnum);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,45 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.service.component;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
|
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Interface for Component Restart policies.
|
||||||
|
* Which is used to make decisions on termination/restart of components and
|
||||||
|
* their instances.
|
||||||
|
*/
|
||||||
|
public interface ComponentRestartPolicy {
|
||||||
|
|
||||||
|
boolean isLongLived();
|
||||||
|
|
||||||
|
boolean hasCompleted(Component component);
|
||||||
|
|
||||||
|
boolean hasCompletedSuccessfully(Component component);
|
||||||
|
|
||||||
|
boolean shouldRelaunchInstance(ComponentInstance componentInstance,
|
||||||
|
ContainerStatus containerStatus);
|
||||||
|
|
||||||
|
boolean isReadyForDownStream(Component component);
|
||||||
|
|
||||||
|
boolean allowUpgrades();
|
||||||
|
|
||||||
|
boolean shouldTerminate(Component component);
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,82 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.service.component;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
|
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Policy for components with instances that do not require/support a restart.
|
||||||
|
*/
|
||||||
|
public final class NeverRestartPolicy implements ComponentRestartPolicy {
|
||||||
|
|
||||||
|
private static NeverRestartPolicy INSTANCE = new NeverRestartPolicy();
|
||||||
|
|
||||||
|
private NeverRestartPolicy() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public static NeverRestartPolicy getInstance() {
|
||||||
|
return INSTANCE;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean isLongLived() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean hasCompleted(Component component) {
|
||||||
|
if (component.getNumSucceededInstances() + component.getNumFailedInstances()
|
||||||
|
< component.getNumDesiredInstances()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean hasCompletedSuccessfully(Component component) {
|
||||||
|
if (component.getNumSucceededInstances() == component
|
||||||
|
.getNumDesiredInstances()) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean shouldRelaunchInstance(
|
||||||
|
ComponentInstance componentInstance, ContainerStatus containerStatus) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean isReadyForDownStream(Component component) {
|
||||||
|
if (hasCompleted(component)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean allowUpgrades() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean shouldTerminate(Component component) {
|
||||||
|
long nSucceeded = component.getNumSucceededInstances();
|
||||||
|
long nFailed = component.getNumFailedInstances();
|
||||||
|
if (nSucceeded + nFailed < component.getComponentSpec()
|
||||||
|
.getNumberOfContainers()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,87 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.service.component;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
|
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Policy for components that require restarts for instances on failure.
|
||||||
|
*/
|
||||||
|
public final class OnFailureRestartPolicy implements ComponentRestartPolicy {
|
||||||
|
|
||||||
|
private static OnFailureRestartPolicy INSTANCE = new OnFailureRestartPolicy();
|
||||||
|
|
||||||
|
private OnFailureRestartPolicy() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public static OnFailureRestartPolicy getInstance() {
|
||||||
|
return INSTANCE;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean isLongLived() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean hasCompleted(Component component) {
|
||||||
|
if (hasCompletedSuccessfully(component)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean hasCompletedSuccessfully(Component component) {
|
||||||
|
if (component.getNumSucceededInstances() == component
|
||||||
|
.getNumDesiredInstances()) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean shouldRelaunchInstance(
|
||||||
|
ComponentInstance componentInstance, ContainerStatus containerStatus) {
|
||||||
|
|
||||||
|
if (ComponentInstance.hasContainerFailed(containerStatus)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean isReadyForDownStream(Component component) {
|
||||||
|
if (hasCompletedSuccessfully(component)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean allowUpgrades() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public boolean shouldTerminate(Component component) {
|
||||||
|
long nSucceeded = component.getNumSucceededInstances();
|
||||||
|
if (nSucceeded < component.getComponentSpec().getNumberOfContainers()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.service.component.instance;
|
package org.apache.hadoop.yarn.service.component.instance;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.registry.client.api.RegistryConstants;
|
import org.apache.hadoop.registry.client.api.RegistryConstants;
|
||||||
|
@ -25,9 +26,9 @@ import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
|
||||||
import org.apache.hadoop.registry.client.binding.RegistryUtils;
|
import org.apache.hadoop.registry.client.binding.RegistryUtils;
|
||||||
import org.apache.hadoop.registry.client.types.ServiceRecord;
|
import org.apache.hadoop.registry.client.types.ServiceRecord;
|
||||||
import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
|
import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
|
||||||
import org.apache.hadoop.util.ExitUtil;
|
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
@ -44,6 +45,7 @@ import org.apache.hadoop.yarn.service.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.service.component.Component;
|
import org.apache.hadoop.yarn.service.component.Component;
|
||||||
import org.apache.hadoop.yarn.service.component.ComponentEvent;
|
import org.apache.hadoop.yarn.service.component.ComponentEvent;
|
||||||
import org.apache.hadoop.yarn.service.component.ComponentEventType;
|
import org.apache.hadoop.yarn.service.component.ComponentEventType;
|
||||||
|
import org.apache.hadoop.yarn.service.component.ComponentRestartPolicy;
|
||||||
import org.apache.hadoop.yarn.service.monitor.probe.ProbeStatus;
|
import org.apache.hadoop.yarn.service.monitor.probe.ProbeStatus;
|
||||||
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
|
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
|
||||||
import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
|
import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
|
||||||
|
@ -96,8 +98,10 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
||||||
// This container object is used for rest API query
|
// This container object is used for rest API query
|
||||||
private org.apache.hadoop.yarn.service.api.records.Container containerSpec;
|
private org.apache.hadoop.yarn.service.api.records.Container containerSpec;
|
||||||
|
|
||||||
|
|
||||||
private static final StateMachineFactory<ComponentInstance,
|
private static final StateMachineFactory<ComponentInstance,
|
||||||
ComponentInstanceState, ComponentInstanceEventType, ComponentInstanceEvent>
|
ComponentInstanceState, ComponentInstanceEventType,
|
||||||
|
ComponentInstanceEvent>
|
||||||
stateMachineFactory =
|
stateMachineFactory =
|
||||||
new StateMachineFactory<ComponentInstance, ComponentInstanceState,
|
new StateMachineFactory<ComponentInstance, ComponentInstanceState,
|
||||||
ComponentInstanceEventType, ComponentInstanceEvent>(INIT)
|
ComponentInstanceEventType, ComponentInstanceEvent>(INIT)
|
||||||
|
@ -230,6 +234,47 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
static void handleComponentInstanceRelaunch(
|
||||||
|
ComponentInstance compInstance, ComponentInstanceEvent event) {
|
||||||
|
Component comp = compInstance.getComponent();
|
||||||
|
|
||||||
|
// Do we need to relaunch the service?
|
||||||
|
boolean hasContainerFailed = hasContainerFailed(event.getStatus());
|
||||||
|
|
||||||
|
ComponentRestartPolicy restartPolicy = comp.getRestartPolicyHandler();
|
||||||
|
|
||||||
|
if (restartPolicy.shouldRelaunchInstance(compInstance, event.getStatus())) {
|
||||||
|
// re-ask the failed container.
|
||||||
|
comp.requestContainers(1);
|
||||||
|
comp.reInsertPendingInstance(compInstance);
|
||||||
|
LOG.info(compInstance.getCompInstanceId()
|
||||||
|
+ ": {} completed. Reinsert back to pending list and requested " +
|
||||||
|
"a new container." + System.lineSeparator() +
|
||||||
|
" exitStatus={}, diagnostics={}.",
|
||||||
|
event.getContainerId(), event.getStatus().getExitStatus(),
|
||||||
|
event.getStatus().getDiagnostics());
|
||||||
|
} else {
|
||||||
|
// When no relaunch, update component's #succeeded/#failed
|
||||||
|
// instances.
|
||||||
|
if (hasContainerFailed) {
|
||||||
|
comp.markAsFailed(compInstance);
|
||||||
|
} else {
|
||||||
|
comp.markAsSucceeded(compInstance);
|
||||||
|
}
|
||||||
|
LOG.info(compInstance.getCompInstanceId() + (!hasContainerFailed ?
|
||||||
|
" succeeded" :
|
||||||
|
" failed") + " without retry, exitStatus=" + event.getStatus());
|
||||||
|
comp.getScheduler().terminateServiceIfAllComponentsFinished();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean hasContainerFailed(ContainerStatus containerStatus) {
|
||||||
|
//Mark conainer as failed if we cant get its exit status i.e null?
|
||||||
|
return containerStatus == null || containerStatus.getExitStatus() !=
|
||||||
|
ContainerExitStatus.SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
private static class ContainerStoppedTransition extends BaseTransition {
|
private static class ContainerStoppedTransition extends BaseTransition {
|
||||||
// whether the container failed before launched by AM or not.
|
// whether the container failed before launched by AM or not.
|
||||||
boolean failedBeforeLaunching = false;
|
boolean failedBeforeLaunching = false;
|
||||||
|
@ -244,9 +289,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
||||||
@Override
|
@Override
|
||||||
public void transition(ComponentInstance compInstance,
|
public void transition(ComponentInstance compInstance,
|
||||||
ComponentInstanceEvent event) {
|
ComponentInstanceEvent event) {
|
||||||
// re-ask the failed container.
|
|
||||||
Component comp = compInstance.component;
|
Component comp = compInstance.component;
|
||||||
comp.requestContainers(1);
|
|
||||||
String containerDiag =
|
String containerDiag =
|
||||||
compInstance.getCompInstanceId() + ": " + event.getStatus()
|
compInstance.getCompInstanceId() + ": " + event.getStatus()
|
||||||
.getDiagnostics();
|
.getDiagnostics();
|
||||||
|
@ -259,7 +303,10 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
||||||
compInstance.component.decContainersReady(true);
|
compInstance.component.decContainersReady(true);
|
||||||
}
|
}
|
||||||
compInstance.component.decRunningContainers();
|
compInstance.component.decRunningContainers();
|
||||||
boolean shouldExit = false;
|
// Should we fail (terminate) the service?
|
||||||
|
boolean shouldFailService = false;
|
||||||
|
|
||||||
|
final ServiceScheduler scheduler = comp.getScheduler();
|
||||||
// Check if it exceeds the failure threshold, but only if health threshold
|
// Check if it exceeds the failure threshold, but only if health threshold
|
||||||
// monitor is not enabled
|
// monitor is not enabled
|
||||||
if (!comp.isHealthThresholdMonitorEnabled()
|
if (!comp.isHealthThresholdMonitorEnabled()
|
||||||
|
@ -271,10 +318,10 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
||||||
comp.getName(), comp.currentContainerFailure.get(), comp.maxContainerFailurePerComp);
|
comp.getName(), comp.currentContainerFailure.get(), comp.maxContainerFailurePerComp);
|
||||||
compInstance.diagnostics.append(exitDiag);
|
compInstance.diagnostics.append(exitDiag);
|
||||||
// append to global diagnostics that will be reported to RM.
|
// append to global diagnostics that will be reported to RM.
|
||||||
comp.getScheduler().getDiagnostics().append(containerDiag);
|
scheduler.getDiagnostics().append(containerDiag);
|
||||||
comp.getScheduler().getDiagnostics().append(exitDiag);
|
scheduler.getDiagnostics().append(exitDiag);
|
||||||
LOG.warn(exitDiag);
|
LOG.warn(exitDiag);
|
||||||
shouldExit = true;
|
shouldFailService = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!failedBeforeLaunching) {
|
if (!failedBeforeLaunching) {
|
||||||
|
@ -296,25 +343,14 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
||||||
}
|
}
|
||||||
|
|
||||||
// remove the failed ContainerId -> CompInstance mapping
|
// remove the failed ContainerId -> CompInstance mapping
|
||||||
comp.getScheduler().removeLiveCompInstance(event.getContainerId());
|
scheduler.removeLiveCompInstance(event.getContainerId());
|
||||||
|
|
||||||
comp.reInsertPendingInstance(compInstance);
|
// According to component restart policy, handle container restart
|
||||||
|
// or finish the service (if all components finished)
|
||||||
|
handleComponentInstanceRelaunch(compInstance, event);
|
||||||
|
|
||||||
LOG.info(compInstance.getCompInstanceId()
|
if (shouldFailService) {
|
||||||
+ ": {} completed. Reinsert back to pending list and requested " +
|
scheduler.getTerminationHandler().terminate(-1);
|
||||||
"a new container." + System.lineSeparator() +
|
|
||||||
" exitStatus={}, diagnostics={}.",
|
|
||||||
event.getContainerId(), event.getStatus().getExitStatus(),
|
|
||||||
event.getStatus().getDiagnostics());
|
|
||||||
if (shouldExit) {
|
|
||||||
// Sleep for 5 seconds in hope that the state can be recorded in ATS.
|
|
||||||
// in case there's a client polling the comp state, it can be notified.
|
|
||||||
try {
|
|
||||||
Thread.sleep(5000);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
LOG.error("Interrupted on sleep while exiting.", e);
|
|
||||||
}
|
|
||||||
ExitUtil.terminate(-1);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -630,4 +666,9 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
||||||
>>> 32));
|
>>> 32));
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting public org.apache.hadoop.yarn.service.api.records
|
||||||
|
.Container getContainerSpec() {
|
||||||
|
return containerSpec;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.net.DNS;
|
import org.apache.hadoop.net.DNS;
|
||||||
|
import org.apache.hadoop.util.ExitUtil;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||||
import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
|
import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
|
||||||
|
@ -571,4 +572,21 @@ public final class ServiceUtils {
|
||||||
// Fallback to querying the default hostname as we did before.
|
// Fallback to querying the default hostname as we did before.
|
||||||
return InetAddress.getLocalHost().getCanonicalHostName();
|
return InetAddress.getLocalHost().getCanonicalHostName();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process termination handler - exist with specified exit code after
|
||||||
|
* waiting a while for ATS state to be in sync.
|
||||||
|
*/
|
||||||
|
public static class ProcessTerminationHandler {
|
||||||
|
public void terminate(int exitCode) {
|
||||||
|
// Sleep for 5 seconds in hope that the state can be recorded in ATS.
|
||||||
|
// in case there's a client polling the comp state, it can be notified.
|
||||||
|
try {
|
||||||
|
Thread.sleep(5000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
log.info("Interrupted on sleep while exiting.", e);
|
||||||
|
}
|
||||||
|
ExitUtil.terminate(exitCode);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,6 +57,8 @@ import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM;
|
import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM;
|
||||||
|
@ -99,8 +101,32 @@ public class ServiceTestUtils {
|
||||||
return exampleApp;
|
return exampleApp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Example service definition
|
||||||
|
// 2 components, each of which has 2 containers.
|
||||||
|
public static Service createTerminatingJobExample(String serviceName) {
|
||||||
|
Service exampleApp = new Service();
|
||||||
|
exampleApp.setName(serviceName);
|
||||||
|
exampleApp.setVersion("v1");
|
||||||
|
exampleApp.addComponent(
|
||||||
|
createComponent("terminating-comp1", 2, "sleep " + "1000",
|
||||||
|
Component.RestartPolicyEnum.NEVER, null));
|
||||||
|
exampleApp.addComponent(
|
||||||
|
createComponent("terminating-comp2", 2, "sleep 1000",
|
||||||
|
Component.RestartPolicyEnum.ON_FAILURE, new ArrayList<String>() {{
|
||||||
|
add("terminating-comp1");
|
||||||
|
}}));
|
||||||
|
exampleApp.addComponent(
|
||||||
|
createComponent("terminating-comp3", 2, "sleep 1000",
|
||||||
|
Component.RestartPolicyEnum.ON_FAILURE, new ArrayList<String>() {{
|
||||||
|
add("terminating-comp2");
|
||||||
|
}}));
|
||||||
|
|
||||||
|
return exampleApp;
|
||||||
|
}
|
||||||
|
|
||||||
public static Component createComponent(String name) {
|
public static Component createComponent(String name) {
|
||||||
return createComponent(name, 2L, "sleep 1000");
|
return createComponent(name, 2L, "sleep 1000",
|
||||||
|
Component.RestartPolicyEnum.ALWAYS, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected static Component createComponent(String name, long numContainers,
|
protected static Component createComponent(String name, long numContainers,
|
||||||
|
@ -116,6 +142,18 @@ public class ServiceTestUtils {
|
||||||
return comp1;
|
return comp1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected static Component createComponent(String name, long numContainers,
|
||||||
|
String command, Component.RestartPolicyEnum restartPolicyEnum,
|
||||||
|
List<String> dependencies) {
|
||||||
|
Component comp = createComponent(name, numContainers, command);
|
||||||
|
comp.setRestartPolicy(restartPolicyEnum);
|
||||||
|
|
||||||
|
if (dependencies != null) {
|
||||||
|
comp.dependencies(dependencies);
|
||||||
|
}
|
||||||
|
return comp;
|
||||||
|
}
|
||||||
|
|
||||||
public static SliderFileSystem initMockFs() throws IOException {
|
public static SliderFileSystem initMockFs() throws IOException {
|
||||||
return initMockFs(null);
|
return initMockFs(null);
|
||||||
}
|
}
|
||||||
|
@ -306,6 +344,12 @@ public class ServiceTestUtils {
|
||||||
return client;
|
return client;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static ServiceManager createServiceManager(ServiceContext context) {
|
||||||
|
ServiceManager serviceManager = new ServiceManager(context);
|
||||||
|
context.setServiceManager(serviceManager);
|
||||||
|
return serviceManager;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a YarnClient for test purposes.
|
* Creates a YarnClient for test purposes.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -227,14 +227,16 @@ public class TestServiceManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Service createBaseDef(String name) {
|
public static Service createBaseDef(String name) {
|
||||||
|
return createDef(name, ServiceTestUtils.createExampleApplication());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Service createDef(String name, Service serviceDef) {
|
||||||
ApplicationId applicationId = ApplicationId.newInstance(
|
ApplicationId applicationId = ApplicationId.newInstance(
|
||||||
System.currentTimeMillis(), 1);
|
System.currentTimeMillis(), 1);
|
||||||
Service serviceDef = ServiceTestUtils.createExampleApplication();
|
|
||||||
serviceDef.setId(applicationId.toString());
|
serviceDef.setId(applicationId.toString());
|
||||||
serviceDef.setName(name);
|
serviceDef.setName(name);
|
||||||
serviceDef.setState(ServiceState.STARTED);
|
serviceDef.setState(ServiceState.STARTED);
|
||||||
Artifact artifact = createTestArtifact("1");
|
Artifact artifact = createTestArtifact("1");
|
||||||
|
|
||||||
serviceDef.getComponents().forEach(component ->
|
serviceDef.getComponents().forEach(component ->
|
||||||
component.setArtifact(artifact));
|
component.setArtifact(artifact));
|
||||||
return serviceDef;
|
return serviceDef;
|
||||||
|
|
|
@ -38,8 +38,10 @@ import org.apache.hadoop.yarn.service.api.records.Service;
|
||||||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
|
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
|
||||||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
|
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
|
||||||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
|
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
|
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
|
||||||
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
|
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -50,6 +52,7 @@ import java.util.Iterator;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP;
|
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP;
|
||||||
|
|
||||||
import static org.mockito.Matchers.anyObject;
|
import static org.mockito.Matchers.anyObject;
|
||||||
import static org.mockito.Mockito.doNothing;
|
import static org.mockito.Mockito.doNothing;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
@ -60,6 +63,9 @@ import static org.mockito.Mockito.when;
|
||||||
*/
|
*/
|
||||||
public class TestComponent {
|
public class TestComponent {
|
||||||
|
|
||||||
|
private static final int WAIT_MS_PER_LOOP = 1000;
|
||||||
|
static final Logger LOG = Logger.getLogger(TestComponent.class);
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public ServiceTestUtils.ServiceFSWatcher rule =
|
public ServiceTestUtils.ServiceFSWatcher rule =
|
||||||
new ServiceTestUtils.ServiceFSWatcher();
|
new ServiceTestUtils.ServiceFSWatcher();
|
||||||
|
@ -158,6 +164,57 @@ public class TestComponent {
|
||||||
comp.getComponentSpec().getConfiguration().getEnv("key1"));
|
comp.getComponentSpec().getConfiguration().getEnv("key1"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testComponentStateUpdatesWithTerminatingComponents() throws
|
||||||
|
Exception {
|
||||||
|
final String serviceName =
|
||||||
|
"testComponentStateUpdatesWithTerminatingComponents";
|
||||||
|
|
||||||
|
Service testService = ServiceTestUtils.createTerminatingJobExample(
|
||||||
|
serviceName);
|
||||||
|
TestServiceManager.createDef(serviceName, testService);
|
||||||
|
|
||||||
|
ServiceContext context = createTestContext(rule, testService);
|
||||||
|
|
||||||
|
for (Component comp : context.scheduler.getAllComponents().values()) {
|
||||||
|
|
||||||
|
Iterator<ComponentInstance> instanceIter = comp.
|
||||||
|
getAllComponentInstances().iterator();
|
||||||
|
|
||||||
|
ComponentInstance componentInstance = instanceIter.next();
|
||||||
|
Container instanceContainer = componentInstance.getContainer();
|
||||||
|
|
||||||
|
Assert.assertEquals(0, comp.getNumSucceededInstances());
|
||||||
|
Assert.assertEquals(0, comp.getNumFailedInstances());
|
||||||
|
Assert.assertEquals(2, comp.getNumRunningInstances());
|
||||||
|
Assert.assertEquals(2, comp.getNumReadyInstances());
|
||||||
|
Assert.assertEquals(0, comp.getPendingInstances().size());
|
||||||
|
|
||||||
|
//stop 1 container
|
||||||
|
ContainerStatus containerStatus = ContainerStatus.newInstance(
|
||||||
|
instanceContainer.getId(),
|
||||||
|
org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE,
|
||||||
|
"successful", 0);
|
||||||
|
comp.handle(new ComponentEvent(comp.getName(),
|
||||||
|
ComponentEventType.CONTAINER_COMPLETED).setStatus(containerStatus));
|
||||||
|
componentInstance.handle(
|
||||||
|
new ComponentInstanceEvent(componentInstance.getContainer().getId(),
|
||||||
|
ComponentInstanceEventType.STOP).setStatus(containerStatus));
|
||||||
|
|
||||||
|
Assert.assertEquals(1, comp.getNumSucceededInstances());
|
||||||
|
Assert.assertEquals(0, comp.getNumFailedInstances());
|
||||||
|
Assert.assertEquals(1, comp.getNumRunningInstances());
|
||||||
|
Assert.assertEquals(1, comp.getNumReadyInstances());
|
||||||
|
Assert.assertEquals(0, comp.getPendingInstances().size());
|
||||||
|
|
||||||
|
org.apache.hadoop.yarn.service.component.ComponentState componentState =
|
||||||
|
Component.checkIfStable(comp);
|
||||||
|
Assert.assertEquals(
|
||||||
|
org.apache.hadoop.yarn.service.component.ComponentState.STABLE,
|
||||||
|
componentState);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static org.apache.hadoop.yarn.service.api.records.Component
|
private static org.apache.hadoop.yarn.service.api.records.Component
|
||||||
createSpecWithEnv(String serviceName, String compName, String key,
|
createSpecWithEnv(String serviceName, String compName, String key,
|
||||||
String val) {
|
String val) {
|
||||||
|
@ -171,31 +228,38 @@ public class TestComponent {
|
||||||
public static ServiceContext createTestContext(
|
public static ServiceContext createTestContext(
|
||||||
ServiceTestUtils.ServiceFSWatcher fsWatcher, String serviceName)
|
ServiceTestUtils.ServiceFSWatcher fsWatcher, String serviceName)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
return createTestContext(fsWatcher,
|
||||||
|
TestServiceManager.createBaseDef(serviceName));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ServiceContext createTestContext(
|
||||||
|
ServiceTestUtils.ServiceFSWatcher fsWatcher, Service serviceDef)
|
||||||
|
throws Exception {
|
||||||
ServiceContext context = new ServiceContext();
|
ServiceContext context = new ServiceContext();
|
||||||
context.service = TestServiceManager.createBaseDef(serviceName);
|
context.service = serviceDef;
|
||||||
context.fs = fsWatcher.getFs();
|
context.fs = fsWatcher.getFs();
|
||||||
|
|
||||||
ContainerLaunchService mockLaunchService = mock(
|
ContainerLaunchService mockLaunchService = mock(
|
||||||
ContainerLaunchService.class);
|
ContainerLaunchService.class);
|
||||||
|
|
||||||
context.scheduler = new ServiceScheduler(context) {
|
context.scheduler = new ServiceScheduler(context) {
|
||||||
@Override
|
@Override protected YarnRegistryViewForProviders
|
||||||
protected YarnRegistryViewForProviders createYarnRegistryOperations(
|
createYarnRegistryOperations(
|
||||||
ServiceContext context, RegistryOperations registryClient) {
|
ServiceContext context, RegistryOperations registryClient) {
|
||||||
return mock(YarnRegistryViewForProviders.class);
|
return mock(YarnRegistryViewForProviders.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override public NMClientAsync createNMClient() {
|
||||||
public NMClientAsync createNMClient() {
|
|
||||||
NMClientAsync nmClientAsync = super.createNMClient();
|
NMClientAsync nmClientAsync = super.createNMClient();
|
||||||
NMClient nmClient = mock(NMClient.class);
|
NMClient nmClient = mock(NMClient.class);
|
||||||
try {
|
try {
|
||||||
when(nmClient.getContainerStatus(anyObject(), anyObject()))
|
when(nmClient.getContainerStatus(anyObject(), anyObject()))
|
||||||
.thenAnswer((Answer<ContainerStatus>) invocation ->
|
.thenAnswer(
|
||||||
ContainerStatus.newInstance(
|
(Answer<ContainerStatus>) invocation -> ContainerStatus
|
||||||
(ContainerId) invocation.getArguments()[0],
|
.newInstance((ContainerId) invocation.getArguments()[0],
|
||||||
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
|
org.apache.hadoop.yarn.api.records.ContainerState
|
||||||
"", 0));
|
.RUNNING,
|
||||||
|
"", 0));
|
||||||
} catch (YarnException | IOException e) {
|
} catch (YarnException | IOException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
|
@ -203,16 +267,18 @@ public class TestComponent {
|
||||||
return nmClientAsync;
|
return nmClientAsync;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override public ContainerLaunchService getContainerLaunchService() {
|
||||||
public ContainerLaunchService getContainerLaunchService() {
|
|
||||||
return mockLaunchService;
|
return mockLaunchService;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
context.scheduler.init(fsWatcher.getConf());
|
context.scheduler.init(fsWatcher.getConf());
|
||||||
|
|
||||||
|
ServiceTestUtils.createServiceManager(context);
|
||||||
|
|
||||||
doNothing().when(mockLaunchService).
|
doNothing().when(mockLaunchService).
|
||||||
reInitCompInstance(anyObject(), anyObject(), anyObject(), anyObject());
|
reInitCompInstance(anyObject(), anyObject(), anyObject(), anyObject());
|
||||||
stabilizeComponents(context);
|
stabilizeComponents(context);
|
||||||
|
|
||||||
return context;
|
return context;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -223,6 +289,8 @@ public class TestComponent {
|
||||||
context.attemptId = attemptId;
|
context.attemptId = attemptId;
|
||||||
Map<String, Component>
|
Map<String, Component>
|
||||||
componentState = context.scheduler.getAllComponents();
|
componentState = context.scheduler.getAllComponents();
|
||||||
|
|
||||||
|
int counter = 0;
|
||||||
for (org.apache.hadoop.yarn.service.api.records.Component componentSpec :
|
for (org.apache.hadoop.yarn.service.api.records.Component componentSpec :
|
||||||
context.service.getComponents()) {
|
context.service.getComponents()) {
|
||||||
Component component = new org.apache.hadoop.yarn.service.component.
|
Component component = new org.apache.hadoop.yarn.service.component.
|
||||||
|
@ -230,9 +298,12 @@ public class TestComponent {
|
||||||
componentState.put(component.getName(), component);
|
componentState.put(component.getName(), component);
|
||||||
component.handle(new ComponentEvent(component.getName(),
|
component.handle(new ComponentEvent(component.getName(),
|
||||||
ComponentEventType.FLEX));
|
ComponentEventType.FLEX));
|
||||||
|
|
||||||
for (int i = 0; i < componentSpec.getNumberOfContainers(); i++) {
|
for (int i = 0; i < componentSpec.getNumberOfContainers(); i++) {
|
||||||
assignNewContainer(attemptId, i + 1, context, component);
|
counter++;
|
||||||
|
assignNewContainer(attemptId, counter, context, component);
|
||||||
}
|
}
|
||||||
|
|
||||||
component.handle(new ComponentEvent(component.getName(),
|
component.handle(new ComponentEvent(component.getName(),
|
||||||
ComponentEventType.CHECK_STABLE));
|
ComponentEventType.CHECK_STABLE));
|
||||||
}
|
}
|
||||||
|
@ -241,6 +312,8 @@ public class TestComponent {
|
||||||
private static void assignNewContainer(
|
private static void assignNewContainer(
|
||||||
ApplicationAttemptId attemptId, long containerNum,
|
ApplicationAttemptId attemptId, long containerNum,
|
||||||
ServiceContext context, Component component) {
|
ServiceContext context, Component component) {
|
||||||
|
|
||||||
|
|
||||||
Container container = org.apache.hadoop.yarn.api.records.Container
|
Container container = org.apache.hadoop.yarn.api.records.Container
|
||||||
.newInstance(ContainerId.newContainerId(attemptId, containerNum),
|
.newInstance(ContainerId.newContainerId(attemptId, containerNum),
|
||||||
NODE_ID, "localhost", null, null,
|
NODE_ID, "localhost", null, null,
|
||||||
|
|
|
@ -0,0 +1,130 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.service.component;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
|
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests for ComponentRestartPolicy implementations.
|
||||||
|
*/
|
||||||
|
public class TestComponentRestartPolicy {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAlwaysRestartPolicy() throws Exception {
|
||||||
|
|
||||||
|
AlwaysRestartPolicy alwaysRestartPolicy = AlwaysRestartPolicy.getInstance();
|
||||||
|
|
||||||
|
Component component = mock(Component.class);
|
||||||
|
when(component.getNumReadyInstances()).thenReturn(1);
|
||||||
|
when(component.getNumDesiredInstances()).thenReturn(2);
|
||||||
|
|
||||||
|
ComponentInstance instance = mock(ComponentInstance.class);
|
||||||
|
when(instance.getComponent()).thenReturn(component);
|
||||||
|
|
||||||
|
ContainerStatus containerStatus = mock(ContainerStatus.class);
|
||||||
|
|
||||||
|
assertEquals(true, alwaysRestartPolicy.isLongLived());
|
||||||
|
assertEquals(true, alwaysRestartPolicy.allowUpgrades());
|
||||||
|
assertEquals(false, alwaysRestartPolicy.hasCompleted(component));
|
||||||
|
assertEquals(false,
|
||||||
|
alwaysRestartPolicy.hasCompletedSuccessfully(component));
|
||||||
|
|
||||||
|
assertEquals(true,
|
||||||
|
alwaysRestartPolicy.shouldRelaunchInstance(instance, containerStatus));
|
||||||
|
|
||||||
|
assertEquals(false, alwaysRestartPolicy.isReadyForDownStream(component));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNeverRestartPolicy() throws Exception {
|
||||||
|
|
||||||
|
NeverRestartPolicy restartPolicy = NeverRestartPolicy.getInstance();
|
||||||
|
|
||||||
|
Component component = mock(Component.class);
|
||||||
|
when(component.getNumSucceededInstances()).thenReturn(new Long(1));
|
||||||
|
when(component.getNumFailedInstances()).thenReturn(new Long(2));
|
||||||
|
when(component.getNumDesiredInstances()).thenReturn(3);
|
||||||
|
|
||||||
|
ComponentInstance instance = mock(ComponentInstance.class);
|
||||||
|
when(instance.getComponent()).thenReturn(component);
|
||||||
|
|
||||||
|
ContainerStatus containerStatus = mock(ContainerStatus.class);
|
||||||
|
|
||||||
|
assertEquals(false, restartPolicy.isLongLived());
|
||||||
|
assertEquals(false, restartPolicy.allowUpgrades());
|
||||||
|
assertEquals(true, restartPolicy.hasCompleted(component));
|
||||||
|
assertEquals(false,
|
||||||
|
restartPolicy.hasCompletedSuccessfully(component));
|
||||||
|
|
||||||
|
assertEquals(false,
|
||||||
|
restartPolicy.shouldRelaunchInstance(instance, containerStatus));
|
||||||
|
|
||||||
|
assertEquals(true, restartPolicy.isReadyForDownStream(component));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOnFailureRestartPolicy() throws Exception {
|
||||||
|
|
||||||
|
OnFailureRestartPolicy restartPolicy = OnFailureRestartPolicy.getInstance();
|
||||||
|
|
||||||
|
Component component = mock(Component.class);
|
||||||
|
when(component.getNumSucceededInstances()).thenReturn(new Long(3));
|
||||||
|
when(component.getNumFailedInstances()).thenReturn(new Long(0));
|
||||||
|
when(component.getNumDesiredInstances()).thenReturn(3);
|
||||||
|
|
||||||
|
ComponentInstance instance = mock(ComponentInstance.class);
|
||||||
|
when(instance.getComponent()).thenReturn(component);
|
||||||
|
|
||||||
|
ContainerStatus containerStatus = mock(ContainerStatus.class);
|
||||||
|
when(containerStatus.getExitStatus()).thenReturn(0);
|
||||||
|
|
||||||
|
assertEquals(false, restartPolicy.isLongLived());
|
||||||
|
assertEquals(false, restartPolicy.allowUpgrades());
|
||||||
|
assertEquals(true, restartPolicy.hasCompleted(component));
|
||||||
|
assertEquals(true,
|
||||||
|
restartPolicy.hasCompletedSuccessfully(component));
|
||||||
|
|
||||||
|
assertEquals(false,
|
||||||
|
restartPolicy.shouldRelaunchInstance(instance, containerStatus));
|
||||||
|
|
||||||
|
assertEquals(true, restartPolicy.isReadyForDownStream(component));
|
||||||
|
|
||||||
|
|
||||||
|
when(component.getNumSucceededInstances()).thenReturn(new Long(2));
|
||||||
|
when(component.getNumFailedInstances()).thenReturn(new Long(1));
|
||||||
|
when(component.getNumDesiredInstances()).thenReturn(3);
|
||||||
|
|
||||||
|
assertEquals(false, restartPolicy.hasCompleted(component));
|
||||||
|
assertEquals(false,
|
||||||
|
restartPolicy.hasCompletedSuccessfully(component));
|
||||||
|
|
||||||
|
when(containerStatus.getExitStatus()).thenReturn(-1000);
|
||||||
|
|
||||||
|
assertEquals(true,
|
||||||
|
restartPolicy.shouldRelaunchInstance(instance, containerStatus));
|
||||||
|
|
||||||
|
assertEquals(false, restartPolicy.isReadyForDownStream(component));
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -6,9 +6,9 @@
|
||||||
* to you under the Apache License, Version 2.0 (the
|
* to you under the Apache License, Version 2.0 (the
|
||||||
* "License"); you may not use this file except in compliance
|
* "License"); you may not use this file except in compliance
|
||||||
* with the License. You may obtain a copy of the License at
|
* with the License. You may obtain a copy of the License at
|
||||||
*
|
* <p>
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
*
|
* <p>
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
@ -18,56 +18,80 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.service.component.instance;
|
package org.apache.hadoop.yarn.service.component.instance;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.service.ServiceContext;
|
import org.apache.hadoop.yarn.service.ServiceContext;
|
||||||
|
import org.apache.hadoop.yarn.service.ServiceScheduler;
|
||||||
import org.apache.hadoop.yarn.service.ServiceTestUtils;
|
import org.apache.hadoop.yarn.service.ServiceTestUtils;
|
||||||
import org.apache.hadoop.yarn.service.api.records.Container;
|
import org.apache.hadoop.yarn.service.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.service.api.records.ContainerState;
|
import org.apache.hadoop.yarn.service.api.records.ContainerState;
|
||||||
|
import org.apache.hadoop.yarn.service.api.records.Service;
|
||||||
import org.apache.hadoop.yarn.service.component.Component;
|
import org.apache.hadoop.yarn.service.component.Component;
|
||||||
import org.apache.hadoop.yarn.service.component.ComponentEvent;
|
import org.apache.hadoop.yarn.service.component.ComponentEvent;
|
||||||
import org.apache.hadoop.yarn.service.component.ComponentEventType;
|
import org.apache.hadoop.yarn.service.component.ComponentEventType;
|
||||||
import org.apache.hadoop.yarn.service.component.TestComponent;
|
import org.apache.hadoop.yarn.service.component.TestComponent;
|
||||||
|
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.anyInt;
|
||||||
|
import static org.mockito.Matchers.eq;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.never;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests for {@link ComponentInstance}.
|
* Tests for {@link ComponentInstance}.
|
||||||
*/
|
*/
|
||||||
public class TestComponentInstance {
|
public class TestComponentInstance {
|
||||||
|
|
||||||
@Rule
|
@Rule public ServiceTestUtils.ServiceFSWatcher rule =
|
||||||
public ServiceTestUtils.ServiceFSWatcher rule =
|
|
||||||
new ServiceTestUtils.ServiceFSWatcher();
|
new ServiceTestUtils.ServiceFSWatcher();
|
||||||
|
|
||||||
@Test
|
@Test public void testContainerUpgrade() throws Exception {
|
||||||
public void testContainerUpgrade() throws Exception {
|
|
||||||
ServiceContext context = TestComponent.createTestContext(rule,
|
ServiceContext context = TestComponent.createTestContext(rule,
|
||||||
"testContainerUpgrade");
|
"testContainerUpgrade");
|
||||||
Component component = context.scheduler.getAllComponents().entrySet()
|
Component component =
|
||||||
.iterator().next().getValue();
|
context.scheduler.getAllComponents().entrySet().iterator().next()
|
||||||
|
.getValue();
|
||||||
upgradeComponent(component);
|
upgradeComponent(component);
|
||||||
|
|
||||||
ComponentInstance instance = component.getAllComponentInstances()
|
ComponentInstance instance =
|
||||||
.iterator().next();
|
component.getAllComponentInstances().iterator().next();
|
||||||
ComponentInstanceEvent instanceEvent = new ComponentInstanceEvent(
|
ComponentInstanceEvent instanceEvent = new ComponentInstanceEvent(
|
||||||
instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE);
|
instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE);
|
||||||
instance.handle(instanceEvent);
|
instance.handle(instanceEvent);
|
||||||
Container containerSpec = component.getComponentSpec().getContainer(
|
Container containerSpec = component.getComponentSpec().getContainer(
|
||||||
instance.getContainer().getId().toString());
|
instance.getContainer().getId().toString());
|
||||||
Assert.assertEquals("instance not upgrading",
|
Assert.assertEquals("instance not upgrading", ContainerState.UPGRADING,
|
||||||
ContainerState.UPGRADING, containerSpec.getState());
|
containerSpec.getState());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test public void testContainerReadyAfterUpgrade() throws Exception {
|
||||||
public void testContainerReadyAfterUpgrade() throws Exception {
|
|
||||||
ServiceContext context = TestComponent.createTestContext(rule,
|
ServiceContext context = TestComponent.createTestContext(rule,
|
||||||
"testContainerStarted");
|
"testContainerStarted");
|
||||||
Component component = context.scheduler.getAllComponents().entrySet()
|
Component component =
|
||||||
.iterator().next().getValue();
|
context.scheduler.getAllComponents().entrySet().iterator().next()
|
||||||
|
.getValue();
|
||||||
upgradeComponent(component);
|
upgradeComponent(component);
|
||||||
|
|
||||||
ComponentInstance instance = component.getAllComponentInstances()
|
ComponentInstance instance =
|
||||||
.iterator().next();
|
component.getAllComponentInstances().iterator().next();
|
||||||
|
|
||||||
ComponentInstanceEvent instanceEvent = new ComponentInstanceEvent(
|
ComponentInstanceEvent instanceEvent = new ComponentInstanceEvent(
|
||||||
instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE);
|
instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE);
|
||||||
|
@ -75,14 +99,426 @@ public class TestComponentInstance {
|
||||||
|
|
||||||
instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(),
|
instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(),
|
||||||
ComponentInstanceEventType.BECOME_READY));
|
ComponentInstanceEventType.BECOME_READY));
|
||||||
Assert.assertEquals("instance not ready",
|
Assert.assertEquals("instance not ready", ContainerState.READY,
|
||||||
ContainerState.READY, instance.getCompSpec().getContainer(
|
instance.getCompSpec()
|
||||||
instance.getContainer().getId().toString()).getState());
|
.getContainer(instance.getContainer().getId().toString())
|
||||||
|
.getState());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void upgradeComponent(Component component) {
|
private void upgradeComponent(Component component) {
|
||||||
component.handle(new ComponentEvent(component.getName(),
|
component.handle(new ComponentEvent(component.getName(),
|
||||||
ComponentEventType.UPGRADE)
|
ComponentEventType.UPGRADE).setTargetSpec(component.getComponentSpec())
|
||||||
.setTargetSpec(component.getComponentSpec()).setUpgradeVersion("v2"));
|
.setUpgradeVersion("v2"));
|
||||||
|
}
|
||||||
|
|
||||||
|
private Component createComponent(ServiceScheduler scheduler,
|
||||||
|
org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum
|
||||||
|
restartPolicy,
|
||||||
|
int nSucceededInstances, int nFailedInstances, int totalAsk,
|
||||||
|
int componentId) {
|
||||||
|
|
||||||
|
assert (nSucceededInstances + nFailedInstances) <= totalAsk;
|
||||||
|
|
||||||
|
Component comp = mock(Component.class);
|
||||||
|
org.apache.hadoop.yarn.service.api.records.Component componentSpec = mock(
|
||||||
|
org.apache.hadoop.yarn.service.api.records.Component.class);
|
||||||
|
when(componentSpec.getRestartPolicy()).thenReturn(restartPolicy);
|
||||||
|
when(comp.getRestartPolicyHandler()).thenReturn(
|
||||||
|
Component.getRestartPolicyHandler(restartPolicy));
|
||||||
|
when(componentSpec.getNumberOfContainers()).thenReturn(
|
||||||
|
Long.valueOf(totalAsk));
|
||||||
|
when(comp.getComponentSpec()).thenReturn(componentSpec);
|
||||||
|
when(comp.getScheduler()).thenReturn(scheduler);
|
||||||
|
|
||||||
|
Map<String, ComponentInstance> succeeded = new ConcurrentHashMap<>();
|
||||||
|
Map<String, ComponentInstance> failed = new ConcurrentHashMap<>();
|
||||||
|
scheduler.getAllComponents().put("comp" + componentId, comp);
|
||||||
|
|
||||||
|
Map<String, ComponentInstance> componentInstances = new HashMap<>();
|
||||||
|
|
||||||
|
for (int i = 0; i < nSucceededInstances; i++) {
|
||||||
|
ComponentInstance componentInstance = createComponentInstance(comp, i);
|
||||||
|
componentInstances.put(componentInstance.getCompInstanceName(),
|
||||||
|
componentInstance);
|
||||||
|
succeeded.put(componentInstance.getCompInstanceName(), componentInstance);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < nFailedInstances; i++) {
|
||||||
|
ComponentInstance componentInstance = createComponentInstance(comp,
|
||||||
|
i + nSucceededInstances);
|
||||||
|
componentInstances.put(componentInstance.getCompInstanceName(),
|
||||||
|
componentInstance);
|
||||||
|
failed.put(componentInstance.getCompInstanceName(), componentInstance);
|
||||||
|
}
|
||||||
|
|
||||||
|
int delta = totalAsk - nFailedInstances - nSucceededInstances;
|
||||||
|
|
||||||
|
for (int i = 0; i < delta; i++) {
|
||||||
|
ComponentInstance componentInstance = createComponentInstance(comp,
|
||||||
|
i + nSucceededInstances + nFailedInstances);
|
||||||
|
componentInstances.put(componentInstance.getCompInstanceName(),
|
||||||
|
componentInstance);
|
||||||
|
}
|
||||||
|
|
||||||
|
when(comp.getAllComponentInstances()).thenReturn(
|
||||||
|
componentInstances.values());
|
||||||
|
when(comp.getSucceededInstances()).thenReturn(succeeded.values());
|
||||||
|
when(comp.getFailedInstances()).thenReturn(failed.values());
|
||||||
|
return comp;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Component createComponent(ServiceScheduler scheduler,
|
||||||
|
org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum
|
||||||
|
restartPolicy,
|
||||||
|
int totalAsk, int componentId) {
|
||||||
|
|
||||||
|
Component comp = mock(Component.class);
|
||||||
|
org.apache.hadoop.yarn.service.api.records.Component componentSpec = mock(
|
||||||
|
org.apache.hadoop.yarn.service.api.records.Component.class);
|
||||||
|
when(componentSpec.getRestartPolicy()).thenReturn(restartPolicy);
|
||||||
|
when(comp.getRestartPolicyHandler()).thenReturn(
|
||||||
|
Component.getRestartPolicyHandler(restartPolicy));
|
||||||
|
when(componentSpec.getNumberOfContainers()).thenReturn(
|
||||||
|
Long.valueOf(totalAsk));
|
||||||
|
when(comp.getComponentSpec()).thenReturn(componentSpec);
|
||||||
|
when(comp.getScheduler()).thenReturn(scheduler);
|
||||||
|
|
||||||
|
scheduler.getAllComponents().put("comp" + componentId, comp);
|
||||||
|
|
||||||
|
Map<String, ComponentInstance> componentInstances = new HashMap<>();
|
||||||
|
|
||||||
|
for (int i = 0; i < totalAsk; i++) {
|
||||||
|
ComponentInstance componentInstance = createComponentInstance(comp, i);
|
||||||
|
componentInstances.put(componentInstance.getCompInstanceName(),
|
||||||
|
componentInstance);
|
||||||
|
}
|
||||||
|
|
||||||
|
when(comp.getAllComponentInstances()).thenReturn(
|
||||||
|
componentInstances.values());
|
||||||
|
return comp;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ComponentInstance createComponentInstance(Component component,
|
||||||
|
int instanceId) {
|
||||||
|
|
||||||
|
ComponentInstance componentInstance = mock(ComponentInstance.class);
|
||||||
|
when(componentInstance.getComponent()).thenReturn(component);
|
||||||
|
when(componentInstance.getCompInstanceName()).thenReturn(
|
||||||
|
"compInstance" + instanceId);
|
||||||
|
|
||||||
|
ServiceUtils.ProcessTerminationHandler terminationHandler = mock(
|
||||||
|
ServiceUtils.ProcessTerminationHandler.class);
|
||||||
|
when(component.getScheduler().getTerminationHandler()).thenReturn(
|
||||||
|
terminationHandler);
|
||||||
|
|
||||||
|
return componentInstance;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testComponentRestartPolicy() {
|
||||||
|
|
||||||
|
Map<String, Component> allComponents = new HashMap<>();
|
||||||
|
Service mockService = mock(Service.class);
|
||||||
|
ServiceContext serviceContext = mock(ServiceContext.class);
|
||||||
|
when(serviceContext.getService()).thenReturn(mockService);
|
||||||
|
ServiceScheduler serviceSchedulerInstance = new ServiceScheduler(
|
||||||
|
serviceContext);
|
||||||
|
ServiceScheduler serviceScheduler = spy(serviceSchedulerInstance);
|
||||||
|
when(serviceScheduler.getAllComponents()).thenReturn(allComponents);
|
||||||
|
Mockito.doNothing().when(serviceScheduler).setGracefulStop(
|
||||||
|
any(FinalApplicationStatus.class));
|
||||||
|
|
||||||
|
ComponentInstanceEvent componentInstanceEvent = mock(
|
||||||
|
ComponentInstanceEvent.class);
|
||||||
|
ContainerId containerId = ContainerId.newContainerId(ApplicationAttemptId
|
||||||
|
.newInstance(ApplicationId.newInstance(1234L, 1), 1), 1);
|
||||||
|
ContainerStatus containerStatus = ContainerStatus.newInstance(containerId,
|
||||||
|
org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE, "hello", 0);
|
||||||
|
|
||||||
|
when(componentInstanceEvent.getStatus()).thenReturn(containerStatus);
|
||||||
|
|
||||||
|
// Test case1: one component, one instance, restart policy = ALWAYS, exit=0
|
||||||
|
Component comp = createComponent(serviceScheduler,
|
||||||
|
org.apache.hadoop.yarn.service.api.records.Component
|
||||||
|
.RestartPolicyEnum.ALWAYS,
|
||||||
|
1, 0, 1, 0);
|
||||||
|
ComponentInstance componentInstance =
|
||||||
|
comp.getAllComponentInstances().iterator().next();
|
||||||
|
|
||||||
|
ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
|
||||||
|
componentInstanceEvent);
|
||||||
|
|
||||||
|
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
|
||||||
|
verify(comp, never()).markAsFailed(any(ComponentInstance.class));
|
||||||
|
verify(comp, times(1)).reInsertPendingInstance(
|
||||||
|
any(ComponentInstance.class));
|
||||||
|
verify(serviceScheduler.getTerminationHandler(), never()).terminate(
|
||||||
|
anyInt());
|
||||||
|
|
||||||
|
// Test case2: one component, one instance, restart policy = ALWAYS, exit=1
|
||||||
|
comp = createComponent(serviceScheduler,
|
||||||
|
org.apache.hadoop.yarn.service.api.records.Component
|
||||||
|
.RestartPolicyEnum.ALWAYS,
|
||||||
|
0, 1, 1, 0);
|
||||||
|
componentInstance = comp.getAllComponentInstances().iterator().next();
|
||||||
|
containerStatus.setExitStatus(1);
|
||||||
|
ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
|
||||||
|
componentInstanceEvent);
|
||||||
|
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
|
||||||
|
verify(comp, never()).markAsFailed(any(ComponentInstance.class));
|
||||||
|
verify(comp, times(1)).reInsertPendingInstance(
|
||||||
|
any(ComponentInstance.class));
|
||||||
|
verify(serviceScheduler.getTerminationHandler(), never()).terminate(
|
||||||
|
anyInt());
|
||||||
|
|
||||||
|
// Test case3: one component, one instance, restart policy = NEVER, exit=0
|
||||||
|
// Should exit with code=0
|
||||||
|
comp = createComponent(serviceScheduler,
|
||||||
|
org.apache.hadoop.yarn.service.api.records.Component
|
||||||
|
.RestartPolicyEnum.NEVER,
|
||||||
|
1, 0, 1, 0);
|
||||||
|
componentInstance = comp.getAllComponentInstances().iterator().next();
|
||||||
|
containerStatus.setExitStatus(0);
|
||||||
|
|
||||||
|
Map<String, ComponentInstance> succeededInstances = new HashMap<>();
|
||||||
|
succeededInstances.put(componentInstance.getCompInstanceName(),
|
||||||
|
componentInstance);
|
||||||
|
when(comp.getSucceededInstances()).thenReturn(succeededInstances.values());
|
||||||
|
when(comp.getNumSucceededInstances()).thenReturn(new Long(1));
|
||||||
|
|
||||||
|
ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
|
||||||
|
componentInstanceEvent);
|
||||||
|
verify(comp, times(1)).markAsSucceeded(any(ComponentInstance.class));
|
||||||
|
verify(comp, never()).markAsFailed(any(ComponentInstance.class));
|
||||||
|
verify(comp, times(0)).reInsertPendingInstance(
|
||||||
|
any(ComponentInstance.class));
|
||||||
|
verify(serviceScheduler.getTerminationHandler(), times(1)).terminate(eq(0));
|
||||||
|
|
||||||
|
// Test case4: one component, one instance, restart policy = NEVER, exit=1
|
||||||
|
// Should exit with code=-1
|
||||||
|
comp = createComponent(serviceScheduler,
|
||||||
|
org.apache.hadoop.yarn.service.api.records.Component
|
||||||
|
.RestartPolicyEnum.NEVER,
|
||||||
|
0, 1, 1, 0);
|
||||||
|
componentInstance = comp.getAllComponentInstances().iterator().next();
|
||||||
|
containerStatus.setExitStatus(-1);
|
||||||
|
|
||||||
|
when(comp.getNumFailedInstances()).thenReturn(new Long(1));
|
||||||
|
ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
|
||||||
|
componentInstanceEvent);
|
||||||
|
|
||||||
|
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
|
||||||
|
verify(comp, times(1)).markAsFailed(any(ComponentInstance.class));
|
||||||
|
verify(comp, times(0)).reInsertPendingInstance(
|
||||||
|
any(ComponentInstance.class));
|
||||||
|
verify(serviceScheduler.getTerminationHandler(), times(1)).terminate(
|
||||||
|
eq(-1));
|
||||||
|
|
||||||
|
// Test case5: one component, one instance, restart policy = ON_FAILURE,
|
||||||
|
// exit=1
|
||||||
|
// Should continue run.
|
||||||
|
comp = createComponent(serviceScheduler,
|
||||||
|
org.apache.hadoop.yarn.service.api.records.Component
|
||||||
|
.RestartPolicyEnum.ON_FAILURE,
|
||||||
|
0, 1, 1, 0);
|
||||||
|
componentInstance = comp.getAllComponentInstances().iterator().next();
|
||||||
|
containerStatus.setExitStatus(1);
|
||||||
|
ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
|
||||||
|
componentInstanceEvent);
|
||||||
|
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
|
||||||
|
verify(comp, never()).markAsFailed(any(ComponentInstance.class));
|
||||||
|
verify(comp, times(1)).reInsertPendingInstance(
|
||||||
|
any(ComponentInstance.class));
|
||||||
|
verify(serviceScheduler.getTerminationHandler(), times(0)).terminate(
|
||||||
|
anyInt());
|
||||||
|
|
||||||
|
// Test case6: one component, 3 instances, restart policy = NEVER, exit=1
|
||||||
|
// 2 of the instances not completed, it should continue run.
|
||||||
|
comp = createComponent(serviceScheduler,
|
||||||
|
org.apache.hadoop.yarn.service.api.records.Component
|
||||||
|
.RestartPolicyEnum.NEVER,
|
||||||
|
0, 1, 3, 0);
|
||||||
|
componentInstance = comp.getAllComponentInstances().iterator().next();
|
||||||
|
containerStatus.setExitStatus(1);
|
||||||
|
ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
|
||||||
|
componentInstanceEvent);
|
||||||
|
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
|
||||||
|
verify(comp, times(1)).markAsFailed(any(ComponentInstance.class));
|
||||||
|
verify(comp, times(0)).reInsertPendingInstance(
|
||||||
|
any(ComponentInstance.class));
|
||||||
|
verify(serviceScheduler.getTerminationHandler(), times(0)).terminate(
|
||||||
|
anyInt());
|
||||||
|
|
||||||
|
// Test case7: one component, 3 instances, restart policy = ON_FAILURE,
|
||||||
|
// exit=1
|
||||||
|
// 2 of the instances completed, it should continue run.
|
||||||
|
|
||||||
|
comp = createComponent(serviceScheduler,
|
||||||
|
org.apache.hadoop.yarn.service.api.records.Component
|
||||||
|
.RestartPolicyEnum.ON_FAILURE,
|
||||||
|
0, 1, 3, 0);
|
||||||
|
|
||||||
|
Iterator<ComponentInstance> iter =
|
||||||
|
comp.getAllComponentInstances().iterator();
|
||||||
|
|
||||||
|
containerStatus.setExitStatus(1);
|
||||||
|
ComponentInstance commponentInstance = iter.next();
|
||||||
|
ComponentInstance.handleComponentInstanceRelaunch(commponentInstance,
|
||||||
|
componentInstanceEvent);
|
||||||
|
|
||||||
|
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
|
||||||
|
verify(comp, never()).markAsFailed(any(ComponentInstance.class));
|
||||||
|
verify(comp, times(1)).reInsertPendingInstance(
|
||||||
|
any(ComponentInstance.class));
|
||||||
|
verify(serviceScheduler.getTerminationHandler(), times(0)).terminate(
|
||||||
|
anyInt());
|
||||||
|
|
||||||
|
// Test case8: 2 components, 2 instances for each
|
||||||
|
// comp2 already finished.
|
||||||
|
// comp1 has a new instance finish, we should terminate the service
|
||||||
|
|
||||||
|
comp = createComponent(serviceScheduler,
|
||||||
|
org.apache.hadoop.yarn.service.api.records.Component
|
||||||
|
.RestartPolicyEnum.NEVER,
|
||||||
|
2, 0);
|
||||||
|
Collection<ComponentInstance> component1Instances =
|
||||||
|
comp.getAllComponentInstances();
|
||||||
|
|
||||||
|
containerStatus.setExitStatus(-1);
|
||||||
|
|
||||||
|
Component comp2 = createComponent(
|
||||||
|
componentInstance.getComponent().getScheduler(),
|
||||||
|
org.apache.hadoop.yarn.service.api.records.Component
|
||||||
|
.RestartPolicyEnum.NEVER,
|
||||||
|
2, 1);
|
||||||
|
|
||||||
|
Collection<ComponentInstance> component2Instances =
|
||||||
|
comp2.getAllComponentInstances();
|
||||||
|
|
||||||
|
Map<String, ComponentInstance> failed2Instances = new HashMap<>();
|
||||||
|
|
||||||
|
for (ComponentInstance component2Instance : component2Instances) {
|
||||||
|
failed2Instances.put(component2Instance.getCompInstanceName(),
|
||||||
|
component2Instance);
|
||||||
|
when(component2Instance.getComponent().getFailedInstances()).thenReturn(
|
||||||
|
failed2Instances.values());
|
||||||
|
when(component2Instance.getComponent().getNumFailedInstances())
|
||||||
|
.thenReturn(new Long(failed2Instances.size()));
|
||||||
|
ComponentInstance.handleComponentInstanceRelaunch(component2Instance,
|
||||||
|
componentInstanceEvent);
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, ComponentInstance> failed1Instances = new HashMap<>();
|
||||||
|
|
||||||
|
// 2nd component, already finished.
|
||||||
|
for (ComponentInstance component1Instance : component1Instances) {
|
||||||
|
failed1Instances.put(component1Instance.getCompInstanceName(),
|
||||||
|
component1Instance);
|
||||||
|
when(component1Instance.getComponent().getFailedInstances()).thenReturn(
|
||||||
|
failed1Instances.values());
|
||||||
|
when(component1Instance.getComponent().getNumFailedInstances())
|
||||||
|
.thenReturn(new Long(failed1Instances.size()));
|
||||||
|
ComponentInstance.handleComponentInstanceRelaunch(component1Instance,
|
||||||
|
componentInstanceEvent);
|
||||||
|
}
|
||||||
|
|
||||||
|
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
|
||||||
|
verify(comp, times(2)).markAsFailed(any(ComponentInstance.class));
|
||||||
|
verify(comp, times(0)).reInsertPendingInstance(
|
||||||
|
any(ComponentInstance.class));
|
||||||
|
|
||||||
|
verify(serviceScheduler.getTerminationHandler(), times(1)).terminate(
|
||||||
|
eq(-1));
|
||||||
|
|
||||||
|
// Test case9: 2 components, 2 instances for each
|
||||||
|
// comp2 already finished.
|
||||||
|
// comp1 has a new instance finish, we should terminate the service
|
||||||
|
// All instance finish with 0, service should exit with 0 as well.
|
||||||
|
containerStatus.setExitStatus(0);
|
||||||
|
|
||||||
|
comp = createComponent(serviceScheduler,
|
||||||
|
org.apache.hadoop.yarn.service.api.records.Component
|
||||||
|
.RestartPolicyEnum.ON_FAILURE,
|
||||||
|
2, 0);
|
||||||
|
component1Instances = comp.getAllComponentInstances();
|
||||||
|
|
||||||
|
comp2 = createComponent(componentInstance.getComponent().getScheduler(),
|
||||||
|
org.apache.hadoop.yarn.service.api.records.Component
|
||||||
|
.RestartPolicyEnum.ON_FAILURE,
|
||||||
|
2, 1);
|
||||||
|
|
||||||
|
component2Instances = comp2.getAllComponentInstances();
|
||||||
|
|
||||||
|
Map<String, ComponentInstance> succeeded2Instances = new HashMap<>();
|
||||||
|
|
||||||
|
for (ComponentInstance component2Instance : component2Instances) {
|
||||||
|
succeeded2Instances.put(component2Instance.getCompInstanceName(),
|
||||||
|
component2Instance);
|
||||||
|
when(component2Instance.getComponent().getSucceededInstances())
|
||||||
|
.thenReturn(succeeded2Instances.values());
|
||||||
|
when(component2Instance.getComponent().getNumSucceededInstances())
|
||||||
|
.thenReturn(new Long(succeeded2Instances.size()));
|
||||||
|
ComponentInstance.handleComponentInstanceRelaunch(component2Instance,
|
||||||
|
componentInstanceEvent);
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, ComponentInstance> succeeded1Instances = new HashMap<>();
|
||||||
|
// 2nd component, already finished.
|
||||||
|
for (ComponentInstance component1Instance : component1Instances) {
|
||||||
|
succeeded1Instances.put(component1Instance.getCompInstanceName(),
|
||||||
|
component1Instance);
|
||||||
|
when(component1Instance.getComponent().getSucceededInstances())
|
||||||
|
.thenReturn(succeeded1Instances.values());
|
||||||
|
when(component1Instance.getComponent().getNumSucceededInstances())
|
||||||
|
.thenReturn(new Long(succeeded1Instances.size()));
|
||||||
|
ComponentInstance.handleComponentInstanceRelaunch(component1Instance,
|
||||||
|
componentInstanceEvent);
|
||||||
|
}
|
||||||
|
|
||||||
|
verify(comp, times(2)).markAsSucceeded(any(ComponentInstance.class));
|
||||||
|
verify(comp, never()).markAsFailed(any(ComponentInstance.class));
|
||||||
|
verify(componentInstance.getComponent(), times(0)).reInsertPendingInstance(
|
||||||
|
any(ComponentInstance.class));
|
||||||
|
verify(serviceScheduler.getTerminationHandler(), times(1)).terminate(eq(0));
|
||||||
|
|
||||||
|
// Test case10: 2 components, 2 instances for each
|
||||||
|
// comp2 hasn't finished
|
||||||
|
// comp1 finished.
|
||||||
|
// Service should continue run.
|
||||||
|
|
||||||
|
comp = createComponent(serviceScheduler,
|
||||||
|
org.apache.hadoop.yarn.service.api.records.Component
|
||||||
|
.RestartPolicyEnum.NEVER,
|
||||||
|
2, 0);
|
||||||
|
component1Instances = comp.getAllComponentInstances();
|
||||||
|
|
||||||
|
comp2 = createComponent(componentInstance.getComponent().getScheduler(),
|
||||||
|
org.apache.hadoop.yarn.service.api.records.Component
|
||||||
|
.RestartPolicyEnum.NEVER,
|
||||||
|
2, 1);
|
||||||
|
|
||||||
|
component2Instances = comp2.getAllComponentInstances();
|
||||||
|
|
||||||
|
for (ComponentInstance component2Instance : component2Instances) {
|
||||||
|
ComponentInstance.handleComponentInstanceRelaunch(component2Instance,
|
||||||
|
componentInstanceEvent);
|
||||||
|
}
|
||||||
|
|
||||||
|
succeeded1Instances = new HashMap<>();
|
||||||
|
// 2nd component, already finished.
|
||||||
|
for (ComponentInstance component1Instance : component1Instances) {
|
||||||
|
succeeded1Instances.put(component1Instance.getCompInstanceName(),
|
||||||
|
component1Instance);
|
||||||
|
when(component1Instance.getComponent().getSucceededInstances())
|
||||||
|
.thenReturn(succeeded1Instances.values());
|
||||||
|
ComponentInstance.handleComponentInstanceRelaunch(component1Instance,
|
||||||
|
componentInstanceEvent);
|
||||||
|
}
|
||||||
|
|
||||||
|
verify(comp, times(2)).markAsSucceeded(any(ComponentInstance.class));
|
||||||
|
verify(comp, never()).markAsFailed(any(ComponentInstance.class));
|
||||||
|
verify(componentInstance.getComponent(), times(0)).reInsertPendingInstance(
|
||||||
|
any(ComponentInstance.class));
|
||||||
|
verify(serviceScheduler.getTerminationHandler(), never()).terminate(eq(0));
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -233,6 +233,8 @@ One or more components of the service. If the service is HBase say, then the com
|
||||||
|placement_policy|Advanced scheduling and placement policies for all containers of this component.|false|PlacementPolicy||
|
|placement_policy|Advanced scheduling and placement policies for all containers of this component.|false|PlacementPolicy||
|
||||||
|configuration|Config properties for this component.|false|Configuration||
|
|configuration|Config properties for this component.|false|Configuration||
|
||||||
|quicklinks|A list of quicklink keys defined at the service level, and to be resolved by this component.|false|string array||
|
|quicklinks|A list of quicklink keys defined at the service level, and to be resolved by this component.|false|string array||
|
||||||
|
|restart_policy|Policy of restart component. Including ALWAYS (Always restart
|
||||||
|
component even if instance exit code = 0); ON_FAILURE (Only restart component if instance exit code != 0); NEVER (Do not restart in any cases). Flexing is not supported for components which have restart_policy=ON_FAILURE/NEVER|false|string|ALWAYS|
|
||||||
|
|
||||||
|
|
||||||
### ComponentState
|
### ComponentState
|
||||||
|
|
Loading…
Reference in New Issue