YARN-8488. Added SUCCEEDED/FAILED states to YARN service.

Contributed by Suma Shivaprasad
This commit is contained in:
Eric Yang 2018-08-28 13:55:28 -04:00
parent c61824a189
commit fd089caf69
13 changed files with 322 additions and 89 deletions

View File

@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.service.api.ServiceApiConstants; import org.apache.hadoop.yarn.service.api.ServiceApiConstants;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.api.records.ConfigFile; import org.apache.hadoop.yarn.service.api.records.ConfigFile;
@ -80,6 +81,8 @@ import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.service.utils.ServiceRegistryUtils; import org.apache.hadoop.yarn.service.utils.ServiceRegistryUtils;
import org.apache.hadoop.yarn.service.utils.ServiceUtils; import org.apache.hadoop.yarn.service.utils.ServiceUtils;
import org.apache.hadoop.yarn.util.BoundedAppender; import org.apache.hadoop.yarn.util.BoundedAppender;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -102,7 +105,8 @@ import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY; import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.registry.client.api.RegistryConstants.*; import static org.apache.hadoop.registry.client.api.RegistryConstants.*;
import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.KILLED_AFTER_APP_COMPLETION; import static org.apache.hadoop.yarn.api.records.ContainerExitStatus
.KILLED_AFTER_APP_COMPLETION;
import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*; import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*;
import static org.apache.hadoop.yarn.service.component.ComponentEventType.*; import static org.apache.hadoop.yarn.service.component.ComponentEventType.*;
import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes
@ -137,6 +141,8 @@ public class ServiceScheduler extends CompositeService {
private ServiceTimelinePublisher serviceTimelinePublisher; private ServiceTimelinePublisher serviceTimelinePublisher;
private boolean timelineServiceEnabled;
// Global diagnostics that will be reported to RM on eRxit. // Global diagnostics that will be reported to RM on eRxit.
// The unit the number of characters. This will be limited to 64 * 1024 // The unit the number of characters. This will be limited to 64 * 1024
// characters. // characters.
@ -169,6 +175,8 @@ public class ServiceScheduler extends CompositeService {
private volatile FinalApplicationStatus finalApplicationStatus = private volatile FinalApplicationStatus finalApplicationStatus =
FinalApplicationStatus.ENDED; FinalApplicationStatus.ENDED;
private Clock systemClock;
// For unit test override since we don't want to terminate UT process. // For unit test override since we don't want to terminate UT process.
private ServiceUtils.ProcessTerminationHandler private ServiceUtils.ProcessTerminationHandler
terminationHandler = new ServiceUtils.ProcessTerminationHandler(); terminationHandler = new ServiceUtils.ProcessTerminationHandler();
@ -176,6 +184,8 @@ public class ServiceScheduler extends CompositeService {
public ServiceScheduler(ServiceContext context) { public ServiceScheduler(ServiceContext context) {
super(context.getService().getName()); super(context.getService().getName());
this.context = context; this.context = context;
this.app = context.getService();
this.systemClock = SystemClock.getInstance();
} }
public void buildInstance(ServiceContext context, Configuration configuration) public void buildInstance(ServiceContext context, Configuration configuration)
@ -254,8 +264,14 @@ public class ServiceScheduler extends CompositeService {
YarnServiceConf.DEFAULT_CONTAINER_RECOVERY_TIMEOUT_MS, YarnServiceConf.DEFAULT_CONTAINER_RECOVERY_TIMEOUT_MS,
app.getConfiguration(), getConfig()); app.getConfiguration(), getConfig());
if (YarnConfiguration
.timelineServiceV2Enabled(getConfig())) {
timelineServiceEnabled = true;
}
serviceManager = createServiceManager(); serviceManager = createServiceManager();
context.setServiceManager(serviceManager); context.setServiceManager(serviceManager);
} }
protected YarnRegistryViewForProviders createYarnRegistryOperations( protected YarnRegistryViewForProviders createYarnRegistryOperations(
@ -311,21 +327,38 @@ public class ServiceScheduler extends CompositeService {
// only stop the entire service when a graceful stop has been initiated // only stop the entire service when a graceful stop has been initiated
// (e.g. via client RPC, not through the AM receiving a SIGTERM) // (e.g. via client RPC, not through the AM receiving a SIGTERM)
if (gracefulStop) { if (gracefulStop) {
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
// mark component-instances/containers as STOPPED
for (ContainerId containerId : getLiveInstances().keySet()) { // mark other component-instances/containers as STOPPED
serviceTimelinePublisher.componentInstanceFinished(containerId, final Map<ContainerId, ComponentInstance> liveInst =
KILLED_AFTER_APP_COMPLETION, diagnostics.toString()); getLiveInstances();
for (Map.Entry<ContainerId, ComponentInstance> instance : liveInst
.entrySet()) {
if (!ComponentInstance.isFinalState(
instance.getValue().getContainerSpec().getState())) {
LOG.info("{} Component instance state changed from {} to {}",
instance.getValue().getCompInstanceName(),
instance.getValue().getContainerSpec().getState(),
ContainerState.STOPPED);
serviceTimelinePublisher.componentInstanceFinished(
instance.getKey(), KILLED_AFTER_APP_COMPLETION,
ContainerState.STOPPED, getDiagnostics().toString());
}
} }
LOG.info("Service state changed to {}", finalApplicationStatus);
// mark attempt as unregistered // mark attempt as unregistered
serviceTimelinePublisher serviceTimelinePublisher.serviceAttemptUnregistered(context,
.serviceAttemptUnregistered(context, diagnostics.toString()); finalApplicationStatus, diagnostics.toString());
} }
// unregister AM // unregister AM
amRMClient.unregisterApplicationMaster(FinalApplicationStatus.ENDED, amRMClient.unregisterApplicationMaster(finalApplicationStatus,
diagnostics.toString(), ""); diagnostics.toString(), "");
LOG.info("Service {} unregistered with RM, with attemptId = {} " + LOG.info("Service {} unregistered with RM, with attemptId = {} "
", diagnostics = {} ", app.getName(), context.attemptId, diagnostics); + ", diagnostics = {} ", app.getName(), context.attemptId,
diagnostics);
} }
super.serviceStop(); super.serviceStop();
} }
@ -911,7 +944,7 @@ public class ServiceScheduler extends CompositeService {
* (which #failed-instances + #suceeded-instances = #total-n-containers) * (which #failed-instances + #suceeded-instances = #total-n-containers)
* The service will be terminated. * The service will be terminated.
*/ */
public synchronized void terminateServiceIfAllComponentsFinished() { public void terminateServiceIfAllComponentsFinished() {
boolean shouldTerminate = true; boolean shouldTerminate = true;
// Succeeded comps and failed comps, for logging purposes. // Succeeded comps and failed comps, for logging purposes.
@ -920,7 +953,30 @@ public class ServiceScheduler extends CompositeService {
for (Component comp : getAllComponents().values()) { for (Component comp : getAllComponents().values()) {
ComponentRestartPolicy restartPolicy = comp.getRestartPolicyHandler(); ComponentRestartPolicy restartPolicy = comp.getRestartPolicyHandler();
if (!restartPolicy.shouldTerminate(comp)) {
if (restartPolicy.shouldTerminate(comp)) {
if (restartPolicy.hasCompletedSuccessfully(comp)) {
comp.getComponentSpec().setState(org.apache.hadoop
.yarn.service.api.records.ComponentState.SUCCEEDED);
LOG.info("{} Component state changed from {} to {}",
comp.getName(), comp.getComponentSpec().getState(),
org.apache.hadoop
.yarn.service.api.records.ComponentState.SUCCEEDED);
} else {
comp.getComponentSpec().setState(org.apache.hadoop
.yarn.service.api.records.ComponentState.FAILED);
LOG.info("{} Component state changed from {} to {}",
comp.getName(), comp.getComponentSpec().getState(),
org.apache.hadoop
.yarn.service.api.records.ComponentState.FAILED);
}
if (isTimelineServiceEnabled()) {
// record in ATS
serviceTimelinePublisher.componentFinished(comp.getComponentSpec(),
comp.getComponentSpec().getState(), systemClock.getTime());
}
} else {
shouldTerminate = false; shouldTerminate = false;
break; break;
} }
@ -929,7 +985,7 @@ public class ServiceScheduler extends CompositeService {
if (nFailed > 0) { if (nFailed > 0) {
failedComponents.add(comp.getName()); failedComponents.add(comp.getName());
} else{ } else {
succeededComponents.add(comp.getName()); succeededComponents.add(comp.getName());
} }
} }
@ -944,16 +1000,28 @@ public class ServiceScheduler extends CompositeService {
LOG.info("Failed components: [" + org.apache.commons.lang3.StringUtils LOG.info("Failed components: [" + org.apache.commons.lang3.StringUtils
.join(failedComponents, ",") + "]"); .join(failedComponents, ",") + "]");
int exitStatus = EXIT_SUCCESS;
if (failedComponents.isEmpty()) { if (failedComponents.isEmpty()) {
setGracefulStop(FinalApplicationStatus.SUCCEEDED); setGracefulStop(FinalApplicationStatus.SUCCEEDED);
getTerminationHandler().terminate(EXIT_SUCCESS); app.setState(ServiceState.SUCCEEDED);
} else{ } else {
setGracefulStop(FinalApplicationStatus.FAILED); setGracefulStop(FinalApplicationStatus.FAILED);
getTerminationHandler().terminate(EXIT_FALSE); app.setState(ServiceState.FAILED);
exitStatus = EXIT_FALSE;
} }
getTerminationHandler().terminate(exitStatus);
} }
} }
public Clock getSystemClock() {
return systemClock;
}
public boolean isTimelineServiceEnabled() {
return timelineServiceEnabled;
}
public ServiceUtils.ProcessTerminationHandler getTerminationHandler() { public ServiceUtils.ProcessTerminationHandler getTerminationHandler() {
return terminationHandler; return terminationHandler;
} }

View File

@ -26,5 +26,5 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceStability.Unstable @InterfaceStability.Unstable
@ApiModel(description = "The current state of a component.") @ApiModel(description = "The current state of a component.")
public enum ComponentState { public enum ComponentState {
FLEXING, STABLE, NEEDS_UPGRADE, UPGRADING; FLEXING, STABLE, NEEDS_UPGRADE, UPGRADING, SUCCEEDED, FAILED;
} }

View File

@ -26,5 +26,6 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Unstable @InterfaceStability.Unstable
public enum ContainerState { public enum ContainerState {
RUNNING_BUT_UNREADY, READY, STOPPED, NEEDS_UPGRADE, UPGRADING; RUNNING_BUT_UNREADY, READY, STOPPED, NEEDS_UPGRADE, UPGRADING, SUCCEEDED,
FAILED;
} }

View File

@ -30,5 +30,5 @@ import org.apache.hadoop.classification.InterfaceStability;
@javax.annotation.Generated(value = "class io.swagger.codegen.languages.JavaClientCodegen", date = "2016-06-02T08:15:05.615-07:00") @javax.annotation.Generated(value = "class io.swagger.codegen.languages.JavaClientCodegen", date = "2016-06-02T08:15:05.615-07:00")
public enum ServiceState { public enum ServiceState {
ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING, ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING,
UPGRADING_AUTO_FINALIZE, EXPRESS_UPGRADING; UPGRADING_AUTO_FINALIZE, EXPRESS_UPGRADING, SUCCEEDED;
} }

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -41,7 +42,9 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.service.ServiceScheduler; import org.apache.hadoop.yarn.service.ServiceScheduler;
import org.apache.hadoop.yarn.service.api.records.Artifact; import org.apache.hadoop.yarn.service.api.records.Artifact;
import org.apache.hadoop.yarn.service.api.records.ComponentState;
import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.component.Component; import org.apache.hadoop.yarn.service.component.Component;
import org.apache.hadoop.yarn.service.component.ComponentEvent; import org.apache.hadoop.yarn.service.component.ComponentEvent;
import org.apache.hadoop.yarn.service.component.ComponentEventType; import org.apache.hadoop.yarn.service.component.ComponentEventType;
@ -68,6 +71,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import static org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes.*; import static org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes.*;
import static org.apache.hadoop.yarn.api.records.ContainerExitStatus
.KILLED_AFTER_APP_COMPLETION;
import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.KILLED_BY_APPMASTER; import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.KILLED_BY_APPMASTER;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.*; import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.*;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState.*; import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState.*;
@ -242,15 +248,22 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
} }
@VisibleForTesting @VisibleForTesting
static void handleComponentInstanceRelaunch( static void handleComponentInstanceRelaunch(ComponentInstance compInstance,
ComponentInstance compInstance, ComponentInstanceEvent event, ComponentInstanceEvent event, boolean failureBeforeLaunch,
boolean failureBeforeLaunch) { String containerDiag) {
Component comp = compInstance.getComponent(); Component comp = compInstance.getComponent();
// Do we need to relaunch the service? // Do we need to relaunch the service?
boolean hasContainerFailed = hasContainerFailed(event.getStatus()); boolean hasContainerFailed = failureBeforeLaunch || hasContainerFailed(
event.getStatus());
ComponentRestartPolicy restartPolicy = comp.getRestartPolicyHandler(); ComponentRestartPolicy restartPolicy = comp.getRestartPolicyHandler();
ContainerState containerState =
hasContainerFailed ? ContainerState.FAILED : ContainerState.SUCCEEDED;
if (compInstance.getContainerSpec() != null) {
compInstance.getContainerSpec().setState(containerState);
}
if (restartPolicy.shouldRelaunchInstance(compInstance, event.getStatus())) { if (restartPolicy.shouldRelaunchInstance(compInstance, event.getStatus())) {
// re-ask the failed container. // re-ask the failed container.
@ -259,25 +272,47 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
builder.append(compInstance.getCompInstanceId()).append(": "); builder.append(compInstance.getCompInstanceId()).append(": ");
builder.append(event.getContainerId()).append(" completed. Reinsert back to pending list and requested "); builder.append(event.getContainerId()).append(
" completed. Reinsert back to pending list and requested ");
builder.append("a new container.").append(System.lineSeparator()); builder.append("a new container.").append(System.lineSeparator());
builder.append(" exitStatus=").append(failureBeforeLaunch ? null : event.getStatus().getExitStatus()); builder.append(" exitStatus=").append(
failureBeforeLaunch ? null : event.getStatus().getExitStatus());
builder.append(", diagnostics="); builder.append(", diagnostics=");
builder.append(failureBeforeLaunch ? FAILED_BEFORE_LAUNCH_DIAG : event.getStatus().getDiagnostics()); builder.append(failureBeforeLaunch ?
FAILED_BEFORE_LAUNCH_DIAG :
event.getStatus().getDiagnostics());
if (event.getStatus().getExitStatus() != 0) { if (event.getStatus() != null && event.getStatus().getExitStatus() != 0) {
LOG.error(builder.toString()); LOG.error(builder.toString());
} else { } else{
LOG.info(builder.toString()); LOG.info(builder.toString());
} }
} else {
if (compInstance.timelineServiceEnabled) {
// record in ATS
LOG.info("Publishing component instance status {} {} ",
event.getContainerId(), containerState);
compInstance.serviceTimelinePublisher.componentInstanceFinished(
event.getContainerId(), event.getStatus().getExitStatus(),
containerState, containerDiag);
}
} else{
// When no relaunch, update component's #succeeded/#failed // When no relaunch, update component's #succeeded/#failed
// instances. // instances.
if (hasContainerFailed) { if (hasContainerFailed) {
comp.markAsFailed(compInstance); comp.markAsFailed(compInstance);
} else { } else{
comp.markAsSucceeded(compInstance); comp.markAsSucceeded(compInstance);
} }
if (compInstance.timelineServiceEnabled) {
// record in ATS
compInstance.serviceTimelinePublisher.componentInstanceFinished(
event.getContainerId(), event.getStatus().getExitStatus(),
containerState, containerDiag);
}
LOG.info(compInstance.getCompInstanceId() + (!hasContainerFailed ? LOG.info(compInstance.getCompInstanceId() + (!hasContainerFailed ?
" succeeded" : " succeeded" :
" failed") + " without retry, exitStatus=" + event.getStatus()); " failed") + " without retry, exitStatus=" + event.getStatus());
@ -287,8 +322,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
public static boolean hasContainerFailed(ContainerStatus containerStatus) { public static boolean hasContainerFailed(ContainerStatus containerStatus) {
//Mark conainer as failed if we cant get its exit status i.e null? //Mark conainer as failed if we cant get its exit status i.e null?
return containerStatus == null || containerStatus.getExitStatus() != return containerStatus == null || containerStatus
ContainerExitStatus.SUCCESS; .getExitStatus() != ContainerExitStatus.SUCCESS;
} }
private static class ContainerStoppedTransition extends BaseTransition { private static class ContainerStoppedTransition extends BaseTransition {
@ -307,9 +342,10 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
ComponentInstanceEvent event) { ComponentInstanceEvent event) {
Component comp = compInstance.component; Component comp = compInstance.component;
String containerDiag = String containerDiag = compInstance.getCompInstanceId() + ": " + (
compInstance.getCompInstanceId() + ": " + (failedBeforeLaunching ? failedBeforeLaunching ?
FAILED_BEFORE_LAUNCH_DIAG : event.getStatus().getDiagnostics()); FAILED_BEFORE_LAUNCH_DIAG :
event.getStatus().getDiagnostics());
compInstance.diagnostics.append(containerDiag + System.lineSeparator()); compInstance.diagnostics.append(containerDiag + System.lineSeparator());
compInstance.cancelContainerStatusRetriever(); compInstance.cancelContainerStatusRetriever();
if (compInstance.getState().equals(ComponentInstanceState.UPGRADING)) { if (compInstance.getState().equals(ComponentInstanceState.UPGRADING)) {
@ -329,36 +365,69 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
// Check if it exceeds the failure threshold, but only if health threshold // Check if it exceeds the failure threshold, but only if health threshold
// monitor is not enabled // monitor is not enabled
if (!comp.isHealthThresholdMonitorEnabled() if (!comp.isHealthThresholdMonitorEnabled()
&& comp.currentContainerFailure && comp.currentContainerFailure.get()
.get() > comp.maxContainerFailurePerComp) { > comp.maxContainerFailurePerComp) {
String exitDiag = MessageFormat.format( String exitDiag = MessageFormat.format(
"[COMPONENT {0}]: Failed {1} times, exceeded the limit - {2}. Shutting down now... " "[COMPONENT {0}]: Failed {1} times, exceeded the limit - {2}. "
+ System.lineSeparator(), + "Shutting down now... "
comp.getName(), comp.currentContainerFailure.get(), comp.maxContainerFailurePerComp); + System.lineSeparator(), comp.getName(),
comp.currentContainerFailure.get(),
comp.maxContainerFailurePerComp);
compInstance.diagnostics.append(exitDiag); compInstance.diagnostics.append(exitDiag);
// append to global diagnostics that will be reported to RM. // append to global diagnostics that will be reported to RM.
scheduler.getDiagnostics().append(containerDiag); scheduler.getDiagnostics().append(containerDiag);
scheduler.getDiagnostics().append(exitDiag); scheduler.getDiagnostics().append(exitDiag);
LOG.warn(exitDiag); LOG.warn(exitDiag);
compInstance.getContainerSpec().setState(ContainerState.FAILED);
comp.getComponentSpec().setState(ComponentState.FAILED);
comp.getScheduler().getApp().setState(ServiceState.FAILED);
if (compInstance.timelineServiceEnabled) {
// record in ATS
compInstance.scheduler.getServiceTimelinePublisher()
.componentInstanceFinished(compInstance.getContainer().getId(),
failedBeforeLaunching ?
-1 :
event.getStatus().getExitStatus(), ContainerState.FAILED,
containerDiag);
// mark other component-instances/containers as STOPPED
for (ContainerId containerId : scheduler.getLiveInstances()
.keySet()) {
if (!compInstance.container.getId().equals(containerId)
&& !isFinalState(compInstance.getContainerSpec().getState())) {
compInstance.getContainerSpec().setState(ContainerState.STOPPED);
compInstance.scheduler.getServiceTimelinePublisher()
.componentInstanceFinished(containerId,
KILLED_AFTER_APP_COMPLETION, ContainerState.STOPPED,
scheduler.getDiagnostics().toString());
}
}
compInstance.scheduler.getServiceTimelinePublisher()
.componentFinished(comp.getComponentSpec(), ComponentState.FAILED,
scheduler.getSystemClock().getTime());
compInstance.scheduler.getServiceTimelinePublisher()
.serviceAttemptUnregistered(comp.getContext(),
FinalApplicationStatus.FAILED,
scheduler.getDiagnostics().toString());
}
shouldFailService = true; shouldFailService = true;
} }
if (!failedBeforeLaunching) { if (!failedBeforeLaunching) {
// clean up registry // clean up registry
// If the container failed before launching, no need to cleanup registry, // If the container failed before launching, no need to cleanup
// registry,
// because it was not registered before. // because it was not registered before.
// hdfs dir content will be overwritten when a new container gets started, // hdfs dir content will be overwritten when a new container gets
// started,
// so no need remove. // so no need remove.
compInstance.scheduler.executorService compInstance.scheduler.executorService.submit(
.submit(() -> compInstance.cleanupRegistry(event.getContainerId())); () -> compInstance.cleanupRegistry(event.getContainerId()));
if (compInstance.timelineServiceEnabled) {
// record in ATS
compInstance.serviceTimelinePublisher
.componentInstanceFinished(event.getContainerId(),
event.getStatus().getExitStatus(), containerDiag);
}
compInstance.containerSpec.setState(ContainerState.STOPPED);
} }
// remove the failed ContainerId -> CompInstance mapping // remove the failed ContainerId -> CompInstance mapping
@ -367,7 +436,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
// According to component restart policy, handle container restart // According to component restart policy, handle container restart
// or finish the service (if all components finished) // or finish the service (if all components finished)
handleComponentInstanceRelaunch(compInstance, event, handleComponentInstanceRelaunch(compInstance, event,
failedBeforeLaunching); failedBeforeLaunching, containerDiag);
if (shouldFailService) { if (shouldFailService) {
scheduler.getTerminationHandler().terminate(-1); scheduler.getTerminationHandler().terminate(-1);
@ -375,6 +444,11 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
} }
} }
public static boolean isFinalState(ContainerState state) {
return ContainerState.FAILED.equals(state) || ContainerState.STOPPED
.equals(state) || ContainerState.SUCCEEDED.equals(state);
}
private static class ContainerUpgradeTransition extends BaseTransition { private static class ContainerUpgradeTransition extends BaseTransition {
@Override @Override
@ -586,7 +660,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
if (timelineServiceEnabled) { if (timelineServiceEnabled) {
serviceTimelinePublisher.componentInstanceFinished(containerId, serviceTimelinePublisher.componentInstanceFinished(containerId,
KILLED_BY_APPMASTER, diagnostics.toString()); KILLED_BY_APPMASTER, ContainerState.STOPPED, diagnostics.toString());
} }
cancelContainerStatusRetriever(); cancelContainerStatusRetriever();
scheduler.executorService.submit(() -> scheduler.executorService.submit(() ->

View File

@ -32,5 +32,8 @@ public enum ServiceTimelineEvent {
COMPONENT_INSTANCE_IP_HOST_UPDATE, COMPONENT_INSTANCE_IP_HOST_UPDATE,
COMPONENT_INSTANCE_BECOME_READY COMPONENT_INSTANCE_BECOME_READY,
COMPONENT_FINISHED
} }

View File

@ -28,6 +28,8 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.client.api.TimelineV2Client; import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.service.ServiceContext; import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.api.records.*; import org.apache.hadoop.yarn.service.api.records.*;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.ComponentState;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -42,7 +44,6 @@ import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import static org.apache.hadoop.yarn.service.api.records.ContainerState.READY; import static org.apache.hadoop.yarn.service.api.records.ContainerState.READY;
import static org.apache.hadoop.yarn.service.api.records.ContainerState.STOPPED;
import static org.apache.hadoop.yarn.service.timelineservice.ServiceTimelineMetricsConstants.DIAGNOSTICS_INFO; import static org.apache.hadoop.yarn.service.timelineservice.ServiceTimelineMetricsConstants.DIAGNOSTICS_INFO;
/** /**
@ -130,12 +131,11 @@ public class ServiceTimelinePublisher extends CompositeService {
} }
public void serviceAttemptUnregistered(ServiceContext context, public void serviceAttemptUnregistered(ServiceContext context,
String diagnostics) { FinalApplicationStatus status, String diagnostics) {
TimelineEntity entity = createServiceAttemptEntity( TimelineEntity entity = createServiceAttemptEntity(
context.attemptId.getApplicationId().toString()); context.attemptId.getApplicationId().toString());
Map<String, Object> entityInfos = new HashMap<String, Object>(); Map<String, Object> entityInfos = new HashMap<String, Object>();
entityInfos.put(ServiceTimelineMetricsConstants.STATE, entityInfos.put(ServiceTimelineMetricsConstants.STATE, status);
FinalApplicationStatus.ENDED);
entityInfos.put(DIAGNOSTICS_INFO, diagnostics); entityInfos.put(DIAGNOSTICS_INFO, diagnostics);
entity.addInfo(entityInfos); entity.addInfo(entityInfos);
@ -180,7 +180,7 @@ public class ServiceTimelinePublisher extends CompositeService {
} }
public void componentInstanceFinished(ContainerId containerId, public void componentInstanceFinished(ContainerId containerId,
int exitCode, String diagnostics) { int exitCode, ContainerState state, String diagnostics) {
TimelineEntity entity = createComponentInstanceEntity( TimelineEntity entity = createComponentInstanceEntity(
containerId.toString()); containerId.toString());
@ -189,7 +189,7 @@ public class ServiceTimelinePublisher extends CompositeService {
entityInfos.put(ServiceTimelineMetricsConstants.EXIT_STATUS_CODE, entityInfos.put(ServiceTimelineMetricsConstants.EXIT_STATUS_CODE,
exitCode); exitCode);
entityInfos.put(DIAGNOSTICS_INFO, diagnostics); entityInfos.put(DIAGNOSTICS_INFO, diagnostics);
entityInfos.put(ServiceTimelineMetricsConstants.STATE, STOPPED); entityInfos.put(ServiceTimelineMetricsConstants.STATE, state);
entity.addInfo(entityInfos); entity.addInfo(entityInfos);
// add an event // add an event
@ -375,4 +375,25 @@ public class ServiceTimelinePublisher extends CompositeService {
log.error("Error when publishing entity " + entity, e); log.error("Error when publishing entity " + entity, e);
} }
} }
public void componentFinished(
Component comp,
ComponentState state, long finishTime) {
createComponentEntity(comp.getName());
TimelineEntity entity = createComponentEntity(comp.getName());
// create info keys
Map<String, Object> entityInfos = new HashMap<String, Object>();
entityInfos.put(ServiceTimelineMetricsConstants.STATE, state);
entity.addInfo(entityInfos);
// add an event
TimelineEvent startEvent = new TimelineEvent();
startEvent
.setId(ServiceTimelineEvent.COMPONENT_FINISHED.toString());
startEvent.setTimestamp(finishTime);
entity.addEvent(startEvent);
putEntity(entity);
}
} }

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService; import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import java.io.IOException; import java.io.IOException;
@ -92,7 +93,18 @@ public class MockRunningServiceContext extends ServiceContext {
public ContainerLaunchService getContainerLaunchService() { public ContainerLaunchService getContainerLaunchService() {
return mockLaunchService; return mockLaunchService;
} }
@Override public ServiceUtils.ProcessTerminationHandler
getTerminationHandler() {
return new
ServiceUtils.ProcessTerminationHandler() {
public void terminate(int exitCode) {
}
};
}
}; };
this.scheduler.init(fsWatcher.getConf()); this.scheduler.init(fsWatcher.getConf());
ServiceTestUtils.createServiceManager(this); ServiceTestUtils.createServiceManager(this);
@ -116,8 +128,10 @@ public class MockRunningServiceContext extends ServiceContext {
Component component = new org.apache.hadoop.yarn.service.component. Component component = new org.apache.hadoop.yarn.service.component.
Component(componentSpec, 1L, context); Component(componentSpec, 1L, context);
componentState.put(component.getName(), component); componentState.put(component.getName(), component);
component.handle(new ComponentEvent(component.getName(), component.handle(
ComponentEventType.FLEX)); new ComponentEvent(component.getName(), ComponentEventType.FLEX)
.setDesired(
component.getComponentSpec().getNumberOfContainers()));
for (int i = 0; i < componentSpec.getNumberOfContainers(); i++) { for (int i = 0; i < componentSpec.getNumberOfContainers(); i++) {
counter++; counter++;

View File

@ -63,7 +63,6 @@ import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.URL; import java.net.URL;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -119,14 +118,10 @@ public class ServiceTestUtils {
Component.RestartPolicyEnum.NEVER, null)); Component.RestartPolicyEnum.NEVER, null));
exampleApp.addComponent( exampleApp.addComponent(
createComponent("terminating-comp2", 2, "sleep 1000", createComponent("terminating-comp2", 2, "sleep 1000",
Component.RestartPolicyEnum.ON_FAILURE, new ArrayList<String>() {{ Component.RestartPolicyEnum.ON_FAILURE, null));
add("terminating-comp1");
}}));
exampleApp.addComponent( exampleApp.addComponent(
createComponent("terminating-comp3", 2, "sleep 1000", createComponent("terminating-comp3", 2, "sleep 1000",
Component.RestartPolicyEnum.ON_FAILURE, new ArrayList<String>() {{ Component.RestartPolicyEnum.ON_FAILURE, null));
add("terminating-comp2");
}}));
return exampleApp; return exampleApp;
} }

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.service.ServiceTestUtils;
import org.apache.hadoop.yarn.service.TestServiceManager; import org.apache.hadoop.yarn.service.TestServiceManager;
import org.apache.hadoop.yarn.service.api.records.ComponentState; import org.apache.hadoop.yarn.service.api.records.ComponentState;
import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
@ -147,7 +148,8 @@ public class TestComponent {
} }
@Test @Test
public void testComponentStateUpdatesWithTerminatingComponents() throws public void testComponentStateReachesStableStateWithTerminatingComponents()
throws
Exception { Exception {
final String serviceName = final String serviceName =
"testComponentStateUpdatesWithTerminatingComponents"; "testComponentStateUpdatesWithTerminatingComponents";
@ -198,6 +200,57 @@ public class TestComponent {
} }
} }
@Test
public void testComponentStateUpdatesWithTerminatingComponents()
throws
Exception {
final String serviceName =
"testComponentStateUpdatesWithTerminatingComponents";
Service testService = ServiceTestUtils.createTerminatingJobExample(
serviceName);
TestServiceManager.createDef(serviceName, testService);
ServiceContext context = new MockRunningServiceContext(rule, testService);
for (Component comp : context.scheduler.getAllComponents().values()) {
Iterator<ComponentInstance> instanceIter = comp.
getAllComponentInstances().iterator();
while (instanceIter.hasNext()) {
ComponentInstance componentInstance = instanceIter.next();
Container instanceContainer = componentInstance.getContainer();
//stop 1 container
ContainerStatus containerStatus = ContainerStatus.newInstance(
instanceContainer.getId(),
org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE,
"successful", 0);
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CONTAINER_COMPLETED).setStatus(containerStatus)
.setContainerId(instanceContainer.getId()));
componentInstance.handle(
new ComponentInstanceEvent(componentInstance.getContainer().getId(),
ComponentInstanceEventType.STOP).setStatus(containerStatus));
}
ComponentState componentState =
comp.getComponentSpec().getState();
Assert.assertEquals(
ComponentState.SUCCEEDED,
componentState);
}
ServiceState serviceState =
testService.getState();
Assert.assertEquals(
ServiceState.SUCCEEDED,
serviceState);
}
private static org.apache.hadoop.yarn.service.api.records.Component private static org.apache.hadoop.yarn.service.api.records.Component
createSpecWithEnv(String serviceName, String compName, String key, createSpecWithEnv(String serviceName, String compName, String key,
String val) { String val) {

View File

@ -110,7 +110,6 @@ public class TestComponentRestartPolicy {
assertEquals(true, restartPolicy.isReadyForDownStream(component)); assertEquals(true, restartPolicy.isReadyForDownStream(component));
when(component.getNumSucceededInstances()).thenReturn(new Long(2)); when(component.getNumSucceededInstances()).thenReturn(new Long(2));
when(component.getNumFailedInstances()).thenReturn(new Long(1)); when(component.getNumFailedInstances()).thenReturn(new Long(1));
when(component.getNumDesiredInstances()).thenReturn(3); when(component.getNumDesiredInstances()).thenReturn(3);

View File

@ -204,6 +204,8 @@ public class TestComponentInstance {
when(componentInstance.getComponent()).thenReturn(component); when(componentInstance.getComponent()).thenReturn(component);
when(componentInstance.getCompInstanceName()).thenReturn( when(componentInstance.getCompInstanceName()).thenReturn(
"compInstance" + instanceId); "compInstance" + instanceId);
Container container = mock(Container.class);
when(componentInstance.getContainerSpec()).thenReturn(container);
ServiceUtils.ProcessTerminationHandler terminationHandler = mock( ServiceUtils.ProcessTerminationHandler terminationHandler = mock(
ServiceUtils.ProcessTerminationHandler.class); ServiceUtils.ProcessTerminationHandler.class);
@ -227,12 +229,15 @@ public class TestComponentInstance {
Mockito.doNothing().when(serviceScheduler).setGracefulStop( Mockito.doNothing().when(serviceScheduler).setGracefulStop(
any(FinalApplicationStatus.class)); any(FinalApplicationStatus.class));
final String containerDiag = "Container succeeded";
ComponentInstanceEvent componentInstanceEvent = mock( ComponentInstanceEvent componentInstanceEvent = mock(
ComponentInstanceEvent.class); ComponentInstanceEvent.class);
ContainerId containerId = ContainerId.newContainerId(ApplicationAttemptId ContainerId containerId = ContainerId.newContainerId(ApplicationAttemptId
.newInstance(ApplicationId.newInstance(1234L, 1), 1), 1); .newInstance(ApplicationId.newInstance(1234L, 1), 1), 1);
ContainerStatus containerStatus = ContainerStatus.newInstance(containerId, ContainerStatus containerStatus = ContainerStatus.newInstance(containerId,
org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE, "hello", 0); org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE,
containerDiag, 0);
when(componentInstanceEvent.getStatus()).thenReturn(containerStatus); when(componentInstanceEvent.getStatus()).thenReturn(containerStatus);
@ -245,7 +250,7 @@ public class TestComponentInstance {
comp.getAllComponentInstances().iterator().next(); comp.getAllComponentInstances().iterator().next();
ComponentInstance.handleComponentInstanceRelaunch(componentInstance, ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
componentInstanceEvent, false); componentInstanceEvent, false, containerDiag);
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
verify(comp, never()).markAsFailed(any(ComponentInstance.class)); verify(comp, never()).markAsFailed(any(ComponentInstance.class));
@ -262,7 +267,7 @@ public class TestComponentInstance {
componentInstance = comp.getAllComponentInstances().iterator().next(); componentInstance = comp.getAllComponentInstances().iterator().next();
containerStatus.setExitStatus(1); containerStatus.setExitStatus(1);
ComponentInstance.handleComponentInstanceRelaunch(componentInstance, ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
componentInstanceEvent, false); componentInstanceEvent, false, containerDiag);
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
verify(comp, never()).markAsFailed(any(ComponentInstance.class)); verify(comp, never()).markAsFailed(any(ComponentInstance.class));
verify(comp, times(1)).reInsertPendingInstance( verify(comp, times(1)).reInsertPendingInstance(
@ -286,7 +291,7 @@ public class TestComponentInstance {
when(comp.getNumSucceededInstances()).thenReturn(new Long(1)); when(comp.getNumSucceededInstances()).thenReturn(new Long(1));
ComponentInstance.handleComponentInstanceRelaunch(componentInstance, ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
componentInstanceEvent, false); componentInstanceEvent, false, containerDiag);
verify(comp, times(1)).markAsSucceeded(any(ComponentInstance.class)); verify(comp, times(1)).markAsSucceeded(any(ComponentInstance.class));
verify(comp, never()).markAsFailed(any(ComponentInstance.class)); verify(comp, never()).markAsFailed(any(ComponentInstance.class));
verify(comp, times(0)).reInsertPendingInstance( verify(comp, times(0)).reInsertPendingInstance(
@ -304,8 +309,7 @@ public class TestComponentInstance {
when(comp.getNumFailedInstances()).thenReturn(new Long(1)); when(comp.getNumFailedInstances()).thenReturn(new Long(1));
ComponentInstance.handleComponentInstanceRelaunch(componentInstance, ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
componentInstanceEvent, false); componentInstanceEvent, false, containerDiag);
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
verify(comp, times(1)).markAsFailed(any(ComponentInstance.class)); verify(comp, times(1)).markAsFailed(any(ComponentInstance.class));
verify(comp, times(0)).reInsertPendingInstance( verify(comp, times(0)).reInsertPendingInstance(
@ -323,7 +327,7 @@ public class TestComponentInstance {
componentInstance = comp.getAllComponentInstances().iterator().next(); componentInstance = comp.getAllComponentInstances().iterator().next();
containerStatus.setExitStatus(1); containerStatus.setExitStatus(1);
ComponentInstance.handleComponentInstanceRelaunch(componentInstance, ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
componentInstanceEvent, false); componentInstanceEvent, false, containerDiag);
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
verify(comp, never()).markAsFailed(any(ComponentInstance.class)); verify(comp, never()).markAsFailed(any(ComponentInstance.class));
verify(comp, times(1)).reInsertPendingInstance( verify(comp, times(1)).reInsertPendingInstance(
@ -340,7 +344,7 @@ public class TestComponentInstance {
componentInstance = comp.getAllComponentInstances().iterator().next(); componentInstance = comp.getAllComponentInstances().iterator().next();
containerStatus.setExitStatus(1); containerStatus.setExitStatus(1);
ComponentInstance.handleComponentInstanceRelaunch(componentInstance, ComponentInstance.handleComponentInstanceRelaunch(componentInstance,
componentInstanceEvent, false); componentInstanceEvent, false, containerDiag);
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
verify(comp, times(1)).markAsFailed(any(ComponentInstance.class)); verify(comp, times(1)).markAsFailed(any(ComponentInstance.class));
verify(comp, times(0)).reInsertPendingInstance( verify(comp, times(0)).reInsertPendingInstance(
@ -363,8 +367,7 @@ public class TestComponentInstance {
containerStatus.setExitStatus(1); containerStatus.setExitStatus(1);
ComponentInstance commponentInstance = iter.next(); ComponentInstance commponentInstance = iter.next();
ComponentInstance.handleComponentInstanceRelaunch(commponentInstance, ComponentInstance.handleComponentInstanceRelaunch(commponentInstance,
componentInstanceEvent, false); componentInstanceEvent, false, containerDiag);
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
verify(comp, never()).markAsFailed(any(ComponentInstance.class)); verify(comp, never()).markAsFailed(any(ComponentInstance.class));
verify(comp, times(1)).reInsertPendingInstance( verify(comp, times(1)).reInsertPendingInstance(
@ -404,7 +407,7 @@ public class TestComponentInstance {
when(component2Instance.getComponent().getNumFailedInstances()) when(component2Instance.getComponent().getNumFailedInstances())
.thenReturn(new Long(failed2Instances.size())); .thenReturn(new Long(failed2Instances.size()));
ComponentInstance.handleComponentInstanceRelaunch(component2Instance, ComponentInstance.handleComponentInstanceRelaunch(component2Instance,
componentInstanceEvent, false); componentInstanceEvent, false, containerDiag);
} }
Map<String, ComponentInstance> failed1Instances = new HashMap<>(); Map<String, ComponentInstance> failed1Instances = new HashMap<>();
@ -418,7 +421,7 @@ public class TestComponentInstance {
when(component1Instance.getComponent().getNumFailedInstances()) when(component1Instance.getComponent().getNumFailedInstances())
.thenReturn(new Long(failed1Instances.size())); .thenReturn(new Long(failed1Instances.size()));
ComponentInstance.handleComponentInstanceRelaunch(component1Instance, ComponentInstance.handleComponentInstanceRelaunch(component1Instance,
componentInstanceEvent, false); componentInstanceEvent, false, containerDiag);
} }
verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); verify(comp, never()).markAsSucceeded(any(ComponentInstance.class));
@ -458,7 +461,7 @@ public class TestComponentInstance {
when(component2Instance.getComponent().getNumSucceededInstances()) when(component2Instance.getComponent().getNumSucceededInstances())
.thenReturn(new Long(succeeded2Instances.size())); .thenReturn(new Long(succeeded2Instances.size()));
ComponentInstance.handleComponentInstanceRelaunch(component2Instance, ComponentInstance.handleComponentInstanceRelaunch(component2Instance,
componentInstanceEvent, false); componentInstanceEvent, false, containerDiag);
} }
Map<String, ComponentInstance> succeeded1Instances = new HashMap<>(); Map<String, ComponentInstance> succeeded1Instances = new HashMap<>();
@ -471,7 +474,7 @@ public class TestComponentInstance {
when(component1Instance.getComponent().getNumSucceededInstances()) when(component1Instance.getComponent().getNumSucceededInstances())
.thenReturn(new Long(succeeded1Instances.size())); .thenReturn(new Long(succeeded1Instances.size()));
ComponentInstance.handleComponentInstanceRelaunch(component1Instance, ComponentInstance.handleComponentInstanceRelaunch(component1Instance,
componentInstanceEvent, false); componentInstanceEvent, false, containerDiag);
} }
verify(comp, times(2)).markAsSucceeded(any(ComponentInstance.class)); verify(comp, times(2)).markAsSucceeded(any(ComponentInstance.class));
@ -500,7 +503,7 @@ public class TestComponentInstance {
for (ComponentInstance component2Instance : component2Instances) { for (ComponentInstance component2Instance : component2Instances) {
ComponentInstance.handleComponentInstanceRelaunch(component2Instance, ComponentInstance.handleComponentInstanceRelaunch(component2Instance,
componentInstanceEvent, false); componentInstanceEvent, false, containerDiag);
} }
succeeded1Instances = new HashMap<>(); succeeded1Instances = new HashMap<>();
@ -511,7 +514,7 @@ public class TestComponentInstance {
when(component1Instance.getComponent().getSucceededInstances()) when(component1Instance.getComponent().getSucceededInstances())
.thenReturn(succeeded1Instances.values()); .thenReturn(succeeded1Instances.values());
ComponentInstance.handleComponentInstanceRelaunch(component1Instance, ComponentInstance.handleComponentInstanceRelaunch(component1Instance,
componentInstanceEvent, false); componentInstanceEvent, false, containerDiag);
} }
verify(comp, times(2)).markAsSucceeded(any(ComponentInstance.class)); verify(comp, times(2)).markAsSucceeded(any(ComponentInstance.class));

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.service.timelineservice;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier;
import org.apache.hadoop.yarn.client.api.TimelineV2Client; import org.apache.hadoop.yarn.client.api.TimelineV2Client;
@ -122,7 +123,8 @@ public class TestServiceTimelinePublisher {
context.attemptId = ApplicationAttemptId context.attemptId = ApplicationAttemptId
.newInstance(ApplicationId.fromString(service.getId()), 1); .newInstance(ApplicationId.fromString(service.getId()), 1);
String exitDiags = "service killed"; String exitDiags = "service killed";
serviceTimelinePublisher.serviceAttemptUnregistered(context, exitDiags); serviceTimelinePublisher.serviceAttemptUnregistered(context,
FinalApplicationStatus.ENDED, exitDiags);
lastPublishedEntities = lastPublishedEntities =
((DummyTimelineClient) timelineClient).getLastPublishedEntities(); ((DummyTimelineClient) timelineClient).getLastPublishedEntities();
for (TimelineEntity timelineEntity : lastPublishedEntities) { for (TimelineEntity timelineEntity : lastPublishedEntities) {