From 824dfa3b099a09f18db6b0a3285eb4379ab72c2d Mon Sep 17 00:00:00 2001 From: Eric Yang Date: Fri, 18 Jan 2019 20:23:04 -0500 Subject: [PATCH] YARN-8489. Support "dominant" component concept in YARN service. Contributed by Zac Zhou --- .../hadoop/yarn/service/ServiceScheduler.java | 89 ++++++++++++++----- .../component/instance/ComponentInstance.java | 2 +- .../service/conf/YarnServiceConstants.java | 2 + .../hadoop/yarn/service/ServiceTestUtils.java | 20 +++++ .../yarn/service/component/TestComponent.java | 57 ++++++++++++ .../instance/TestComponentInstance.java | 5 ++ .../yarnservice/YarnServiceJobSubmitter.java | 5 ++ .../markdown/yarn-service/Configurations.md | 1 + 8 files changed, 159 insertions(+), 22 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index dabff2c2032..f2cd5cb7a32 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -116,6 +116,8 @@ import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*; import static org.apache.hadoop.yarn.service.component.ComponentEventType.*; import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.START; +import static org.apache.hadoop.yarn.service.conf.YarnServiceConstants + .CONTAINER_STATE_REPORT_AS_SERVICE_STATE; import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes .EXIT_FALSE; import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes @@ -946,13 +948,53 @@ public boolean hasAtLeastOnePlacementConstraint() { return hasAtLeastOnePlacementConstraint; } + public boolean terminateServiceIfNeeded(Component component) { + boolean serviceIsTerminated = + terminateServiceIfDominantComponentFinished(component) || + terminateServiceIfAllComponentsFinished(); + return serviceIsTerminated; + } + + /** + * If the service state component is finished, the service is also terminated. + * @param component + */ + private boolean terminateServiceIfDominantComponentFinished(Component + component) { + boolean shouldTerminate = false; + boolean componentIsDominant = component.getComponentSpec() + .getConfiguration().getPropertyBool( + CONTAINER_STATE_REPORT_AS_SERVICE_STATE, false); + if (componentIsDominant) { + ComponentRestartPolicy restartPolicy = + component.getRestartPolicyHandler(); + if (restartPolicy.shouldTerminate(component)) { + shouldTerminate = true; + boolean isSucceeded = restartPolicy.hasCompletedSuccessfully(component); + org.apache.hadoop.yarn.service.api.records.ComponentState state + = isSucceeded ? + org.apache.hadoop.yarn.service.api.records.ComponentState.SUCCEEDED + : org.apache.hadoop.yarn.service.api.records.ComponentState.FAILED; + LOG.info("{} Component state changed from {} to {}", + component.getName(), component.getComponentSpec().getState(), + state); + component.getComponentSpec().setState(state); + LOG.info("Dominate component {} finished, exiting Service Master... " + + ", final status=" + (isSucceeded ? "Succeeded" : "Failed"), + component.getName()); + terminateService(isSucceeded); + } + } + return shouldTerminate; + } + /* -* Check if all components of the scheduler finished. -* If all components finished -* (which #failed-instances + #suceeded-instances = #total-n-containers) -* The service will be terminated. -*/ - public void terminateServiceIfAllComponentsFinished() { + * Check if all components of the scheduler finished. + * If all components finished + * (which #failed-instances + #suceeded-instances = #total-n-containers) + * The service will be terminated. + */ + private boolean terminateServiceIfAllComponentsFinished() { boolean shouldTerminate = true; // Succeeded comps and failed comps, for logging purposes. @@ -964,19 +1006,19 @@ public void terminateServiceIfAllComponentsFinished() { if (restartPolicy.shouldTerminate(comp)) { if (restartPolicy.hasCompletedSuccessfully(comp)) { - comp.getComponentSpec().setState(org.apache.hadoop - .yarn.service.api.records.ComponentState.SUCCEEDED); LOG.info("{} Component state changed from {} to {}", comp.getName(), comp.getComponentSpec().getState(), org.apache.hadoop .yarn.service.api.records.ComponentState.SUCCEEDED); - } else { comp.getComponentSpec().setState(org.apache.hadoop - .yarn.service.api.records.ComponentState.FAILED); + .yarn.service.api.records.ComponentState.SUCCEEDED); + } else { LOG.info("{} Component state changed from {} to {}", comp.getName(), comp.getComponentSpec().getState(), org.apache.hadoop .yarn.service.api.records.ComponentState.FAILED); + comp.getComponentSpec().setState(org.apache.hadoop + .yarn.service.api.records.ComponentState.FAILED); } if (isTimelineServiceEnabled()) { @@ -1008,18 +1050,23 @@ public void terminateServiceIfAllComponentsFinished() { LOG.info("Failed components: [" + org.apache.commons.lang3.StringUtils .join(failedComponents, ",") + "]"); - int exitStatus = EXIT_SUCCESS; - if (failedComponents.isEmpty()) { - setGracefulStop(FinalApplicationStatus.SUCCEEDED); - app.setState(ServiceState.SUCCEEDED); - } else { - setGracefulStop(FinalApplicationStatus.FAILED); - app.setState(ServiceState.FAILED); - exitStatus = EXIT_FALSE; - } - - getTerminationHandler().terminate(exitStatus); + terminateService(failedComponents.isEmpty()); } + return shouldTerminate; + } + + private void terminateService(boolean isSucceeded) { + int exitStatus = EXIT_SUCCESS; + if (isSucceeded) { + setGracefulStop(FinalApplicationStatus.SUCCEEDED); + app.setState(ServiceState.SUCCEEDED); + } else { + setGracefulStop(FinalApplicationStatus.FAILED); + app.setState(ServiceState.FAILED); + exitStatus = EXIT_FALSE; + } + + getTerminationHandler().terminate(exitStatus); } public Clock getSystemClock() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index 66c298d337c..27153da262e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -378,7 +378,7 @@ static void handleComponentInstanceRelaunch(ComponentInstance compInstance, LOG.info(compInstance.getCompInstanceId() + (!hasContainerFailed ? " succeeded" : " failed") + " without retry, exitStatus=" + event.getStatus()); - comp.getScheduler().terminateServiceIfAllComponentsFinished(); + comp.getScheduler().terminateServiceIfNeeded(comp); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java index d0816064893..05135fe6196 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java @@ -95,4 +95,6 @@ public interface YarnServiceConstants { String PRINCIPAL = "yarn.service.am.principal"; String UPGRADE_DIR = "upgrade"; + String CONTAINER_STATE_REPORT_AS_SERVICE_STATE = + "yarn.service.container-state-report-as-service-state"; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java index b4859af869e..6207d632059 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java @@ -74,6 +74,9 @@ import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_ENABLED; import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.AM_RESOURCE_MEM; import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.YARN_SERVICE_BASE_PATH; + +import static org.apache.hadoop.yarn.service.conf.YarnServiceConstants + .CONTAINER_STATE_REPORT_AS_SERVICE_STATE; import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -126,6 +129,23 @@ public static Service createTerminatingJobExample(String serviceName) { return exampleApp; } + public static Service createTerminatingDominantComponentJobExample( + String serviceName) { + Service exampleApp = new Service(); + exampleApp.setName(serviceName); + exampleApp.setVersion("v1"); + Component serviceStateComponent = createComponent("terminating-comp1", 2, + "sleep 1000", Component.RestartPolicyEnum.NEVER, null); + serviceStateComponent.getConfiguration().setProperty( + CONTAINER_STATE_REPORT_AS_SERVICE_STATE, "true"); + exampleApp.addComponent(serviceStateComponent); + exampleApp.addComponent( + createComponent("terminating-comp2", 2, "sleep 60000", + Component.RestartPolicyEnum.ON_FAILURE, null)); + + return exampleApp; + } + public static Component createComponent(String name) { return createComponent(name, 2L, "sleep 1000", Component.RestartPolicyEnum.ALWAYS, null); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java index 1961ff4791a..f8f948dd88f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java @@ -41,6 +41,9 @@ import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.START; import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP; +import static org.apache.hadoop.yarn.service.conf.YarnServiceConstants + .CONTAINER_STATE_REPORT_AS_SERVICE_STATE; + /** * Tests for {@link Component}. */ @@ -440,6 +443,60 @@ public void testComponentStateUpdatesWithTerminatingComponents() serviceState); } + @Test + public void testComponentStateUpdatesWithTerminatingDominantComponents() + throws Exception { + final String serviceName = + "testComponentStateUpdatesWithTerminatingServiceStateComponents"; + + Service testService = + ServiceTestUtils.createTerminatingDominantComponentJobExample( + serviceName); + TestServiceManager.createDef(serviceName, testService); + + ServiceContext context = new MockRunningServiceContext(rule, testService); + + for (Component comp : context.scheduler.getAllComponents().values()) { + boolean componentIsDominant = comp.getComponentSpec() + .getConfiguration().getPropertyBool( + CONTAINER_STATE_REPORT_AS_SERVICE_STATE, false); + if (componentIsDominant) { + Iterator instanceIter = comp. + getAllComponentInstances().iterator(); + + while (instanceIter.hasNext()) { + + ComponentInstance componentInstance = instanceIter.next(); + Container instanceContainer = componentInstance.getContainer(); + + //stop 1 container + ContainerStatus containerStatus = ContainerStatus.newInstance( + instanceContainer.getId(), + org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE, + "successful", 0); + comp.handle(new ComponentEvent(comp.getName(), + ComponentEventType.CONTAINER_COMPLETED).setStatus(containerStatus) + .setContainerId(instanceContainer.getId())); + componentInstance.handle( + new ComponentInstanceEvent(componentInstance.getContainer(). + getId(), ComponentInstanceEventType.STOP). + setStatus(containerStatus)); + } + ComponentState componentState = + comp.getComponentSpec().getState(); + Assert.assertEquals( + ComponentState.SUCCEEDED, + componentState); + } + } + + ServiceState serviceState = + testService.getState(); + Assert.assertEquals( + ServiceState.SUCCEEDED, + serviceState); + } + private static org.apache.hadoop.yarn.service.api.records.Component createSpecWithEnv(String serviceName, String compName, String key, String val) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java index f857353a629..c3b1602c47b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.service.ServiceContext; import org.apache.hadoop.yarn.service.ServiceScheduler; import org.apache.hadoop.yarn.service.ServiceTestUtils; +import org.apache.hadoop.yarn.service.api.records.Configuration; import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.Service; @@ -348,6 +349,8 @@ private Component createComponent(ServiceScheduler scheduler, org.apache.hadoop.yarn.service.api.records.Component componentSpec = mock( org.apache.hadoop.yarn.service.api.records.Component.class); when(componentSpec.getRestartPolicy()).thenReturn(restartPolicy); + Configuration conf = new Configuration(); + when(componentSpec.getConfiguration()).thenReturn(conf); when(comp.getRestartPolicyHandler()).thenReturn( Component.getRestartPolicyHandler(restartPolicy)); when(componentSpec.getNumberOfContainers()).thenReturn( @@ -401,6 +404,8 @@ private Component createComponent(ServiceScheduler scheduler, org.apache.hadoop.yarn.service.api.records.Component componentSpec = mock( org.apache.hadoop.yarn.service.api.records.Component.class); when(componentSpec.getRestartPolicy()).thenReturn(restartPolicy); + Configuration conf = new Configuration(); + when(componentSpec.getConfiguration()).thenReturn(conf); when(comp.getRestartPolicyHandler()).thenReturn( Component.getRestartPolicyHandler(restartPolicy)); when(componentSpec.getNumberOfContainers()).thenReturn( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java index 06d33b2354b..842f4ad4c39 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java @@ -65,6 +65,9 @@ import java.util.zip.ZipOutputStream; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; + +import static org.apache.hadoop.yarn.service.conf.YarnServiceConstants + .CONTAINER_STATE_REPORT_AS_SERVICE_STATE; import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.EXIT_SUCCESS; import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser; @@ -492,6 +495,8 @@ private void addWorkerComponent(Service service, if (taskType.equals(TaskType.PRIMARY_WORKER)) { workerComponent.setNumberOfContainers(1L); + workerComponent.getConfiguration().setProperty( + CONTAINER_STATE_REPORT_AS_SERVICE_STATE, "true"); } else{ workerComponent.setNumberOfContainers( (long) parameters.getNumWorkers() - 1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/Configurations.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/Configurations.md index 524cfb925f5..d0ae15a7122 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/Configurations.md +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/Configurations.md @@ -147,6 +147,7 @@ Component-level service AM configuration properties can be specified either in t |yarn.service.container-health-threshold.poll-frequency-secs | Health check monitor poll frequency. It is an advanced setting and does not need to be set unless the service owner understands the implication and does not want the default. The default is 10 secs. |yarn.service.container-health-threshold.window-secs | The amount of time the health check monitor allows a specific component to be below the health threshold after which it considers the service to be unhealthy. The default is 600 secs (10 mins). |yarn.service.container-health-threshold.init-delay-secs | The amount of initial time the health check monitor waits before the first check kicks in. It gives a lead time for the service containers to come up for the first time. The default is 600 secs (10 mins). +|yarn.service.container-state-report-as-service-state | The boolean flag indicates that if this component is finished, the service is also terminated. The default is false. There is one component-level configuration property that is set differently in the `yarn-site.xml` file than it is in the service specification. To select the docker network type that will be used for docker containers, `docker.network` may be set in the service `Configuration` `properties` or the component `Configuration` `properties`.