YARN-8489. Support "dominant" component concept in YARN service.

Contributed by Zac Zhou
This commit is contained in:
Eric Yang 2019-01-18 20:23:04 -05:00
parent 751bc62df7
commit 824dfa3b09
8 changed files with 159 additions and 22 deletions

View File

@ -116,6 +116,8 @@ import static org.apache.hadoop.yarn.api.records.ContainerExitStatus
import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*;
import static org.apache.hadoop.yarn.service.component.ComponentEventType.*;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.START;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConstants
.CONTAINER_STATE_REPORT_AS_SERVICE_STATE;
import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes
.EXIT_FALSE;
import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes
@ -946,13 +948,53 @@ public class ServiceScheduler extends CompositeService {
return hasAtLeastOnePlacementConstraint;
}
public boolean terminateServiceIfNeeded(Component component) {
boolean serviceIsTerminated =
terminateServiceIfDominantComponentFinished(component) ||
terminateServiceIfAllComponentsFinished();
return serviceIsTerminated;
}
/**
* If the service state component is finished, the service is also terminated.
* @param component
*/
private boolean terminateServiceIfDominantComponentFinished(Component
component) {
boolean shouldTerminate = false;
boolean componentIsDominant = component.getComponentSpec()
.getConfiguration().getPropertyBool(
CONTAINER_STATE_REPORT_AS_SERVICE_STATE, false);
if (componentIsDominant) {
ComponentRestartPolicy restartPolicy =
component.getRestartPolicyHandler();
if (restartPolicy.shouldTerminate(component)) {
shouldTerminate = true;
boolean isSucceeded = restartPolicy.hasCompletedSuccessfully(component);
org.apache.hadoop.yarn.service.api.records.ComponentState state
= isSucceeded ?
org.apache.hadoop.yarn.service.api.records.ComponentState.SUCCEEDED
: org.apache.hadoop.yarn.service.api.records.ComponentState.FAILED;
LOG.info("{} Component state changed from {} to {}",
component.getName(), component.getComponentSpec().getState(),
state);
component.getComponentSpec().setState(state);
LOG.info("Dominate component {} finished, exiting Service Master... " +
", final status=" + (isSucceeded ? "Succeeded" : "Failed"),
component.getName());
terminateService(isSucceeded);
}
}
return shouldTerminate;
}
/*
* Check if all components of the scheduler finished.
* If all components finished
* (which #failed-instances + #suceeded-instances = #total-n-containers)
* The service will be terminated.
*/
public void terminateServiceIfAllComponentsFinished() {
private boolean terminateServiceIfAllComponentsFinished() {
boolean shouldTerminate = true;
// Succeeded comps and failed comps, for logging purposes.
@ -964,19 +1006,19 @@ public class ServiceScheduler extends CompositeService {
if (restartPolicy.shouldTerminate(comp)) {
if (restartPolicy.hasCompletedSuccessfully(comp)) {
comp.getComponentSpec().setState(org.apache.hadoop
.yarn.service.api.records.ComponentState.SUCCEEDED);
LOG.info("{} Component state changed from {} to {}",
comp.getName(), comp.getComponentSpec().getState(),
org.apache.hadoop
.yarn.service.api.records.ComponentState.SUCCEEDED);
comp.getComponentSpec().setState(org.apache.hadoop
.yarn.service.api.records.ComponentState.SUCCEEDED);
} else {
comp.getComponentSpec().setState(org.apache.hadoop
.yarn.service.api.records.ComponentState.FAILED);
LOG.info("{} Component state changed from {} to {}",
comp.getName(), comp.getComponentSpec().getState(),
org.apache.hadoop
.yarn.service.api.records.ComponentState.FAILED);
comp.getComponentSpec().setState(org.apache.hadoop
.yarn.service.api.records.ComponentState.FAILED);
}
if (isTimelineServiceEnabled()) {
@ -1008,8 +1050,14 @@ public class ServiceScheduler extends CompositeService {
LOG.info("Failed components: [" + org.apache.commons.lang3.StringUtils
.join(failedComponents, ",") + "]");
terminateService(failedComponents.isEmpty());
}
return shouldTerminate;
}
private void terminateService(boolean isSucceeded) {
int exitStatus = EXIT_SUCCESS;
if (failedComponents.isEmpty()) {
if (isSucceeded) {
setGracefulStop(FinalApplicationStatus.SUCCEEDED);
app.setState(ServiceState.SUCCEEDED);
} else {
@ -1020,7 +1068,6 @@ public class ServiceScheduler extends CompositeService {
getTerminationHandler().terminate(exitStatus);
}
}
public Clock getSystemClock() {
return systemClock;

View File

@ -378,7 +378,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
LOG.info(compInstance.getCompInstanceId() + (!hasContainerFailed ?
" succeeded" :
" failed") + " without retry, exitStatus=" + event.getStatus());
comp.getScheduler().terminateServiceIfAllComponentsFinished();
comp.getScheduler().terminateServiceIfNeeded(comp);
}
}

View File

@ -95,4 +95,6 @@ public interface YarnServiceConstants {
String PRINCIPAL = "yarn.service.am.principal";
String UPGRADE_DIR = "upgrade";
String CONTAINER_STATE_REPORT_AS_SERVICE_STATE =
"yarn.service.container-state-report-as-service-state";
}

View File

@ -74,6 +74,9 @@ import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_VMEM_CHECK_ENABLE
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_ENABLED;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.AM_RESOURCE_MEM;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.YARN_SERVICE_BASE_PATH;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConstants
.CONTAINER_STATE_REPORT_AS_SERVICE_STATE;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -126,6 +129,23 @@ public class ServiceTestUtils {
return exampleApp;
}
public static Service createTerminatingDominantComponentJobExample(
String serviceName) {
Service exampleApp = new Service();
exampleApp.setName(serviceName);
exampleApp.setVersion("v1");
Component serviceStateComponent = createComponent("terminating-comp1", 2,
"sleep 1000", Component.RestartPolicyEnum.NEVER, null);
serviceStateComponent.getConfiguration().setProperty(
CONTAINER_STATE_REPORT_AS_SERVICE_STATE, "true");
exampleApp.addComponent(serviceStateComponent);
exampleApp.addComponent(
createComponent("terminating-comp2", 2, "sleep 60000",
Component.RestartPolicyEnum.ON_FAILURE, null));
return exampleApp;
}
public static Component createComponent(String name) {
return createComponent(name, 2L, "sleep 1000",
Component.RestartPolicyEnum.ALWAYS, null);

View File

@ -41,6 +41,9 @@ import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanc
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.START;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConstants
.CONTAINER_STATE_REPORT_AS_SERVICE_STATE;
/**
* Tests for {@link Component}.
*/
@ -440,6 +443,60 @@ public class TestComponent {
serviceState);
}
@Test
public void testComponentStateUpdatesWithTerminatingDominantComponents()
throws Exception {
final String serviceName =
"testComponentStateUpdatesWithTerminatingServiceStateComponents";
Service testService =
ServiceTestUtils.createTerminatingDominantComponentJobExample(
serviceName);
TestServiceManager.createDef(serviceName, testService);
ServiceContext context = new MockRunningServiceContext(rule, testService);
for (Component comp : context.scheduler.getAllComponents().values()) {
boolean componentIsDominant = comp.getComponentSpec()
.getConfiguration().getPropertyBool(
CONTAINER_STATE_REPORT_AS_SERVICE_STATE, false);
if (componentIsDominant) {
Iterator<ComponentInstance> instanceIter = comp.
getAllComponentInstances().iterator();
while (instanceIter.hasNext()) {
ComponentInstance componentInstance = instanceIter.next();
Container instanceContainer = componentInstance.getContainer();
//stop 1 container
ContainerStatus containerStatus = ContainerStatus.newInstance(
instanceContainer.getId(),
org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE,
"successful", 0);
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CONTAINER_COMPLETED).setStatus(containerStatus)
.setContainerId(instanceContainer.getId()));
componentInstance.handle(
new ComponentInstanceEvent(componentInstance.getContainer().
getId(), ComponentInstanceEventType.STOP).
setStatus(containerStatus));
}
ComponentState componentState =
comp.getComponentSpec().getState();
Assert.assertEquals(
ComponentState.SUCCEEDED,
componentState);
}
}
ServiceState serviceState =
testService.getState();
Assert.assertEquals(
ServiceState.SUCCEEDED,
serviceState);
}
private static org.apache.hadoop.yarn.service.api.records.Component
createSpecWithEnv(String serviceName, String compName, String key,
String val) {

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.ServiceScheduler;
import org.apache.hadoop.yarn.service.ServiceTestUtils;
import org.apache.hadoop.yarn.service.api.records.Configuration;
import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.Service;
@ -348,6 +349,8 @@ public class TestComponentInstance {
org.apache.hadoop.yarn.service.api.records.Component componentSpec = mock(
org.apache.hadoop.yarn.service.api.records.Component.class);
when(componentSpec.getRestartPolicy()).thenReturn(restartPolicy);
Configuration conf = new Configuration();
when(componentSpec.getConfiguration()).thenReturn(conf);
when(comp.getRestartPolicyHandler()).thenReturn(
Component.getRestartPolicyHandler(restartPolicy));
when(componentSpec.getNumberOfContainers()).thenReturn(
@ -401,6 +404,8 @@ public class TestComponentInstance {
org.apache.hadoop.yarn.service.api.records.Component componentSpec = mock(
org.apache.hadoop.yarn.service.api.records.Component.class);
when(componentSpec.getRestartPolicy()).thenReturn(restartPolicy);
Configuration conf = new Configuration();
when(componentSpec.getConfiguration()).thenReturn(conf);
when(comp.getRestartPolicyHandler()).thenReturn(
Component.getRestartPolicyHandler(restartPolicy));
when(componentSpec.getNumberOfContainers()).thenReturn(

View File

@ -65,6 +65,9 @@ import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConstants
.CONTAINER_STATE_REPORT_AS_SERVICE_STATE;
import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.EXIT_SUCCESS;
import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser;
@ -492,6 +495,8 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
if (taskType.equals(TaskType.PRIMARY_WORKER)) {
workerComponent.setNumberOfContainers(1L);
workerComponent.getConfiguration().setProperty(
CONTAINER_STATE_REPORT_AS_SERVICE_STATE, "true");
} else{
workerComponent.setNumberOfContainers(
(long) parameters.getNumWorkers() - 1);

View File

@ -147,6 +147,7 @@ Component-level service AM configuration properties can be specified either in t
|yarn.service.container-health-threshold.poll-frequency-secs | Health check monitor poll frequency. It is an advanced setting and does not need to be set unless the service owner understands the implication and does not want the default. The default is 10 secs.
|yarn.service.container-health-threshold.window-secs | The amount of time the health check monitor allows a specific component to be below the health threshold after which it considers the service to be unhealthy. The default is 600 secs (10 mins).
|yarn.service.container-health-threshold.init-delay-secs | The amount of initial time the health check monitor waits before the first check kicks in. It gives a lead time for the service containers to come up for the first time. The default is 600 secs (10 mins).
|yarn.service.container-state-report-as-service-state | The boolean flag indicates that if this component is finished, the service is also terminated. The default is false.
There is one component-level configuration property that is set differently in the `yarn-site.xml` file than it is in the service specification.
To select the docker network type that will be used for docker containers, `docker.network` may be set in the service `Configuration` `properties` or the component `Configuration` `properties`.