From fd089caf69cf608a91564c9c3d20cbf84e7fd60c Mon Sep 17 00:00:00 2001 From: Eric Yang Date: Tue, 28 Aug 2018 13:55:28 -0400 Subject: [PATCH] YARN-8488. Added SUCCEEDED/FAILED states to YARN service. Contributed by Suma Shivaprasad --- .../hadoop/yarn/service/ServiceScheduler.java | 100 ++++++++++-- .../service/api/records/ComponentState.java | 2 +- .../service/api/records/ContainerState.java | 3 +- .../service/api/records/ServiceState.java | 2 +- .../component/instance/ComponentInstance.java | 144 +++++++++++++----- .../timelineservice/ServiceTimelineEvent.java | 5 +- .../ServiceTimelinePublisher.java | 33 +++- .../service/MockRunningServiceContext.java | 18 ++- .../hadoop/yarn/service/ServiceTestUtils.java | 9 +- .../yarn/service/component/TestComponent.java | 55 ++++++- .../component/TestComponentRestartPolicy.java | 1 - .../instance/TestComponentInstance.java | 35 +++-- .../TestServiceTimelinePublisher.java | 4 +- 13 files changed, 322 insertions(+), 89 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index 384659f8aba..b49ef2ad923 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.service.api.ServiceApiConstants; +import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.api.records.ConfigFile; @@ -80,6 +81,8 @@ import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.service.utils.ServiceRegistryUtils; import org.apache.hadoop.yarn.service.utils.ServiceUtils; import org.apache.hadoop.yarn.util.BoundedAppender; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -102,7 +105,8 @@ import java.util.concurrent.TimeUnit; import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY; import static org.apache.hadoop.registry.client.api.RegistryConstants.*; -import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.KILLED_AFTER_APP_COMPLETION; +import static org.apache.hadoop.yarn.api.records.ContainerExitStatus + .KILLED_AFTER_APP_COMPLETION; import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*; import static org.apache.hadoop.yarn.service.component.ComponentEventType.*; import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes @@ -137,6 +141,8 @@ public class ServiceScheduler extends CompositeService { private ServiceTimelinePublisher serviceTimelinePublisher; + private boolean timelineServiceEnabled; + // Global diagnostics that will be reported to RM on eRxit. // The unit the number of characters. This will be limited to 64 * 1024 // characters. @@ -169,6 +175,8 @@ public class ServiceScheduler extends CompositeService { private volatile FinalApplicationStatus finalApplicationStatus = FinalApplicationStatus.ENDED; + private Clock systemClock; + // For unit test override since we don't want to terminate UT process. private ServiceUtils.ProcessTerminationHandler terminationHandler = new ServiceUtils.ProcessTerminationHandler(); @@ -176,6 +184,8 @@ public class ServiceScheduler extends CompositeService { public ServiceScheduler(ServiceContext context) { super(context.getService().getName()); this.context = context; + this.app = context.getService(); + this.systemClock = SystemClock.getInstance(); } public void buildInstance(ServiceContext context, Configuration configuration) @@ -254,8 +264,14 @@ public class ServiceScheduler extends CompositeService { YarnServiceConf.DEFAULT_CONTAINER_RECOVERY_TIMEOUT_MS, app.getConfiguration(), getConfig()); + if (YarnConfiguration + .timelineServiceV2Enabled(getConfig())) { + timelineServiceEnabled = true; + } + serviceManager = createServiceManager(); context.setServiceManager(serviceManager); + } protected YarnRegistryViewForProviders createYarnRegistryOperations( @@ -311,21 +327,38 @@ public class ServiceScheduler extends CompositeService { // only stop the entire service when a graceful stop has been initiated // (e.g. via client RPC, not through the AM receiving a SIGTERM) if (gracefulStop) { + if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { - // mark component-instances/containers as STOPPED - for (ContainerId containerId : getLiveInstances().keySet()) { - serviceTimelinePublisher.componentInstanceFinished(containerId, - KILLED_AFTER_APP_COMPLETION, diagnostics.toString()); + + // mark other component-instances/containers as STOPPED + final Map liveInst = + getLiveInstances(); + for (Map.Entry instance : liveInst + .entrySet()) { + if (!ComponentInstance.isFinalState( + instance.getValue().getContainerSpec().getState())) { + LOG.info("{} Component instance state changed from {} to {}", + instance.getValue().getCompInstanceName(), + instance.getValue().getContainerSpec().getState(), + ContainerState.STOPPED); + serviceTimelinePublisher.componentInstanceFinished( + instance.getKey(), KILLED_AFTER_APP_COMPLETION, + ContainerState.STOPPED, getDiagnostics().toString()); + } } + + LOG.info("Service state changed to {}", finalApplicationStatus); // mark attempt as unregistered - serviceTimelinePublisher - .serviceAttemptUnregistered(context, diagnostics.toString()); + serviceTimelinePublisher.serviceAttemptUnregistered(context, + finalApplicationStatus, diagnostics.toString()); } + // unregister AM - amRMClient.unregisterApplicationMaster(FinalApplicationStatus.ENDED, + amRMClient.unregisterApplicationMaster(finalApplicationStatus, diagnostics.toString(), ""); - LOG.info("Service {} unregistered with RM, with attemptId = {} " + - ", diagnostics = {} ", app.getName(), context.attemptId, diagnostics); + LOG.info("Service {} unregistered with RM, with attemptId = {} " + + ", diagnostics = {} ", app.getName(), context.attemptId, + diagnostics); } super.serviceStop(); } @@ -911,7 +944,7 @@ public class ServiceScheduler extends CompositeService { * (which #failed-instances + #suceeded-instances = #total-n-containers) * The service will be terminated. */ - public synchronized void terminateServiceIfAllComponentsFinished() { + public void terminateServiceIfAllComponentsFinished() { boolean shouldTerminate = true; // Succeeded comps and failed comps, for logging purposes. @@ -920,7 +953,30 @@ public class ServiceScheduler extends CompositeService { for (Component comp : getAllComponents().values()) { ComponentRestartPolicy restartPolicy = comp.getRestartPolicyHandler(); - if (!restartPolicy.shouldTerminate(comp)) { + + if (restartPolicy.shouldTerminate(comp)) { + if (restartPolicy.hasCompletedSuccessfully(comp)) { + comp.getComponentSpec().setState(org.apache.hadoop + .yarn.service.api.records.ComponentState.SUCCEEDED); + LOG.info("{} Component state changed from {} to {}", + comp.getName(), comp.getComponentSpec().getState(), + org.apache.hadoop + .yarn.service.api.records.ComponentState.SUCCEEDED); + } else { + comp.getComponentSpec().setState(org.apache.hadoop + .yarn.service.api.records.ComponentState.FAILED); + LOG.info("{} Component state changed from {} to {}", + comp.getName(), comp.getComponentSpec().getState(), + org.apache.hadoop + .yarn.service.api.records.ComponentState.FAILED); + } + + if (isTimelineServiceEnabled()) { + // record in ATS + serviceTimelinePublisher.componentFinished(comp.getComponentSpec(), + comp.getComponentSpec().getState(), systemClock.getTime()); + } + } else { shouldTerminate = false; break; } @@ -929,7 +985,7 @@ public class ServiceScheduler extends CompositeService { if (nFailed > 0) { failedComponents.add(comp.getName()); - } else{ + } else { succeededComponents.add(comp.getName()); } } @@ -944,16 +1000,28 @@ public class ServiceScheduler extends CompositeService { LOG.info("Failed components: [" + org.apache.commons.lang3.StringUtils .join(failedComponents, ",") + "]"); + int exitStatus = EXIT_SUCCESS; if (failedComponents.isEmpty()) { setGracefulStop(FinalApplicationStatus.SUCCEEDED); - getTerminationHandler().terminate(EXIT_SUCCESS); - } else{ + app.setState(ServiceState.SUCCEEDED); + } else { setGracefulStop(FinalApplicationStatus.FAILED); - getTerminationHandler().terminate(EXIT_FALSE); + app.setState(ServiceState.FAILED); + exitStatus = EXIT_FALSE; } + + getTerminationHandler().terminate(exitStatus); } } + public Clock getSystemClock() { + return systemClock; + } + + public boolean isTimelineServiceEnabled() { + return timelineServiceEnabled; + } + public ServiceUtils.ProcessTerminationHandler getTerminationHandler() { return terminationHandler; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java index 3e7ed11a257..472f3749f70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java @@ -26,5 +26,5 @@ import org.apache.hadoop.classification.InterfaceStability; @InterfaceStability.Unstable @ApiModel(description = "The current state of a component.") public enum ComponentState { - FLEXING, STABLE, NEEDS_UPGRADE, UPGRADING; + FLEXING, STABLE, NEEDS_UPGRADE, UPGRADING, SUCCEEDED, FAILED; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java index 6e390737e70..cac527a5482 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java @@ -26,5 +26,6 @@ import org.apache.hadoop.classification.InterfaceStability; @InterfaceAudience.Public @InterfaceStability.Unstable public enum ContainerState { - RUNNING_BUT_UNREADY, READY, STOPPED, NEEDS_UPGRADE, UPGRADING; + RUNNING_BUT_UNREADY, READY, STOPPED, NEEDS_UPGRADE, UPGRADING, SUCCEEDED, + FAILED; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java index 0b3c0377fab..38761738144 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java @@ -30,5 +30,5 @@ import org.apache.hadoop.classification.InterfaceStability; @javax.annotation.Generated(value = "class io.swagger.codegen.languages.JavaClientCodegen", date = "2016-06-02T08:15:05.615-07:00") public enum ServiceState { ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING, - UPGRADING_AUTO_FINALIZE, EXPRESS_UPGRADING; + UPGRADING_AUTO_FINALIZE, EXPRESS_UPGRADING, SUCCEEDED; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index ed5e68e98f4..afd8c671fa4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -41,7 +42,9 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.service.ServiceScheduler; import org.apache.hadoop.yarn.service.api.records.Artifact; +import org.apache.hadoop.yarn.service.api.records.ComponentState; import org.apache.hadoop.yarn.service.api.records.ContainerState; +import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.component.Component; import org.apache.hadoop.yarn.service.component.ComponentEvent; import org.apache.hadoop.yarn.service.component.ComponentEventType; @@ -68,6 +71,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import static org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes.*; + +import static org.apache.hadoop.yarn.api.records.ContainerExitStatus + .KILLED_AFTER_APP_COMPLETION; import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.KILLED_BY_APPMASTER; import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.*; import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState.*; @@ -242,15 +248,22 @@ public class ComponentInstance implements EventHandler, } @VisibleForTesting - static void handleComponentInstanceRelaunch( - ComponentInstance compInstance, ComponentInstanceEvent event, - boolean failureBeforeLaunch) { + static void handleComponentInstanceRelaunch(ComponentInstance compInstance, + ComponentInstanceEvent event, boolean failureBeforeLaunch, + String containerDiag) { Component comp = compInstance.getComponent(); // Do we need to relaunch the service? - boolean hasContainerFailed = hasContainerFailed(event.getStatus()); + boolean hasContainerFailed = failureBeforeLaunch || hasContainerFailed( + event.getStatus()); ComponentRestartPolicy restartPolicy = comp.getRestartPolicyHandler(); + ContainerState containerState = + hasContainerFailed ? ContainerState.FAILED : ContainerState.SUCCEEDED; + + if (compInstance.getContainerSpec() != null) { + compInstance.getContainerSpec().setState(containerState); + } if (restartPolicy.shouldRelaunchInstance(compInstance, event.getStatus())) { // re-ask the failed container. @@ -259,25 +272,47 @@ public class ComponentInstance implements EventHandler, StringBuilder builder = new StringBuilder(); builder.append(compInstance.getCompInstanceId()).append(": "); - builder.append(event.getContainerId()).append(" completed. Reinsert back to pending list and requested "); + builder.append(event.getContainerId()).append( + " completed. Reinsert back to pending list and requested "); builder.append("a new container.").append(System.lineSeparator()); - builder.append(" exitStatus=").append(failureBeforeLaunch ? null : event.getStatus().getExitStatus()); + builder.append(" exitStatus=").append( + failureBeforeLaunch ? null : event.getStatus().getExitStatus()); builder.append(", diagnostics="); - builder.append(failureBeforeLaunch ? FAILED_BEFORE_LAUNCH_DIAG : event.getStatus().getDiagnostics()); + builder.append(failureBeforeLaunch ? + FAILED_BEFORE_LAUNCH_DIAG : + event.getStatus().getDiagnostics()); - if (event.getStatus().getExitStatus() != 0) { + if (event.getStatus() != null && event.getStatus().getExitStatus() != 0) { LOG.error(builder.toString()); - } else { + } else{ LOG.info(builder.toString()); } - } else { + + if (compInstance.timelineServiceEnabled) { + // record in ATS + LOG.info("Publishing component instance status {} {} ", + event.getContainerId(), containerState); + compInstance.serviceTimelinePublisher.componentInstanceFinished( + event.getContainerId(), event.getStatus().getExitStatus(), + containerState, containerDiag); + } + + } else{ // When no relaunch, update component's #succeeded/#failed // instances. if (hasContainerFailed) { comp.markAsFailed(compInstance); - } else { + } else{ comp.markAsSucceeded(compInstance); } + + if (compInstance.timelineServiceEnabled) { + // record in ATS + compInstance.serviceTimelinePublisher.componentInstanceFinished( + event.getContainerId(), event.getStatus().getExitStatus(), + containerState, containerDiag); + } + LOG.info(compInstance.getCompInstanceId() + (!hasContainerFailed ? " succeeded" : " failed") + " without retry, exitStatus=" + event.getStatus()); @@ -287,8 +322,8 @@ public class ComponentInstance implements EventHandler, public static boolean hasContainerFailed(ContainerStatus containerStatus) { //Mark conainer as failed if we cant get its exit status i.e null? - return containerStatus == null || containerStatus.getExitStatus() != - ContainerExitStatus.SUCCESS; + return containerStatus == null || containerStatus + .getExitStatus() != ContainerExitStatus.SUCCESS; } private static class ContainerStoppedTransition extends BaseTransition { @@ -307,9 +342,10 @@ public class ComponentInstance implements EventHandler, ComponentInstanceEvent event) { Component comp = compInstance.component; - String containerDiag = - compInstance.getCompInstanceId() + ": " + (failedBeforeLaunching ? - FAILED_BEFORE_LAUNCH_DIAG : event.getStatus().getDiagnostics()); + String containerDiag = compInstance.getCompInstanceId() + ": " + ( + failedBeforeLaunching ? + FAILED_BEFORE_LAUNCH_DIAG : + event.getStatus().getDiagnostics()); compInstance.diagnostics.append(containerDiag + System.lineSeparator()); compInstance.cancelContainerStatusRetriever(); if (compInstance.getState().equals(ComponentInstanceState.UPGRADING)) { @@ -329,36 +365,69 @@ public class ComponentInstance implements EventHandler, // Check if it exceeds the failure threshold, but only if health threshold // monitor is not enabled if (!comp.isHealthThresholdMonitorEnabled() - && comp.currentContainerFailure - .get() > comp.maxContainerFailurePerComp) { + && comp.currentContainerFailure.get() + > comp.maxContainerFailurePerComp) { String exitDiag = MessageFormat.format( - "[COMPONENT {0}]: Failed {1} times, exceeded the limit - {2}. Shutting down now... " - + System.lineSeparator(), - comp.getName(), comp.currentContainerFailure.get(), comp.maxContainerFailurePerComp); + "[COMPONENT {0}]: Failed {1} times, exceeded the limit - {2}. " + + "Shutting down now... " + + System.lineSeparator(), comp.getName(), + comp.currentContainerFailure.get(), + comp.maxContainerFailurePerComp); compInstance.diagnostics.append(exitDiag); // append to global diagnostics that will be reported to RM. scheduler.getDiagnostics().append(containerDiag); scheduler.getDiagnostics().append(exitDiag); LOG.warn(exitDiag); + + compInstance.getContainerSpec().setState(ContainerState.FAILED); + comp.getComponentSpec().setState(ComponentState.FAILED); + comp.getScheduler().getApp().setState(ServiceState.FAILED); + + if (compInstance.timelineServiceEnabled) { + // record in ATS + compInstance.scheduler.getServiceTimelinePublisher() + .componentInstanceFinished(compInstance.getContainer().getId(), + failedBeforeLaunching ? + -1 : + event.getStatus().getExitStatus(), ContainerState.FAILED, + containerDiag); + + // mark other component-instances/containers as STOPPED + for (ContainerId containerId : scheduler.getLiveInstances() + .keySet()) { + if (!compInstance.container.getId().equals(containerId) + && !isFinalState(compInstance.getContainerSpec().getState())) { + compInstance.getContainerSpec().setState(ContainerState.STOPPED); + compInstance.scheduler.getServiceTimelinePublisher() + .componentInstanceFinished(containerId, + KILLED_AFTER_APP_COMPLETION, ContainerState.STOPPED, + scheduler.getDiagnostics().toString()); + } + } + + compInstance.scheduler.getServiceTimelinePublisher() + .componentFinished(comp.getComponentSpec(), ComponentState.FAILED, + scheduler.getSystemClock().getTime()); + + compInstance.scheduler.getServiceTimelinePublisher() + .serviceAttemptUnregistered(comp.getContext(), + FinalApplicationStatus.FAILED, + scheduler.getDiagnostics().toString()); + } + shouldFailService = true; } if (!failedBeforeLaunching) { // clean up registry - // If the container failed before launching, no need to cleanup registry, + // If the container failed before launching, no need to cleanup + // registry, // because it was not registered before. - // hdfs dir content will be overwritten when a new container gets started, + // hdfs dir content will be overwritten when a new container gets + // started, // so no need remove. - compInstance.scheduler.executorService - .submit(() -> compInstance.cleanupRegistry(event.getContainerId())); - - if (compInstance.timelineServiceEnabled) { - // record in ATS - compInstance.serviceTimelinePublisher - .componentInstanceFinished(event.getContainerId(), - event.getStatus().getExitStatus(), containerDiag); - } - compInstance.containerSpec.setState(ContainerState.STOPPED); + compInstance.scheduler.executorService.submit( + () -> compInstance.cleanupRegistry(event.getContainerId())); } // remove the failed ContainerId -> CompInstance mapping @@ -367,7 +436,7 @@ public class ComponentInstance implements EventHandler, // According to component restart policy, handle container restart // or finish the service (if all components finished) handleComponentInstanceRelaunch(compInstance, event, - failedBeforeLaunching); + failedBeforeLaunching, containerDiag); if (shouldFailService) { scheduler.getTerminationHandler().terminate(-1); @@ -375,6 +444,11 @@ public class ComponentInstance implements EventHandler, } } + public static boolean isFinalState(ContainerState state) { + return ContainerState.FAILED.equals(state) || ContainerState.STOPPED + .equals(state) || ContainerState.SUCCEEDED.equals(state); + } + private static class ContainerUpgradeTransition extends BaseTransition { @Override @@ -586,7 +660,7 @@ public class ComponentInstance implements EventHandler, if (timelineServiceEnabled) { serviceTimelinePublisher.componentInstanceFinished(containerId, - KILLED_BY_APPMASTER, diagnostics.toString()); + KILLED_BY_APPMASTER, ContainerState.STOPPED, diagnostics.toString()); } cancelContainerStatusRetriever(); scheduler.executorService.submit(() -> diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEvent.java index 6c3428a748d..832dad729ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEvent.java @@ -32,5 +32,8 @@ public enum ServiceTimelineEvent { COMPONENT_INSTANCE_IP_HOST_UPDATE, - COMPONENT_INSTANCE_BECOME_READY + COMPONENT_INSTANCE_BECOME_READY, + + COMPONENT_FINISHED + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java index 6c73ebb8d67..79f37c00d27 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java @@ -28,6 +28,8 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.client.api.TimelineV2Client; import org.apache.hadoop.yarn.service.ServiceContext; import org.apache.hadoop.yarn.service.api.records.*; +import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.ComponentState; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.slf4j.Logger; @@ -42,7 +44,6 @@ import java.util.Map.Entry; import java.util.Set; import static org.apache.hadoop.yarn.service.api.records.ContainerState.READY; -import static org.apache.hadoop.yarn.service.api.records.ContainerState.STOPPED; import static org.apache.hadoop.yarn.service.timelineservice.ServiceTimelineMetricsConstants.DIAGNOSTICS_INFO; /** @@ -130,12 +131,11 @@ public class ServiceTimelinePublisher extends CompositeService { } public void serviceAttemptUnregistered(ServiceContext context, - String diagnostics) { + FinalApplicationStatus status, String diagnostics) { TimelineEntity entity = createServiceAttemptEntity( context.attemptId.getApplicationId().toString()); Map entityInfos = new HashMap(); - entityInfos.put(ServiceTimelineMetricsConstants.STATE, - FinalApplicationStatus.ENDED); + entityInfos.put(ServiceTimelineMetricsConstants.STATE, status); entityInfos.put(DIAGNOSTICS_INFO, diagnostics); entity.addInfo(entityInfos); @@ -180,7 +180,7 @@ public class ServiceTimelinePublisher extends CompositeService { } public void componentInstanceFinished(ContainerId containerId, - int exitCode, String diagnostics) { + int exitCode, ContainerState state, String diagnostics) { TimelineEntity entity = createComponentInstanceEntity( containerId.toString()); @@ -189,7 +189,7 @@ public class ServiceTimelinePublisher extends CompositeService { entityInfos.put(ServiceTimelineMetricsConstants.EXIT_STATUS_CODE, exitCode); entityInfos.put(DIAGNOSTICS_INFO, diagnostics); - entityInfos.put(ServiceTimelineMetricsConstants.STATE, STOPPED); + entityInfos.put(ServiceTimelineMetricsConstants.STATE, state); entity.addInfo(entityInfos); // add an event @@ -375,4 +375,25 @@ public class ServiceTimelinePublisher extends CompositeService { log.error("Error when publishing entity " + entity, e); } } + + public void componentFinished( + Component comp, + ComponentState state, long finishTime) { + createComponentEntity(comp.getName()); + TimelineEntity entity = createComponentEntity(comp.getName()); + + // create info keys + Map entityInfos = new HashMap(); + entityInfos.put(ServiceTimelineMetricsConstants.STATE, state); + entity.addInfo(entityInfos); + + // add an event + TimelineEvent startEvent = new TimelineEvent(); + startEvent + .setId(ServiceTimelineEvent.COMPONENT_FINISHED.toString()); + startEvent.setTimestamp(finishTime); + entity.addEvent(startEvent); + + putEntity(entity); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java index 89888c5cf97..321b2cda3e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService; import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; +import org.apache.hadoop.yarn.service.utils.ServiceUtils; import org.mockito.stubbing.Answer; import java.io.IOException; @@ -92,7 +93,18 @@ public class MockRunningServiceContext extends ServiceContext { public ContainerLaunchService getContainerLaunchService() { return mockLaunchService; } + + @Override public ServiceUtils.ProcessTerminationHandler + getTerminationHandler() { + return new + ServiceUtils.ProcessTerminationHandler() { + public void terminate(int exitCode) { + } + }; + } }; + + this.scheduler.init(fsWatcher.getConf()); ServiceTestUtils.createServiceManager(this); @@ -116,8 +128,10 @@ public class MockRunningServiceContext extends ServiceContext { Component component = new org.apache.hadoop.yarn.service.component. Component(componentSpec, 1L, context); componentState.put(component.getName(), component); - component.handle(new ComponentEvent(component.getName(), - ComponentEventType.FLEX)); + component.handle( + new ComponentEvent(component.getName(), ComponentEventType.FLEX) + .setDesired( + component.getComponentSpec().getNumberOfContainers())); for (int i = 0; i < componentSpec.getNumberOfContainers(); i++) { counter++; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java index 170c20b84b9..6b49ab07c1a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java @@ -63,7 +63,6 @@ import java.io.IOException; import java.io.OutputStream; import java.net.URL; import java.nio.file.Paths; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.TimeoutException; @@ -119,14 +118,10 @@ public class ServiceTestUtils { Component.RestartPolicyEnum.NEVER, null)); exampleApp.addComponent( createComponent("terminating-comp2", 2, "sleep 1000", - Component.RestartPolicyEnum.ON_FAILURE, new ArrayList() {{ - add("terminating-comp1"); - }})); + Component.RestartPolicyEnum.ON_FAILURE, null)); exampleApp.addComponent( createComponent("terminating-comp3", 2, "sleep 1000", - Component.RestartPolicyEnum.ON_FAILURE, new ArrayList() {{ - add("terminating-comp2"); - }})); + Component.RestartPolicyEnum.ON_FAILURE, null)); return exampleApp; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java index 2e17c7f4a1f..e1a4d9d7553 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.service.ServiceTestUtils; import org.apache.hadoop.yarn.service.TestServiceManager; import org.apache.hadoop.yarn.service.api.records.ComponentState; import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; @@ -147,7 +148,8 @@ public class TestComponent { } @Test - public void testComponentStateUpdatesWithTerminatingComponents() throws + public void testComponentStateReachesStableStateWithTerminatingComponents() + throws Exception { final String serviceName = "testComponentStateUpdatesWithTerminatingComponents"; @@ -198,6 +200,57 @@ public class TestComponent { } } + @Test + public void testComponentStateUpdatesWithTerminatingComponents() + throws + Exception { + final String serviceName = + "testComponentStateUpdatesWithTerminatingComponents"; + + Service testService = ServiceTestUtils.createTerminatingJobExample( + serviceName); + TestServiceManager.createDef(serviceName, testService); + + ServiceContext context = new MockRunningServiceContext(rule, testService); + + for (Component comp : context.scheduler.getAllComponents().values()) { + Iterator instanceIter = comp. + getAllComponentInstances().iterator(); + + while (instanceIter.hasNext()) { + + ComponentInstance componentInstance = instanceIter.next(); + Container instanceContainer = componentInstance.getContainer(); + + //stop 1 container + ContainerStatus containerStatus = ContainerStatus.newInstance( + instanceContainer.getId(), + org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE, + "successful", 0); + comp.handle(new ComponentEvent(comp.getName(), + ComponentEventType.CONTAINER_COMPLETED).setStatus(containerStatus) + .setContainerId(instanceContainer.getId())); + componentInstance.handle( + new ComponentInstanceEvent(componentInstance.getContainer().getId(), + ComponentInstanceEventType.STOP).setStatus(containerStatus)); + } + + ComponentState componentState = + comp.getComponentSpec().getState(); + Assert.assertEquals( + ComponentState.SUCCEEDED, + componentState); + } + + ServiceState serviceState = + testService.getState(); + Assert.assertEquals( + ServiceState.SUCCEEDED, + serviceState); + } + + + private static org.apache.hadoop.yarn.service.api.records.Component createSpecWithEnv(String serviceName, String compName, String key, String val) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponentRestartPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponentRestartPolicy.java index 60f5c918f9a..03158cfc18e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponentRestartPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponentRestartPolicy.java @@ -110,7 +110,6 @@ public class TestComponentRestartPolicy { assertEquals(true, restartPolicy.isReadyForDownStream(component)); - when(component.getNumSucceededInstances()).thenReturn(new Long(2)); when(component.getNumFailedInstances()).thenReturn(new Long(1)); when(component.getNumDesiredInstances()).thenReturn(3); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java index f4289047f98..e0399816db8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java @@ -204,6 +204,8 @@ public class TestComponentInstance { when(componentInstance.getComponent()).thenReturn(component); when(componentInstance.getCompInstanceName()).thenReturn( "compInstance" + instanceId); + Container container = mock(Container.class); + when(componentInstance.getContainerSpec()).thenReturn(container); ServiceUtils.ProcessTerminationHandler terminationHandler = mock( ServiceUtils.ProcessTerminationHandler.class); @@ -227,12 +229,15 @@ public class TestComponentInstance { Mockito.doNothing().when(serviceScheduler).setGracefulStop( any(FinalApplicationStatus.class)); + final String containerDiag = "Container succeeded"; + ComponentInstanceEvent componentInstanceEvent = mock( ComponentInstanceEvent.class); ContainerId containerId = ContainerId.newContainerId(ApplicationAttemptId .newInstance(ApplicationId.newInstance(1234L, 1), 1), 1); ContainerStatus containerStatus = ContainerStatus.newInstance(containerId, - org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE, "hello", 0); + org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE, + containerDiag, 0); when(componentInstanceEvent.getStatus()).thenReturn(containerStatus); @@ -245,7 +250,7 @@ public class TestComponentInstance { comp.getAllComponentInstances().iterator().next(); ComponentInstance.handleComponentInstanceRelaunch(componentInstance, - componentInstanceEvent, false); + componentInstanceEvent, false, containerDiag); verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); verify(comp, never()).markAsFailed(any(ComponentInstance.class)); @@ -262,7 +267,7 @@ public class TestComponentInstance { componentInstance = comp.getAllComponentInstances().iterator().next(); containerStatus.setExitStatus(1); ComponentInstance.handleComponentInstanceRelaunch(componentInstance, - componentInstanceEvent, false); + componentInstanceEvent, false, containerDiag); verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); verify(comp, never()).markAsFailed(any(ComponentInstance.class)); verify(comp, times(1)).reInsertPendingInstance( @@ -286,7 +291,7 @@ public class TestComponentInstance { when(comp.getNumSucceededInstances()).thenReturn(new Long(1)); ComponentInstance.handleComponentInstanceRelaunch(componentInstance, - componentInstanceEvent, false); + componentInstanceEvent, false, containerDiag); verify(comp, times(1)).markAsSucceeded(any(ComponentInstance.class)); verify(comp, never()).markAsFailed(any(ComponentInstance.class)); verify(comp, times(0)).reInsertPendingInstance( @@ -304,8 +309,7 @@ public class TestComponentInstance { when(comp.getNumFailedInstances()).thenReturn(new Long(1)); ComponentInstance.handleComponentInstanceRelaunch(componentInstance, - componentInstanceEvent, false); - + componentInstanceEvent, false, containerDiag); verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); verify(comp, times(1)).markAsFailed(any(ComponentInstance.class)); verify(comp, times(0)).reInsertPendingInstance( @@ -323,7 +327,7 @@ public class TestComponentInstance { componentInstance = comp.getAllComponentInstances().iterator().next(); containerStatus.setExitStatus(1); ComponentInstance.handleComponentInstanceRelaunch(componentInstance, - componentInstanceEvent, false); + componentInstanceEvent, false, containerDiag); verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); verify(comp, never()).markAsFailed(any(ComponentInstance.class)); verify(comp, times(1)).reInsertPendingInstance( @@ -340,7 +344,7 @@ public class TestComponentInstance { componentInstance = comp.getAllComponentInstances().iterator().next(); containerStatus.setExitStatus(1); ComponentInstance.handleComponentInstanceRelaunch(componentInstance, - componentInstanceEvent, false); + componentInstanceEvent, false, containerDiag); verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); verify(comp, times(1)).markAsFailed(any(ComponentInstance.class)); verify(comp, times(0)).reInsertPendingInstance( @@ -363,8 +367,7 @@ public class TestComponentInstance { containerStatus.setExitStatus(1); ComponentInstance commponentInstance = iter.next(); ComponentInstance.handleComponentInstanceRelaunch(commponentInstance, - componentInstanceEvent, false); - + componentInstanceEvent, false, containerDiag); verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); verify(comp, never()).markAsFailed(any(ComponentInstance.class)); verify(comp, times(1)).reInsertPendingInstance( @@ -404,7 +407,7 @@ public class TestComponentInstance { when(component2Instance.getComponent().getNumFailedInstances()) .thenReturn(new Long(failed2Instances.size())); ComponentInstance.handleComponentInstanceRelaunch(component2Instance, - componentInstanceEvent, false); + componentInstanceEvent, false, containerDiag); } Map failed1Instances = new HashMap<>(); @@ -418,7 +421,7 @@ public class TestComponentInstance { when(component1Instance.getComponent().getNumFailedInstances()) .thenReturn(new Long(failed1Instances.size())); ComponentInstance.handleComponentInstanceRelaunch(component1Instance, - componentInstanceEvent, false); + componentInstanceEvent, false, containerDiag); } verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); @@ -458,7 +461,7 @@ public class TestComponentInstance { when(component2Instance.getComponent().getNumSucceededInstances()) .thenReturn(new Long(succeeded2Instances.size())); ComponentInstance.handleComponentInstanceRelaunch(component2Instance, - componentInstanceEvent, false); + componentInstanceEvent, false, containerDiag); } Map succeeded1Instances = new HashMap<>(); @@ -471,7 +474,7 @@ public class TestComponentInstance { when(component1Instance.getComponent().getNumSucceededInstances()) .thenReturn(new Long(succeeded1Instances.size())); ComponentInstance.handleComponentInstanceRelaunch(component1Instance, - componentInstanceEvent, false); + componentInstanceEvent, false, containerDiag); } verify(comp, times(2)).markAsSucceeded(any(ComponentInstance.class)); @@ -500,7 +503,7 @@ public class TestComponentInstance { for (ComponentInstance component2Instance : component2Instances) { ComponentInstance.handleComponentInstanceRelaunch(component2Instance, - componentInstanceEvent, false); + componentInstanceEvent, false, containerDiag); } succeeded1Instances = new HashMap<>(); @@ -511,7 +514,7 @@ public class TestComponentInstance { when(component1Instance.getComponent().getSucceededInstances()) .thenReturn(succeeded1Instances.values()); ComponentInstance.handleComponentInstanceRelaunch(component1Instance, - componentInstanceEvent, false); + componentInstanceEvent, false, containerDiag); } verify(comp, times(2)).markAsSucceeded(any(ComponentInstance.class)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/timelineservice/TestServiceTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/timelineservice/TestServiceTimelinePublisher.java index cff7229db34..a77e6c8d317 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/timelineservice/TestServiceTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/timelineservice/TestServiceTimelinePublisher.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.service.timelineservice; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier; import org.apache.hadoop.yarn.client.api.TimelineV2Client; @@ -122,7 +123,8 @@ public class TestServiceTimelinePublisher { context.attemptId = ApplicationAttemptId .newInstance(ApplicationId.fromString(service.getId()), 1); String exitDiags = "service killed"; - serviceTimelinePublisher.serviceAttemptUnregistered(context, exitDiags); + serviceTimelinePublisher.serviceAttemptUnregistered(context, + FinalApplicationStatus.ENDED, exitDiags); lastPublishedEntities = ((DummyTimelineClient) timelineClient).getLastPublishedEntities(); for (TimelineEntity timelineEntity : lastPublishedEntities) {