YARN-8488. Added SUCCEEDED/FAILED states to YARN service for branch-3.1.
Contributed by Chandni Singh
This commit is contained in:
parent
5e2f123748
commit
7741106fd0
|
@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.service.api.ServiceApiConstants;
|
import org.apache.hadoop.yarn.service.api.ServiceApiConstants;
|
||||||
|
import org.apache.hadoop.yarn.service.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.service.api.records.Service;
|
import org.apache.hadoop.yarn.service.api.records.Service;
|
||||||
import org.apache.hadoop.yarn.service.api.records.ServiceState;
|
import org.apache.hadoop.yarn.service.api.records.ServiceState;
|
||||||
import org.apache.hadoop.yarn.service.api.records.ConfigFile;
|
import org.apache.hadoop.yarn.service.api.records.ConfigFile;
|
||||||
|
@ -80,6 +81,8 @@ import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
|
||||||
import org.apache.hadoop.yarn.service.utils.ServiceRegistryUtils;
|
import org.apache.hadoop.yarn.service.utils.ServiceRegistryUtils;
|
||||||
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
|
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
|
||||||
import org.apache.hadoop.yarn.util.BoundedAppender;
|
import org.apache.hadoop.yarn.util.BoundedAppender;
|
||||||
|
import org.apache.hadoop.yarn.util.Clock;
|
||||||
|
import org.apache.hadoop.yarn.util.SystemClock;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -102,7 +105,8 @@ import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
|
import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
|
||||||
import static org.apache.hadoop.registry.client.api.RegistryConstants.*;
|
import static org.apache.hadoop.registry.client.api.RegistryConstants.*;
|
||||||
import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.KILLED_AFTER_APP_COMPLETION;
|
import static org.apache.hadoop.yarn.api.records.ContainerExitStatus
|
||||||
|
.KILLED_AFTER_APP_COMPLETION;
|
||||||
import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*;
|
import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*;
|
||||||
import static org.apache.hadoop.yarn.service.component.ComponentEventType.*;
|
import static org.apache.hadoop.yarn.service.component.ComponentEventType.*;
|
||||||
import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes
|
import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes
|
||||||
|
@ -137,6 +141,8 @@ public class ServiceScheduler extends CompositeService {
|
||||||
|
|
||||||
private ServiceTimelinePublisher serviceTimelinePublisher;
|
private ServiceTimelinePublisher serviceTimelinePublisher;
|
||||||
|
|
||||||
|
private boolean timelineServiceEnabled;
|
||||||
|
|
||||||
// Global diagnostics that will be reported to RM on eRxit.
|
// Global diagnostics that will be reported to RM on eRxit.
|
||||||
// The unit the number of characters. This will be limited to 64 * 1024
|
// The unit the number of characters. This will be limited to 64 * 1024
|
||||||
// characters.
|
// characters.
|
||||||
|
@ -169,6 +175,8 @@ public class ServiceScheduler extends CompositeService {
|
||||||
private volatile FinalApplicationStatus finalApplicationStatus =
|
private volatile FinalApplicationStatus finalApplicationStatus =
|
||||||
FinalApplicationStatus.ENDED;
|
FinalApplicationStatus.ENDED;
|
||||||
|
|
||||||
|
private Clock systemClock;
|
||||||
|
|
||||||
// For unit test override since we don't want to terminate UT process.
|
// For unit test override since we don't want to terminate UT process.
|
||||||
private ServiceUtils.ProcessTerminationHandler
|
private ServiceUtils.ProcessTerminationHandler
|
||||||
terminationHandler = new ServiceUtils.ProcessTerminationHandler();
|
terminationHandler = new ServiceUtils.ProcessTerminationHandler();
|
||||||
|
@ -176,6 +184,8 @@ public class ServiceScheduler extends CompositeService {
|
||||||
public ServiceScheduler(ServiceContext context) {
|
public ServiceScheduler(ServiceContext context) {
|
||||||
super(context.getService().getName());
|
super(context.getService().getName());
|
||||||
this.context = context;
|
this.context = context;
|
||||||
|
this.app = context.getService();
|
||||||
|
this.systemClock = SystemClock.getInstance();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void buildInstance(ServiceContext context, Configuration configuration)
|
public void buildInstance(ServiceContext context, Configuration configuration)
|
||||||
|
@ -254,8 +264,14 @@ public class ServiceScheduler extends CompositeService {
|
||||||
YarnServiceConf.DEFAULT_CONTAINER_RECOVERY_TIMEOUT_MS,
|
YarnServiceConf.DEFAULT_CONTAINER_RECOVERY_TIMEOUT_MS,
|
||||||
app.getConfiguration(), getConfig());
|
app.getConfiguration(), getConfig());
|
||||||
|
|
||||||
|
if (YarnConfiguration
|
||||||
|
.timelineServiceV2Enabled(getConfig())) {
|
||||||
|
timelineServiceEnabled = true;
|
||||||
|
}
|
||||||
|
|
||||||
serviceManager = createServiceManager();
|
serviceManager = createServiceManager();
|
||||||
context.setServiceManager(serviceManager);
|
context.setServiceManager(serviceManager);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected YarnRegistryViewForProviders createYarnRegistryOperations(
|
protected YarnRegistryViewForProviders createYarnRegistryOperations(
|
||||||
|
@ -311,21 +327,38 @@ public class ServiceScheduler extends CompositeService {
|
||||||
// only stop the entire service when a graceful stop has been initiated
|
// only stop the entire service when a graceful stop has been initiated
|
||||||
// (e.g. via client RPC, not through the AM receiving a SIGTERM)
|
// (e.g. via client RPC, not through the AM receiving a SIGTERM)
|
||||||
if (gracefulStop) {
|
if (gracefulStop) {
|
||||||
|
|
||||||
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
|
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
|
||||||
// mark component-instances/containers as STOPPED
|
|
||||||
for (ContainerId containerId : getLiveInstances().keySet()) {
|
// mark other component-instances/containers as STOPPED
|
||||||
serviceTimelinePublisher.componentInstanceFinished(containerId,
|
final Map<ContainerId, ComponentInstance> liveInst =
|
||||||
KILLED_AFTER_APP_COMPLETION, diagnostics.toString());
|
getLiveInstances();
|
||||||
|
for (Map.Entry<ContainerId, ComponentInstance> instance : liveInst
|
||||||
|
.entrySet()) {
|
||||||
|
if (!ComponentInstance.isFinalState(
|
||||||
|
instance.getValue().getContainerSpec().getState())) {
|
||||||
|
LOG.info("{} Component instance state changed from {} to {}",
|
||||||
|
instance.getValue().getCompInstanceName(),
|
||||||
|
instance.getValue().getContainerSpec().getState(),
|
||||||
|
ContainerState.STOPPED);
|
||||||
|
serviceTimelinePublisher.componentInstanceFinished(
|
||||||
|
instance.getKey(), KILLED_AFTER_APP_COMPLETION,
|
||||||
|
ContainerState.STOPPED, getDiagnostics().toString());
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("Service state changed to {}", finalApplicationStatus);
|
||||||
// mark attempt as unregistered
|
// mark attempt as unregistered
|
||||||
serviceTimelinePublisher
|
serviceTimelinePublisher.serviceAttemptUnregistered(context,
|
||||||
.serviceAttemptUnregistered(context, diagnostics.toString());
|
finalApplicationStatus, diagnostics.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
// unregister AM
|
// unregister AM
|
||||||
amRMClient.unregisterApplicationMaster(FinalApplicationStatus.ENDED,
|
amRMClient.unregisterApplicationMaster(finalApplicationStatus,
|
||||||
diagnostics.toString(), "");
|
diagnostics.toString(), "");
|
||||||
LOG.info("Service {} unregistered with RM, with attemptId = {} " +
|
LOG.info("Service {} unregistered with RM, with attemptId = {} "
|
||||||
", diagnostics = {} ", app.getName(), context.attemptId, diagnostics);
|
+ ", diagnostics = {} ", app.getName(), context.attemptId,
|
||||||
|
diagnostics);
|
||||||
}
|
}
|
||||||
super.serviceStop();
|
super.serviceStop();
|
||||||
}
|
}
|
||||||
|
@ -911,7 +944,7 @@ public class ServiceScheduler extends CompositeService {
|
||||||
* (which #failed-instances + #suceeded-instances = #total-n-containers)
|
* (which #failed-instances + #suceeded-instances = #total-n-containers)
|
||||||
* The service will be terminated.
|
* The service will be terminated.
|
||||||
*/
|
*/
|
||||||
public synchronized void terminateServiceIfAllComponentsFinished() {
|
public void terminateServiceIfAllComponentsFinished() {
|
||||||
boolean shouldTerminate = true;
|
boolean shouldTerminate = true;
|
||||||
|
|
||||||
// Succeeded comps and failed comps, for logging purposes.
|
// Succeeded comps and failed comps, for logging purposes.
|
||||||
|
@ -920,7 +953,30 @@ public class ServiceScheduler extends CompositeService {
|
||||||
|
|
||||||
for (Component comp : getAllComponents().values()) {
|
for (Component comp : getAllComponents().values()) {
|
||||||
ComponentRestartPolicy restartPolicy = comp.getRestartPolicyHandler();
|
ComponentRestartPolicy restartPolicy = comp.getRestartPolicyHandler();
|
||||||
if (!restartPolicy.shouldTerminate(comp)) {
|
|
||||||
|
if (restartPolicy.shouldTerminate(comp)) {
|
||||||
|
if (restartPolicy.hasCompletedSuccessfully(comp)) {
|
||||||
|
comp.getComponentSpec().setState(org.apache.hadoop
|
||||||
|
.yarn.service.api.records.ComponentState.SUCCEEDED);
|
||||||
|
LOG.info("{} Component state changed from {} to {}",
|
||||||
|
comp.getName(), comp.getComponentSpec().getState(),
|
||||||
|
org.apache.hadoop
|
||||||
|
.yarn.service.api.records.ComponentState.SUCCEEDED);
|
||||||
|
} else {
|
||||||
|
comp.getComponentSpec().setState(org.apache.hadoop
|
||||||
|
.yarn.service.api.records.ComponentState.FAILED);
|
||||||
|
LOG.info("{} Component state changed from {} to {}",
|
||||||
|
comp.getName(), comp.getComponentSpec().getState(),
|
||||||
|
org.apache.hadoop
|
||||||
|
.yarn.service.api.records.ComponentState.FAILED);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isTimelineServiceEnabled()) {
|
||||||
|
// record in ATS
|
||||||
|
serviceTimelinePublisher.componentFinished(comp.getComponentSpec(),
|
||||||
|
comp.getComponentSpec().getState(), systemClock.getTime());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
shouldTerminate = false;
|
shouldTerminate = false;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -944,14 +1000,26 @@ public class ServiceScheduler extends CompositeService {
|
||||||
LOG.info("Failed components: [" + org.apache.commons.lang3.StringUtils
|
LOG.info("Failed components: [" + org.apache.commons.lang3.StringUtils
|
||||||
.join(failedComponents, ",") + "]");
|
.join(failedComponents, ",") + "]");
|
||||||
|
|
||||||
|
int exitStatus = EXIT_SUCCESS;
|
||||||
if (failedComponents.isEmpty()) {
|
if (failedComponents.isEmpty()) {
|
||||||
setGracefulStop(FinalApplicationStatus.SUCCEEDED);
|
setGracefulStop(FinalApplicationStatus.SUCCEEDED);
|
||||||
getTerminationHandler().terminate(EXIT_SUCCESS);
|
app.setState(ServiceState.SUCCEEDED);
|
||||||
} else {
|
} else {
|
||||||
setGracefulStop(FinalApplicationStatus.FAILED);
|
setGracefulStop(FinalApplicationStatus.FAILED);
|
||||||
getTerminationHandler().terminate(EXIT_FALSE);
|
app.setState(ServiceState.FAILED);
|
||||||
|
exitStatus = EXIT_FALSE;
|
||||||
|
}
|
||||||
|
|
||||||
|
getTerminationHandler().terminate(exitStatus);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Clock getSystemClock() {
|
||||||
|
return systemClock;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isTimelineServiceEnabled() {
|
||||||
|
return timelineServiceEnabled;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ServiceUtils.ProcessTerminationHandler getTerminationHandler() {
|
public ServiceUtils.ProcessTerminationHandler getTerminationHandler() {
|
||||||
|
|
|
@ -26,5 +26,5 @@ import org.apache.hadoop.classification.InterfaceStability;
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
@ApiModel(description = "The current state of a component.")
|
@ApiModel(description = "The current state of a component.")
|
||||||
public enum ComponentState {
|
public enum ComponentState {
|
||||||
FLEXING, STABLE, NEEDS_UPGRADE, UPGRADING;
|
FLEXING, STABLE, NEEDS_UPGRADE, UPGRADING, SUCCEEDED, FAILED;
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,5 +26,6 @@ import org.apache.hadoop.classification.InterfaceStability;
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
public enum ContainerState {
|
public enum ContainerState {
|
||||||
RUNNING_BUT_UNREADY, READY, STOPPED, NEEDS_UPGRADE, UPGRADING;
|
RUNNING_BUT_UNREADY, READY, STOPPED, NEEDS_UPGRADE, UPGRADING, SUCCEEDED,
|
||||||
|
FAILED;
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,5 +30,5 @@ import org.apache.hadoop.classification.InterfaceStability;
|
||||||
@javax.annotation.Generated(value = "class io.swagger.codegen.languages.JavaClientCodegen", date = "2016-06-02T08:15:05.615-07:00")
|
@javax.annotation.Generated(value = "class io.swagger.codegen.languages.JavaClientCodegen", date = "2016-06-02T08:15:05.615-07:00")
|
||||||
public enum ServiceState {
|
public enum ServiceState {
|
||||||
ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING,
|
ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING,
|
||||||
UPGRADING_AUTO_FINALIZE, EXPRESS_UPGRADING;
|
UPGRADING_AUTO_FINALIZE, EXPRESS_UPGRADING, SUCCEEDED;
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.client.api.NMClient;
|
import org.apache.hadoop.yarn.client.api.NMClient;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
@ -41,7 +42,9 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.service.ServiceScheduler;
|
import org.apache.hadoop.yarn.service.ServiceScheduler;
|
||||||
import org.apache.hadoop.yarn.service.api.records.Artifact;
|
import org.apache.hadoop.yarn.service.api.records.Artifact;
|
||||||
|
import org.apache.hadoop.yarn.service.api.records.ComponentState;
|
||||||
import org.apache.hadoop.yarn.service.api.records.ContainerState;
|
import org.apache.hadoop.yarn.service.api.records.ContainerState;
|
||||||
|
import org.apache.hadoop.yarn.service.api.records.ServiceState;
|
||||||
import org.apache.hadoop.yarn.service.component.Component;
|
import org.apache.hadoop.yarn.service.component.Component;
|
||||||
import org.apache.hadoop.yarn.service.component.ComponentEvent;
|
import org.apache.hadoop.yarn.service.component.ComponentEvent;
|
||||||
import org.apache.hadoop.yarn.service.component.ComponentEventType;
|
import org.apache.hadoop.yarn.service.component.ComponentEventType;
|
||||||
|
@ -68,6 +71,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
||||||
|
|
||||||
import static org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes.*;
|
import static org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes.*;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.yarn.api.records.ContainerExitStatus
|
||||||
|
.KILLED_AFTER_APP_COMPLETION;
|
||||||
import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.KILLED_BY_APPMASTER;
|
import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.KILLED_BY_APPMASTER;
|
||||||
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.*;
|
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.*;
|
||||||
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState.*;
|
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState.*;
|
||||||
|
@ -242,15 +248,22 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static void handleComponentInstanceRelaunch(
|
static void handleComponentInstanceRelaunch(ComponentInstance compInstance,
|
||||||
ComponentInstance compInstance, ComponentInstanceEvent event,
|
ComponentInstanceEvent event, boolean failureBeforeLaunch,
|
||||||
boolean failureBeforeLaunch) {
|
String containerDiag) {
|
||||||
Component comp = compInstance.getComponent();
|
Component comp = compInstance.getComponent();
|
||||||
|
|
||||||
// Do we need to relaunch the service?
|
// Do we need to relaunch the service?
|
||||||
boolean hasContainerFailed = hasContainerFailed(event.getStatus());
|
boolean hasContainerFailed = failureBeforeLaunch || hasContainerFailed(
|
||||||
|
event.getStatus());
|
||||||
|
|
||||||
ComponentRestartPolicy restartPolicy = comp.getRestartPolicyHandler();
|
ComponentRestartPolicy restartPolicy = comp.getRestartPolicyHandler();
|
||||||
|
ContainerState containerState =
|
||||||
|
hasContainerFailed ? ContainerState.FAILED : ContainerState.SUCCEEDED;
|
||||||
|
|
||||||
|
if (compInstance.getContainerSpec() != null) {
|
||||||
|
compInstance.getContainerSpec().setState(containerState);
|
||||||
|
}
|
||||||
|
|
||||||
if (restartPolicy.shouldRelaunchInstance(compInstance, event.getStatus())) {
|
if (restartPolicy.shouldRelaunchInstance(compInstance, event.getStatus())) {
|
||||||
// re-ask the failed container.
|
// re-ask the failed container.
|
||||||
|
@ -259,17 +272,31 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
||||||
|
|
||||||
StringBuilder builder = new StringBuilder();
|
StringBuilder builder = new StringBuilder();
|
||||||
builder.append(compInstance.getCompInstanceId()).append(": ");
|
builder.append(compInstance.getCompInstanceId()).append(": ");
|
||||||
builder.append(event.getContainerId()).append(" completed. Reinsert back to pending list and requested ");
|
builder.append(event.getContainerId()).append(
|
||||||
|
" completed. Reinsert back to pending list and requested ");
|
||||||
builder.append("a new container.").append(System.lineSeparator());
|
builder.append("a new container.").append(System.lineSeparator());
|
||||||
builder.append(" exitStatus=").append(failureBeforeLaunch ? null : event.getStatus().getExitStatus());
|
builder.append(" exitStatus=").append(
|
||||||
|
failureBeforeLaunch ? null : event.getStatus().getExitStatus());
|
||||||
builder.append(", diagnostics=");
|
builder.append(", diagnostics=");
|
||||||
builder.append(failureBeforeLaunch ? FAILED_BEFORE_LAUNCH_DIAG : event.getStatus().getDiagnostics());
|
builder.append(failureBeforeLaunch ?
|
||||||
|
FAILED_BEFORE_LAUNCH_DIAG :
|
||||||
|
event.getStatus().getDiagnostics());
|
||||||
|
|
||||||
if (event.getStatus().getExitStatus() != 0) {
|
if (event.getStatus() != null && event.getStatus().getExitStatus() != 0) {
|
||||||
LOG.error(builder.toString());
|
LOG.error(builder.toString());
|
||||||
} else{
|
} else{
|
||||||
LOG.info(builder.toString());
|
LOG.info(builder.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (compInstance.timelineServiceEnabled) {
|
||||||
|
// record in ATS
|
||||||
|
LOG.info("Publishing component instance status {} {} ",
|
||||||
|
event.getContainerId(), containerState);
|
||||||
|
compInstance.serviceTimelinePublisher.componentInstanceFinished(
|
||||||
|
event.getContainerId(), event.getStatus().getExitStatus(),
|
||||||
|
containerState, containerDiag);
|
||||||
|
}
|
||||||
|
|
||||||
} else{
|
} else{
|
||||||
// When no relaunch, update component's #succeeded/#failed
|
// When no relaunch, update component's #succeeded/#failed
|
||||||
// instances.
|
// instances.
|
||||||
|
@ -278,6 +305,14 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
||||||
} else{
|
} else{
|
||||||
comp.markAsSucceeded(compInstance);
|
comp.markAsSucceeded(compInstance);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (compInstance.timelineServiceEnabled) {
|
||||||
|
// record in ATS
|
||||||
|
compInstance.serviceTimelinePublisher.componentInstanceFinished(
|
||||||
|
event.getContainerId(), event.getStatus().getExitStatus(),
|
||||||
|
containerState, containerDiag);
|
||||||
|
}
|
||||||
|
|
||||||
LOG.info(compInstance.getCompInstanceId() + (!hasContainerFailed ?
|
LOG.info(compInstance.getCompInstanceId() + (!hasContainerFailed ?
|
||||||
" succeeded" :
|
" succeeded" :
|
||||||
" failed") + " without retry, exitStatus=" + event.getStatus());
|
" failed") + " without retry, exitStatus=" + event.getStatus());
|
||||||
|
@ -287,8 +322,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
||||||
|
|
||||||
public static boolean hasContainerFailed(ContainerStatus containerStatus) {
|
public static boolean hasContainerFailed(ContainerStatus containerStatus) {
|
||||||
//Mark conainer as failed if we cant get its exit status i.e null?
|
//Mark conainer as failed if we cant get its exit status i.e null?
|
||||||
return containerStatus == null || containerStatus.getExitStatus() !=
|
return containerStatus == null || containerStatus
|
||||||
ContainerExitStatus.SUCCESS;
|
.getExitStatus() != ContainerExitStatus.SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class ContainerStoppedTransition extends BaseTransition {
|
private static class ContainerStoppedTransition extends BaseTransition {
|
||||||
|
@ -307,9 +342,10 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
||||||
ComponentInstanceEvent event) {
|
ComponentInstanceEvent event) {
|
||||||
|
|
||||||
Component comp = compInstance.component;
|
Component comp = compInstance.component;
|
||||||
String containerDiag =
|
String containerDiag = compInstance.getCompInstanceId() + ": " + (
|
||||||
compInstance.getCompInstanceId() + ": " + (failedBeforeLaunching ?
|
failedBeforeLaunching ?
|
||||||
FAILED_BEFORE_LAUNCH_DIAG : event.getStatus().getDiagnostics());
|
FAILED_BEFORE_LAUNCH_DIAG :
|
||||||
|
event.getStatus().getDiagnostics());
|
||||||
compInstance.diagnostics.append(containerDiag + System.lineSeparator());
|
compInstance.diagnostics.append(containerDiag + System.lineSeparator());
|
||||||
compInstance.cancelContainerStatusRetriever();
|
compInstance.cancelContainerStatusRetriever();
|
||||||
if (compInstance.getState().equals(ComponentInstanceState.UPGRADING)) {
|
if (compInstance.getState().equals(ComponentInstanceState.UPGRADING)) {
|
||||||
|
@ -329,36 +365,69 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
||||||
// Check if it exceeds the failure threshold, but only if health threshold
|
// Check if it exceeds the failure threshold, but only if health threshold
|
||||||
// monitor is not enabled
|
// monitor is not enabled
|
||||||
if (!comp.isHealthThresholdMonitorEnabled()
|
if (!comp.isHealthThresholdMonitorEnabled()
|
||||||
&& comp.currentContainerFailure
|
&& comp.currentContainerFailure.get()
|
||||||
.get() > comp.maxContainerFailurePerComp) {
|
> comp.maxContainerFailurePerComp) {
|
||||||
String exitDiag = MessageFormat.format(
|
String exitDiag = MessageFormat.format(
|
||||||
"[COMPONENT {0}]: Failed {1} times, exceeded the limit - {2}. Shutting down now... "
|
"[COMPONENT {0}]: Failed {1} times, exceeded the limit - {2}. "
|
||||||
+ System.lineSeparator(),
|
+ "Shutting down now... "
|
||||||
comp.getName(), comp.currentContainerFailure.get(), comp.maxContainerFailurePerComp);
|
+ System.lineSeparator(), comp.getName(),
|
||||||
|
comp.currentContainerFailure.get(),
|
||||||
|
comp.maxContainerFailurePerComp);
|
||||||
compInstance.diagnostics.append(exitDiag);
|
compInstance.diagnostics.append(exitDiag);
|
||||||
// append to global diagnostics that will be reported to RM.
|
// append to global diagnostics that will be reported to RM.
|
||||||
scheduler.getDiagnostics().append(containerDiag);
|
scheduler.getDiagnostics().append(containerDiag);
|
||||||
scheduler.getDiagnostics().append(exitDiag);
|
scheduler.getDiagnostics().append(exitDiag);
|
||||||
LOG.warn(exitDiag);
|
LOG.warn(exitDiag);
|
||||||
|
|
||||||
|
compInstance.getContainerSpec().setState(ContainerState.FAILED);
|
||||||
|
comp.getComponentSpec().setState(ComponentState.FAILED);
|
||||||
|
comp.getScheduler().getApp().setState(ServiceState.FAILED);
|
||||||
|
|
||||||
|
if (compInstance.timelineServiceEnabled) {
|
||||||
|
// record in ATS
|
||||||
|
compInstance.scheduler.getServiceTimelinePublisher()
|
||||||
|
.componentInstanceFinished(compInstance.getContainer().getId(),
|
||||||
|
failedBeforeLaunching ?
|
||||||
|
-1 :
|
||||||
|
event.getStatus().getExitStatus(), ContainerState.FAILED,
|
||||||
|
containerDiag);
|
||||||
|
|
||||||
|
// mark other component-instances/containers as STOPPED
|
||||||
|
for (ContainerId containerId : scheduler.getLiveInstances()
|
||||||
|
.keySet()) {
|
||||||
|
if (!compInstance.container.getId().equals(containerId)
|
||||||
|
&& !isFinalState(compInstance.getContainerSpec().getState())) {
|
||||||
|
compInstance.getContainerSpec().setState(ContainerState.STOPPED);
|
||||||
|
compInstance.scheduler.getServiceTimelinePublisher()
|
||||||
|
.componentInstanceFinished(containerId,
|
||||||
|
KILLED_AFTER_APP_COMPLETION, ContainerState.STOPPED,
|
||||||
|
scheduler.getDiagnostics().toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
compInstance.scheduler.getServiceTimelinePublisher()
|
||||||
|
.componentFinished(comp.getComponentSpec(), ComponentState.FAILED,
|
||||||
|
scheduler.getSystemClock().getTime());
|
||||||
|
|
||||||
|
compInstance.scheduler.getServiceTimelinePublisher()
|
||||||
|
.serviceAttemptUnregistered(comp.getContext(),
|
||||||
|
FinalApplicationStatus.FAILED,
|
||||||
|
scheduler.getDiagnostics().toString());
|
||||||
|
}
|
||||||
|
|
||||||
shouldFailService = true;
|
shouldFailService = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!failedBeforeLaunching) {
|
if (!failedBeforeLaunching) {
|
||||||
// clean up registry
|
// clean up registry
|
||||||
// If the container failed before launching, no need to cleanup registry,
|
// If the container failed before launching, no need to cleanup
|
||||||
|
// registry,
|
||||||
// because it was not registered before.
|
// because it was not registered before.
|
||||||
// hdfs dir content will be overwritten when a new container gets started,
|
// hdfs dir content will be overwritten when a new container gets
|
||||||
|
// started,
|
||||||
// so no need remove.
|
// so no need remove.
|
||||||
compInstance.scheduler.executorService
|
compInstance.scheduler.executorService.submit(
|
||||||
.submit(() -> compInstance.cleanupRegistry(event.getContainerId()));
|
() -> compInstance.cleanupRegistry(event.getContainerId()));
|
||||||
|
|
||||||
if (compInstance.timelineServiceEnabled) {
|
|
||||||
// record in ATS
|
|
||||||
compInstance.serviceTimelinePublisher
|
|
||||||
.componentInstanceFinished(event.getContainerId(),
|
|
||||||
event.getStatus().getExitStatus(), containerDiag);
|
|
||||||
}
|
|
||||||
compInstance.containerSpec.setState(ContainerState.STOPPED);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// remove the failed ContainerId -> CompInstance mapping
|
// remove the failed ContainerId -> CompInstance mapping
|
||||||
|
@ -367,7 +436,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
||||||
// According to component restart policy, handle container restart
|
// According to component restart policy, handle container restart
|
||||||
// or finish the service (if all components finished)
|
// or finish the service (if all components finished)
|
||||||
handleComponentInstanceRelaunch(compInstance, event,
|
handleComponentInstanceRelaunch(compInstance, event,
|
||||||
failedBeforeLaunching);
|
failedBeforeLaunching, containerDiag);
|
||||||
|
|
||||||
if (shouldFailService) {
|
if (shouldFailService) {
|
||||||
scheduler.getTerminationHandler().terminate(-1);
|
scheduler.getTerminationHandler().terminate(-1);
|
||||||
|
@ -375,6 +444,11 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static boolean isFinalState(ContainerState state) {
|
||||||
|
return ContainerState.FAILED.equals(state) || ContainerState.STOPPED
|
||||||
|
.equals(state) || ContainerState.SUCCEEDED.equals(state);
|
||||||
|
}
|
||||||
|
|
||||||
private static class ContainerUpgradeTransition extends BaseTransition {
|
private static class ContainerUpgradeTransition extends BaseTransition {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -586,7 +660,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
||||||
|
|
||||||
if (timelineServiceEnabled) {
|
if (timelineServiceEnabled) {
|
||||||
serviceTimelinePublisher.componentInstanceFinished(containerId,
|
serviceTimelinePublisher.componentInstanceFinished(containerId,
|
||||||
KILLED_BY_APPMASTER, diagnostics.toString());
|
KILLED_BY_APPMASTER, ContainerState.STOPPED, diagnostics.toString());
|
||||||
}
|
}
|
||||||
cancelContainerStatusRetriever();
|
cancelContainerStatusRetriever();
|
||||||
scheduler.executorService.submit(() ->
|
scheduler.executorService.submit(() ->
|
||||||
|
|
|
@ -32,5 +32,8 @@ public enum ServiceTimelineEvent {
|
||||||
|
|
||||||
COMPONENT_INSTANCE_IP_HOST_UPDATE,
|
COMPONENT_INSTANCE_IP_HOST_UPDATE,
|
||||||
|
|
||||||
COMPONENT_INSTANCE_BECOME_READY
|
COMPONENT_INSTANCE_BECOME_READY,
|
||||||
|
|
||||||
|
COMPONENT_FINISHED
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,6 +28,8 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
||||||
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
|
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
|
||||||
import org.apache.hadoop.yarn.service.ServiceContext;
|
import org.apache.hadoop.yarn.service.ServiceContext;
|
||||||
import org.apache.hadoop.yarn.service.api.records.*;
|
import org.apache.hadoop.yarn.service.api.records.*;
|
||||||
|
import org.apache.hadoop.yarn.service.api.records.Component;
|
||||||
|
import org.apache.hadoop.yarn.service.api.records.ComponentState;
|
||||||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
|
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
|
||||||
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -42,7 +44,6 @@ import java.util.Map.Entry;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import static org.apache.hadoop.yarn.service.api.records.ContainerState.READY;
|
import static org.apache.hadoop.yarn.service.api.records.ContainerState.READY;
|
||||||
import static org.apache.hadoop.yarn.service.api.records.ContainerState.STOPPED;
|
|
||||||
import static org.apache.hadoop.yarn.service.timelineservice.ServiceTimelineMetricsConstants.DIAGNOSTICS_INFO;
|
import static org.apache.hadoop.yarn.service.timelineservice.ServiceTimelineMetricsConstants.DIAGNOSTICS_INFO;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -130,12 +131,11 @@ public class ServiceTimelinePublisher extends CompositeService {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void serviceAttemptUnregistered(ServiceContext context,
|
public void serviceAttemptUnregistered(ServiceContext context,
|
||||||
String diagnostics) {
|
FinalApplicationStatus status, String diagnostics) {
|
||||||
TimelineEntity entity = createServiceAttemptEntity(
|
TimelineEntity entity = createServiceAttemptEntity(
|
||||||
context.attemptId.getApplicationId().toString());
|
context.attemptId.getApplicationId().toString());
|
||||||
Map<String, Object> entityInfos = new HashMap<String, Object>();
|
Map<String, Object> entityInfos = new HashMap<String, Object>();
|
||||||
entityInfos.put(ServiceTimelineMetricsConstants.STATE,
|
entityInfos.put(ServiceTimelineMetricsConstants.STATE, status);
|
||||||
FinalApplicationStatus.ENDED);
|
|
||||||
entityInfos.put(DIAGNOSTICS_INFO, diagnostics);
|
entityInfos.put(DIAGNOSTICS_INFO, diagnostics);
|
||||||
entity.addInfo(entityInfos);
|
entity.addInfo(entityInfos);
|
||||||
|
|
||||||
|
@ -180,7 +180,7 @@ public class ServiceTimelinePublisher extends CompositeService {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void componentInstanceFinished(ContainerId containerId,
|
public void componentInstanceFinished(ContainerId containerId,
|
||||||
int exitCode, String diagnostics) {
|
int exitCode, ContainerState state, String diagnostics) {
|
||||||
TimelineEntity entity = createComponentInstanceEntity(
|
TimelineEntity entity = createComponentInstanceEntity(
|
||||||
containerId.toString());
|
containerId.toString());
|
||||||
|
|
||||||
|
@ -189,7 +189,7 @@ public class ServiceTimelinePublisher extends CompositeService {
|
||||||
entityInfos.put(ServiceTimelineMetricsConstants.EXIT_STATUS_CODE,
|
entityInfos.put(ServiceTimelineMetricsConstants.EXIT_STATUS_CODE,
|
||||||
exitCode);
|
exitCode);
|
||||||
entityInfos.put(DIAGNOSTICS_INFO, diagnostics);
|
entityInfos.put(DIAGNOSTICS_INFO, diagnostics);
|
||||||
entityInfos.put(ServiceTimelineMetricsConstants.STATE, STOPPED);
|
entityInfos.put(ServiceTimelineMetricsConstants.STATE, state);
|
||||||
entity.addInfo(entityInfos);
|
entity.addInfo(entityInfos);
|
||||||
|
|
||||||
// add an event
|
// add an event
|
||||||
|
@ -375,4 +375,25 @@ public class ServiceTimelinePublisher extends CompositeService {
|
||||||
log.error("Error when publishing entity " + entity, e);
|
log.error("Error when publishing entity " + entity, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void componentFinished(
|
||||||
|
Component comp,
|
||||||
|
ComponentState state, long finishTime) {
|
||||||
|
createComponentEntity(comp.getName());
|
||||||
|
TimelineEntity entity = createComponentEntity(comp.getName());
|
||||||
|
|
||||||
|
// create info keys
|
||||||
|
Map<String, Object> entityInfos = new HashMap<String, Object>();
|
||||||
|
entityInfos.put(ServiceTimelineMetricsConstants.STATE, state);
|
||||||
|
entity.addInfo(entityInfos);
|
||||||
|
|
||||||
|
// add an event
|
||||||
|
TimelineEvent startEvent = new TimelineEvent();
|
||||||
|
startEvent
|
||||||
|
.setId(ServiceTimelineEvent.COMPONENT_FINISHED.toString());
|
||||||
|
startEvent.setTimestamp(finishTime);
|
||||||
|
entity.addEvent(startEvent);
|
||||||
|
|
||||||
|
putEntity(entity);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
|
||||||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
|
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
|
||||||
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
|
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
|
||||||
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
|
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
|
||||||
|
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -92,7 +93,18 @@ public class MockRunningServiceContext extends ServiceContext {
|
||||||
public ContainerLaunchService getContainerLaunchService() {
|
public ContainerLaunchService getContainerLaunchService() {
|
||||||
return mockLaunchService;
|
return mockLaunchService;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override public ServiceUtils.ProcessTerminationHandler
|
||||||
|
getTerminationHandler() {
|
||||||
|
return new
|
||||||
|
ServiceUtils.ProcessTerminationHandler() {
|
||||||
|
public void terminate(int exitCode) {
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
this.scheduler.init(fsWatcher.getConf());
|
this.scheduler.init(fsWatcher.getConf());
|
||||||
|
|
||||||
ServiceTestUtils.createServiceManager(this);
|
ServiceTestUtils.createServiceManager(this);
|
||||||
|
@ -116,8 +128,10 @@ public class MockRunningServiceContext extends ServiceContext {
|
||||||
Component component = new org.apache.hadoop.yarn.service.component.
|
Component component = new org.apache.hadoop.yarn.service.component.
|
||||||
Component(componentSpec, 1L, context);
|
Component(componentSpec, 1L, context);
|
||||||
componentState.put(component.getName(), component);
|
componentState.put(component.getName(), component);
|
||||||
component.handle(new ComponentEvent(component.getName(),
|
component.handle(
|
||||||
ComponentEventType.FLEX));
|
new ComponentEvent(component.getName(), ComponentEventType.FLEX)
|
||||||
|
.setDesired(
|
||||||
|
component.getComponentSpec().getNumberOfContainers()));
|
||||||
|
|
||||||
for (int i = 0; i < componentSpec.getNumberOfContainers(); i++) {
|
for (int i = 0; i < componentSpec.getNumberOfContainers(); i++) {
|
||||||
counter++;
|
counter++;
|
||||||
|
|
|
@ -63,7 +63,6 @@ import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
@ -119,14 +118,10 @@ public class ServiceTestUtils {
|
||||||
Component.RestartPolicyEnum.NEVER, null));
|
Component.RestartPolicyEnum.NEVER, null));
|
||||||
exampleApp.addComponent(
|
exampleApp.addComponent(
|
||||||
createComponent("terminating-comp2", 2, "sleep 1000",
|
createComponent("terminating-comp2", 2, "sleep 1000",
|
||||||
Component.RestartPolicyEnum.ON_FAILURE, new ArrayList<String>() {{
|
Component.RestartPolicyEnum.ON_FAILURE, null));
|
||||||
add("terminating-comp1");
|
|
||||||
}}));
|
|
||||||
exampleApp.addComponent(
|
exampleApp.addComponent(
|
||||||
createComponent("terminating-comp3", 2, "sleep 1000",
|
createComponent("terminating-comp3", 2, "sleep 1000",
|
||||||
Component.RestartPolicyEnum.ON_FAILURE, new ArrayList<String>() {{
|
Component.RestartPolicyEnum.ON_FAILURE, null));
|
||||||
add("terminating-comp2");
|
|
||||||
}}));
|
|
||||||
|
|
||||||
return exampleApp;
|
return exampleApp;
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.service.ServiceTestUtils;
|
||||||
import org.apache.hadoop.yarn.service.TestServiceManager;
|
import org.apache.hadoop.yarn.service.TestServiceManager;
|
||||||
import org.apache.hadoop.yarn.service.api.records.ComponentState;
|
import org.apache.hadoop.yarn.service.api.records.ComponentState;
|
||||||
import org.apache.hadoop.yarn.service.api.records.Service;
|
import org.apache.hadoop.yarn.service.api.records.Service;
|
||||||
|
import org.apache.hadoop.yarn.service.api.records.ServiceState;
|
||||||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
|
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
|
||||||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
|
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
|
||||||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
|
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
|
||||||
|
@ -147,7 +148,8 @@ public class TestComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testComponentStateUpdatesWithTerminatingComponents() throws
|
public void testComponentStateReachesStableStateWithTerminatingComponents()
|
||||||
|
throws
|
||||||
Exception {
|
Exception {
|
||||||
final String serviceName =
|
final String serviceName =
|
||||||
"testComponentStateUpdatesWithTerminatingComponents";
|
"testComponentStateUpdatesWithTerminatingComponents";
|
||||||
|
@ -198,6 +200,57 @@ public class TestComponent {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testComponentStateUpdatesWithTerminatingComponents()
|
||||||
|
throws
|
||||||
|
Exception {
|
||||||
|
final String serviceName =
|
||||||
|
"testComponentStateUpdatesWithTerminatingComponents";
|
||||||
|
|
||||||
|
Service testService = ServiceTestUtils.createTerminatingJobExample(
|
||||||
|
serviceName);
|
||||||
|
TestServiceManager.createDef(serviceName, testService);
|
||||||
|
|
||||||
|
ServiceContext context = new MockRunningServiceContext(rule, testService);
|
||||||
|
|
||||||
|
for (Component comp : context.scheduler.getAllComponents().values()) {
|
||||||
|
Iterator<ComponentInstance> instanceIter = comp.
|
||||||
|
getAllComponentInstances().iterator();
|
||||||
|
|
||||||
|
while (instanceIter.hasNext()) {
|
||||||
|
|
||||||
|
ComponentInstance componentInstance = instanceIter.next();
|
||||||
|
Container instanceContainer = componentInstance.getContainer();
|
||||||
|
|
||||||
|
//stop 1 container
|
||||||
|
ContainerStatus containerStatus = ContainerStatus.newInstance(
|
||||||
|
instanceContainer.getId(),
|
||||||
|
org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE,
|
||||||
|
"successful", 0);
|
||||||
|
comp.handle(new ComponentEvent(comp.getName(),
|
||||||
|
ComponentEventType.CONTAINER_COMPLETED).setStatus(containerStatus)
|
||||||
|
.setContainerId(instanceContainer.getId()));
|
||||||
|
componentInstance.handle(
|
||||||
|
new ComponentInstanceEvent(componentInstance.getContainer().getId(),
|
||||||
|
ComponentInstanceEventType.STOP).setStatus(containerStatus));
|
||||||
|
}
|
||||||
|
|
||||||
|
ComponentState componentState =
|
||||||
|
comp.getComponentSpec().getState();
|
||||||
|
Assert.assertEquals(
|
||||||
|
ComponentState.SUCCEEDED,
|
||||||
|
componentState);
|
||||||
|
}
|
||||||
|
|
||||||
|
ServiceState serviceState =
|
||||||
|
testService.getState();
|
||||||
|
Assert.assertEquals(
|
||||||
|
ServiceState.SUCCEEDED,
|
||||||
|
serviceState);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
private static org.apache.hadoop.yarn.service.api.records.Component
|
private static org.apache.hadoop.yarn.service.api.records.Component
|
||||||
createSpecWithEnv(String serviceName, String compName, String key,
|
createSpecWithEnv(String serviceName, String compName, String key,
|
||||||
String val) {
|
String val) {
|
||||||
|
|
|
@ -110,7 +110,6 @@ public class TestComponentRestartPolicy {
|
||||||
|
|
||||||
assertEquals(true, restartPolicy.isReadyForDownStream(component));
|
assertEquals(true, restartPolicy.isReadyForDownStream(component));
|
||||||
|
|
||||||
|
|
||||||
when(component.getNumSucceededInstances()).thenReturn(new Long(2));
|
when(component.getNumSucceededInstances()).thenReturn(new Long(2));
|
||||||
when(component.getNumFailedInstances()).thenReturn(new Long(1));
|
when(component.getNumFailedInstances()).thenReturn(new Long(1));
|
||||||
when(component.getNumDesiredInstances()).thenReturn(3);
|
when(component.getNumDesiredInstances()).thenReturn(3);
|
||||||
|
|
|
@ -204,6 +204,8 @@ public class TestComponentInstance {
|
||||||
when(componentInstance.getComponent()).thenReturn(component);
|
when(componentInstance.getComponent()).thenReturn(component);
|
||||||
when(componentInstance.getCompInstanceName()).thenReturn(
|
when(componentInstance.getCompInstanceName()).thenReturn(
|
||||||
"compInstance" + instanceId);
|
"compInstance" + instanceId);
|
||||||
|
Container container = mock(Container.class);
|
||||||
|
when(componentInstance.getContainerSpec()).thenReturn(container);
|
||||||
|
|
||||||
ServiceUtils.ProcessTerminationHandler terminationHandler = mock(
|
ServiceUtils.ProcessTerminationHandler terminationHandler = mock(
|
||||||
ServiceUtils.ProcessTerminationHandler.class);
|
ServiceUtils.ProcessTerminationHandler.class);
|
||||||
|
@ -227,12 +229,15 @@ public class TestComponentInstance {
|
||||||
Mockito.doNothing().when(serviceScheduler).setGracefulStop(
|
Mockito.doNothing().when(serviceScheduler).setGracefulStop(
|
||||||
any(FinalApplicationStatus.class));
|
any(FinalApplicationStatus.class));
|
||||||
|
|
||||||
|
final String containerDiag = "Container succeeded";
|
||||||
|
|
||||||
ComponentInstanceEvent componentInstanceEvent = mock(
|
ComponentInstanceEvent componentInstanceEvent = mock(
|
||||||
ComponentInstanceEvent.class);
|
ComponentInstanceEvent.class);
|
||||||
ContainerId containerId = ContainerId.newContainerId(ApplicationAttemptId
|
ContainerId containerId = ContainerId.newContainerId(ApplicationAttemptId
|
||||||
.newInstance(ApplicationId.newInstance(1234L, 1), 1), 1);
|
.newInstance(ApplicationId.newInstance(1234L, 1), 1), 1);
|
||||||
ContainerStatus containerStatus = ContainerStatus.newInstance(containerId,
|
ContainerStatus containerStatus = ContainerStatus.newInstance(containerId,
|
||||||
org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE, "hello", 0);
|
org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE,
|
||||||
|
containerDiag, 0);
|
||||||
|
|
||||||
when(componentInstanceEvent.getStatus()).thenReturn(containerStatus);
|
when(componentInstanceEvent.getStatus()).thenReturn(containerStatus);
|
||||||
|
|
||||||
|
@ -245,7 +250,7 @@ public class TestComponentInstance {
|
||||||
comp.getAllComponentInstances().iterator().next();
|
comp.getAllComponentInstances().iterator().next();
|
||||||
|
|
||||||
ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
|
ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
|
||||||
componentInstanceEvent, false);
|
componentInstanceEvent, false, containerDiag);
|
||||||
|
|
||||||
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
|
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
|
||||||
verify(comp, never()).markAsFailed(any(ComponentInstance.class));
|
verify(comp, never()).markAsFailed(any(ComponentInstance.class));
|
||||||
|
@ -262,7 +267,7 @@ public class TestComponentInstance {
|
||||||
componentInstance = comp.getAllComponentInstances().iterator().next();
|
componentInstance = comp.getAllComponentInstances().iterator().next();
|
||||||
containerStatus.setExitStatus(1);
|
containerStatus.setExitStatus(1);
|
||||||
ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
|
ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
|
||||||
componentInstanceEvent, false);
|
componentInstanceEvent, false, containerDiag);
|
||||||
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
|
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
|
||||||
verify(comp, never()).markAsFailed(any(ComponentInstance.class));
|
verify(comp, never()).markAsFailed(any(ComponentInstance.class));
|
||||||
verify(comp, times(1)).reInsertPendingInstance(
|
verify(comp, times(1)).reInsertPendingInstance(
|
||||||
|
@ -286,7 +291,7 @@ public class TestComponentInstance {
|
||||||
when(comp.getNumSucceededInstances()).thenReturn(new Long(1));
|
when(comp.getNumSucceededInstances()).thenReturn(new Long(1));
|
||||||
|
|
||||||
ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
|
ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
|
||||||
componentInstanceEvent, false);
|
componentInstanceEvent, false, containerDiag);
|
||||||
verify(comp, times(1)).markAsSucceeded(any(ComponentInstance.class));
|
verify(comp, times(1)).markAsSucceeded(any(ComponentInstance.class));
|
||||||
verify(comp, never()).markAsFailed(any(ComponentInstance.class));
|
verify(comp, never()).markAsFailed(any(ComponentInstance.class));
|
||||||
verify(comp, times(0)).reInsertPendingInstance(
|
verify(comp, times(0)).reInsertPendingInstance(
|
||||||
|
@ -304,8 +309,7 @@ public class TestComponentInstance {
|
||||||
|
|
||||||
when(comp.getNumFailedInstances()).thenReturn(new Long(1));
|
when(comp.getNumFailedInstances()).thenReturn(new Long(1));
|
||||||
ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
|
ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
|
||||||
componentInstanceEvent, false);
|
componentInstanceEvent, false, containerDiag);
|
||||||
|
|
||||||
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
|
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
|
||||||
verify(comp, times(1)).markAsFailed(any(ComponentInstance.class));
|
verify(comp, times(1)).markAsFailed(any(ComponentInstance.class));
|
||||||
verify(comp, times(0)).reInsertPendingInstance(
|
verify(comp, times(0)).reInsertPendingInstance(
|
||||||
|
@ -323,7 +327,7 @@ public class TestComponentInstance {
|
||||||
componentInstance = comp.getAllComponentInstances().iterator().next();
|
componentInstance = comp.getAllComponentInstances().iterator().next();
|
||||||
containerStatus.setExitStatus(1);
|
containerStatus.setExitStatus(1);
|
||||||
ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
|
ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
|
||||||
componentInstanceEvent, false);
|
componentInstanceEvent, false, containerDiag);
|
||||||
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
|
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
|
||||||
verify(comp, never()).markAsFailed(any(ComponentInstance.class));
|
verify(comp, never()).markAsFailed(any(ComponentInstance.class));
|
||||||
verify(comp, times(1)).reInsertPendingInstance(
|
verify(comp, times(1)).reInsertPendingInstance(
|
||||||
|
@ -340,7 +344,7 @@ public class TestComponentInstance {
|
||||||
componentInstance = comp.getAllComponentInstances().iterator().next();
|
componentInstance = comp.getAllComponentInstances().iterator().next();
|
||||||
containerStatus.setExitStatus(1);
|
containerStatus.setExitStatus(1);
|
||||||
ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
|
ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
|
||||||
componentInstanceEvent, false);
|
componentInstanceEvent, false, containerDiag);
|
||||||
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
|
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
|
||||||
verify(comp, times(1)).markAsFailed(any(ComponentInstance.class));
|
verify(comp, times(1)).markAsFailed(any(ComponentInstance.class));
|
||||||
verify(comp, times(0)).reInsertPendingInstance(
|
verify(comp, times(0)).reInsertPendingInstance(
|
||||||
|
@ -363,8 +367,7 @@ public class TestComponentInstance {
|
||||||
containerStatus.setExitStatus(1);
|
containerStatus.setExitStatus(1);
|
||||||
ComponentInstance commponentInstance = iter.next();
|
ComponentInstance commponentInstance = iter.next();
|
||||||
ComponentInstance.handleComponentInstanceRelaunch(commponentInstance,
|
ComponentInstance.handleComponentInstanceRelaunch(commponentInstance,
|
||||||
componentInstanceEvent, false);
|
componentInstanceEvent, false, containerDiag);
|
||||||
|
|
||||||
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
|
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
|
||||||
verify(comp, never()).markAsFailed(any(ComponentInstance.class));
|
verify(comp, never()).markAsFailed(any(ComponentInstance.class));
|
||||||
verify(comp, times(1)).reInsertPendingInstance(
|
verify(comp, times(1)).reInsertPendingInstance(
|
||||||
|
@ -404,7 +407,7 @@ public class TestComponentInstance {
|
||||||
when(component2Instance.getComponent().getNumFailedInstances())
|
when(component2Instance.getComponent().getNumFailedInstances())
|
||||||
.thenReturn(new Long(failed2Instances.size()));
|
.thenReturn(new Long(failed2Instances.size()));
|
||||||
ComponentInstance.handleComponentInstanceRelaunch(component2Instance,
|
ComponentInstance.handleComponentInstanceRelaunch(component2Instance,
|
||||||
componentInstanceEvent, false);
|
componentInstanceEvent, false, containerDiag);
|
||||||
}
|
}
|
||||||
|
|
||||||
Map<String, ComponentInstance> failed1Instances = new HashMap<>();
|
Map<String, ComponentInstance> failed1Instances = new HashMap<>();
|
||||||
|
@ -418,7 +421,7 @@ public class TestComponentInstance {
|
||||||
when(component1Instance.getComponent().getNumFailedInstances())
|
when(component1Instance.getComponent().getNumFailedInstances())
|
||||||
.thenReturn(new Long(failed1Instances.size()));
|
.thenReturn(new Long(failed1Instances.size()));
|
||||||
ComponentInstance.handleComponentInstanceRelaunch(component1Instance,
|
ComponentInstance.handleComponentInstanceRelaunch(component1Instance,
|
||||||
componentInstanceEvent, false);
|
componentInstanceEvent, false, containerDiag);
|
||||||
}
|
}
|
||||||
|
|
||||||
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
|
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
|
||||||
|
@ -458,7 +461,7 @@ public class TestComponentInstance {
|
||||||
when(component2Instance.getComponent().getNumSucceededInstances())
|
when(component2Instance.getComponent().getNumSucceededInstances())
|
||||||
.thenReturn(new Long(succeeded2Instances.size()));
|
.thenReturn(new Long(succeeded2Instances.size()));
|
||||||
ComponentInstance.handleComponentInstanceRelaunch(component2Instance,
|
ComponentInstance.handleComponentInstanceRelaunch(component2Instance,
|
||||||
componentInstanceEvent, false);
|
componentInstanceEvent, false, containerDiag);
|
||||||
}
|
}
|
||||||
|
|
||||||
Map<String, ComponentInstance> succeeded1Instances = new HashMap<>();
|
Map<String, ComponentInstance> succeeded1Instances = new HashMap<>();
|
||||||
|
@ -471,7 +474,7 @@ public class TestComponentInstance {
|
||||||
when(component1Instance.getComponent().getNumSucceededInstances())
|
when(component1Instance.getComponent().getNumSucceededInstances())
|
||||||
.thenReturn(new Long(succeeded1Instances.size()));
|
.thenReturn(new Long(succeeded1Instances.size()));
|
||||||
ComponentInstance.handleComponentInstanceRelaunch(component1Instance,
|
ComponentInstance.handleComponentInstanceRelaunch(component1Instance,
|
||||||
componentInstanceEvent, false);
|
componentInstanceEvent, false, containerDiag);
|
||||||
}
|
}
|
||||||
|
|
||||||
verify(comp, times(2)).markAsSucceeded(any(ComponentInstance.class));
|
verify(comp, times(2)).markAsSucceeded(any(ComponentInstance.class));
|
||||||
|
@ -500,7 +503,7 @@ public class TestComponentInstance {
|
||||||
|
|
||||||
for (ComponentInstance component2Instance : component2Instances) {
|
for (ComponentInstance component2Instance : component2Instances) {
|
||||||
ComponentInstance.handleComponentInstanceRelaunch(component2Instance,
|
ComponentInstance.handleComponentInstanceRelaunch(component2Instance,
|
||||||
componentInstanceEvent, false);
|
componentInstanceEvent, false, containerDiag);
|
||||||
}
|
}
|
||||||
|
|
||||||
succeeded1Instances = new HashMap<>();
|
succeeded1Instances = new HashMap<>();
|
||||||
|
@ -511,7 +514,7 @@ public class TestComponentInstance {
|
||||||
when(component1Instance.getComponent().getSucceededInstances())
|
when(component1Instance.getComponent().getSucceededInstances())
|
||||||
.thenReturn(succeeded1Instances.values());
|
.thenReturn(succeeded1Instances.values());
|
||||||
ComponentInstance.handleComponentInstanceRelaunch(component1Instance,
|
ComponentInstance.handleComponentInstanceRelaunch(component1Instance,
|
||||||
componentInstanceEvent, false);
|
componentInstanceEvent, false, containerDiag);
|
||||||
}
|
}
|
||||||
|
|
||||||
verify(comp, times(2)).markAsSucceeded(any(ComponentInstance.class));
|
verify(comp, times(2)).markAsSucceeded(any(ComponentInstance.class));
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.service.timelineservice;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier;
|
||||||
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
|
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
|
||||||
|
@ -122,7 +123,8 @@ public class TestServiceTimelinePublisher {
|
||||||
context.attemptId = ApplicationAttemptId
|
context.attemptId = ApplicationAttemptId
|
||||||
.newInstance(ApplicationId.fromString(service.getId()), 1);
|
.newInstance(ApplicationId.fromString(service.getId()), 1);
|
||||||
String exitDiags = "service killed";
|
String exitDiags = "service killed";
|
||||||
serviceTimelinePublisher.serviceAttemptUnregistered(context, exitDiags);
|
serviceTimelinePublisher.serviceAttemptUnregistered(context,
|
||||||
|
FinalApplicationStatus.ENDED, exitDiags);
|
||||||
lastPublishedEntities =
|
lastPublishedEntities =
|
||||||
((DummyTimelineClient) timelineClient).getLastPublishedEntities();
|
((DummyTimelineClient) timelineClient).getLastPublishedEntities();
|
||||||
for (TimelineEntity timelineEntity : lastPublishedEntities) {
|
for (TimelineEntity timelineEntity : lastPublishedEntities) {
|
||||||
|
|
Loading…
Reference in New Issue