YARN-8160. Support upgrade of services that use Docker containers.

Contributed by Chandni Singh
This commit is contained in:
Eric Yang 2018-11-16 16:01:25 -05:00
parent 077a7ab37b
commit fce0350289
11 changed files with 216 additions and 37 deletions

View File

@ -635,7 +635,8 @@ public class Component implements EventHandler<ComponentEvent> {
version); version);
launchContext.setArtifact(compSpec.getArtifact()) launchContext.setArtifact(compSpec.getArtifact())
.setConfiguration(compSpec.getConfiguration()) .setConfiguration(compSpec.getConfiguration())
.setLaunchCommand(compSpec.getLaunchCommand()); .setLaunchCommand(compSpec.getLaunchCommand())
.setRunPrivilegedContainer(compSpec.getRunPrivilegedContainer());
return launchContext; return launchContext;
} }

View File

@ -509,7 +509,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
return this.container.getNodeId(); return this.container.getNodeId();
} }
public org.apache.hadoop.yarn.service.api.records.Component getCompSpec() { private org.apache.hadoop.yarn.service.api.records.Component getCompSpec() {
return component.getComponentSpec(); return component.getComponentSpec();
} }

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.service.containerlaunch; package org.apache.hadoop.yarn.service.containerlaunch;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@ -254,4 +255,8 @@ public class AbstractLauncher {
this.runPrivilegedContainer = runPrivilegedContainer; this.runPrivilegedContainer = runPrivilegedContainer;
} }
/**
 * Returns the value of this launcher's {@code dockerImage} field.
 * Marked {@code @VisibleForTesting}: exposed so tests can inspect it.
 *
 * @return the Docker image currently stored on this launcher
 */
@VisibleForTesting
public String getDockerImage() {
return dockerImage;
}
} }

View File

@ -139,6 +139,7 @@ public class ContainerLaunchService extends AbstractService{
private org.apache.hadoop.yarn.service.api.records.Configuration private org.apache.hadoop.yarn.service.api.records.Configuration
configuration; configuration;
private String launchCommand; private String launchCommand;
private boolean runPrivilegedContainer;
public ComponentLaunchContext(String name, String serviceVersion) { public ComponentLaunchContext(String name, String serviceVersion) {
this.name = Preconditions.checkNotNull(name); this.name = Preconditions.checkNotNull(name);
@ -166,6 +167,10 @@ public class ContainerLaunchService extends AbstractService{
return launchCommand; return launchCommand;
} }
/**
 * Returns the run-privileged-container flag stored in this launch context.
 *
 * @return the current value of {@code runPrivilegedContainer}
 */
public boolean isRunPrivilegedContainer() {
return runPrivilegedContainer;
}
public ComponentLaunchContext setArtifact(Artifact artifact) { public ComponentLaunchContext setArtifact(Artifact artifact) {
this.artifact = artifact; this.artifact = artifact;
return this; return this;
@ -181,5 +186,11 @@ public class ContainerLaunchService extends AbstractService{
this.launchCommand = launchCommand; this.launchCommand = launchCommand;
return this; return this;
} }
/**
 * Stores the run-privileged-container flag in this launch context.
 *
 * @param runPrivilegedContainer the flag value to store
 * @return this launch context, so setter calls can be chained
 */
public ComponentLaunchContext setRunPrivilegedContainer(
boolean runPrivilegedContainer) {
this.runPrivilegedContainer = runPrivilegedContainer;
return this;
}
} }
} }

View File

@ -56,7 +56,8 @@ public abstract class AbstractProviderService implements ProviderService,
public abstract void processArtifact(AbstractLauncher launcher, public abstract void processArtifact(AbstractLauncher launcher,
ComponentInstance compInstance, SliderFileSystem fileSystem, ComponentInstance compInstance, SliderFileSystem fileSystem,
Service service) Service service,
ContainerLaunchService.ComponentLaunchContext compLaunchCtx)
throws IOException; throws IOException;
public Map<String, String> buildContainerTokens(ComponentInstance instance, public Map<String, String> buildContainerTokens(ComponentInstance instance,
@ -140,7 +141,7 @@ public abstract class AbstractProviderService implements ProviderService,
SliderFileSystem fileSystem, Configuration yarnConf, Container container, SliderFileSystem fileSystem, Configuration yarnConf, Container container,
ContainerLaunchService.ComponentLaunchContext compLaunchContext) ContainerLaunchService.ComponentLaunchContext compLaunchContext)
throws IOException, SliderException { throws IOException, SliderException {
processArtifact(launcher, instance, fileSystem, service); processArtifact(launcher, instance, fileSystem, service, compLaunchContext);
ServiceContext context = ServiceContext context =
instance.getComponent().getScheduler().getContext(); instance.getComponent().getScheduler().getContext();

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.service.provider.defaultImpl;
import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
import org.apache.hadoop.yarn.service.provider.AbstractProviderService; import org.apache.hadoop.yarn.service.provider.AbstractProviderService;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem; import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher; import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher;
@ -30,7 +31,8 @@ public class DefaultProviderService extends AbstractProviderService {
@Override @Override
public void processArtifact(AbstractLauncher launcher, public void processArtifact(AbstractLauncher launcher,
ComponentInstance compInstance, SliderFileSystem fileSystem, ComponentInstance compInstance, SliderFileSystem fileSystem,
Service service) Service service,
ContainerLaunchService.ComponentLaunchContext compLaunchCtx)
throws IOException { throws IOException {
} }
} }

View File

@ -23,7 +23,6 @@ import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.provider.AbstractProviderService; import org.apache.hadoop.yarn.service.provider.AbstractProviderService;
import org.apache.hadoop.yarn.service.provider.ProviderUtils; import org.apache.hadoop.yarn.service.provider.ProviderUtils;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem; import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher; import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher;
@ -38,34 +37,38 @@ import java.util.Map;
public class DockerProviderService extends AbstractProviderService public class DockerProviderService extends AbstractProviderService
implements DockerKeys { implements DockerKeys {
@Override
public void processArtifact(AbstractLauncher launcher, public void processArtifact(AbstractLauncher launcher,
ComponentInstance compInstance, SliderFileSystem fileSystem, ComponentInstance compInstance, SliderFileSystem fileSystem,
Service service) throws IOException{ Service service, ContainerLaunchService.ComponentLaunchContext
compLaunchCtx) throws IOException{
launcher.setYarnDockerMode(true); launcher.setYarnDockerMode(true);
launcher.setDockerImage(compInstance.getCompSpec().getArtifact().getId()); launcher.setDockerImage(compLaunchCtx.getArtifact().getId());
launcher.setDockerNetwork(compInstance.getCompSpec().getConfiguration() launcher.setDockerNetwork(compLaunchCtx.getConfiguration()
.getProperty(DOCKER_NETWORK)); .getProperty(DOCKER_NETWORK));
launcher.setDockerHostname(compInstance.getHostname()); launcher.setDockerHostname(compInstance.getHostname());
launcher.setRunPrivilegedContainer( launcher.setRunPrivilegedContainer(
compInstance.getCompSpec().getRunPrivilegedContainer()); compLaunchCtx.isRunPrivilegedContainer());
} }
/** /**
* Check if system is default to disable docker override or * Check if system is default to disable docker override or
* user requested a Docker container with ENTRY_POINT support. * user requested a Docker container with ENTRY_POINT support.
* *
* @param component - YARN Service component * @param compLaunchContext - launch context for the component.
* @return true if Docker launch command override is disabled * @return true if Docker launch command override is disabled
*/ */
private boolean checkUseEntryPoint(Component component) { private boolean checkUseEntryPoint(
ContainerLaunchService.ComponentLaunchContext compLaunchContext) {
boolean overrideDisable = false; boolean overrideDisable = false;
String overrideDisableKey = Environment. String overrideDisableKey = Environment.
YARN_CONTAINER_RUNTIME_DOCKER_RUN_OVERRIDE_DISABLE. YARN_CONTAINER_RUNTIME_DOCKER_RUN_OVERRIDE_DISABLE.
name(); name();
String overrideDisableValue = (component String overrideDisableValue = (
.getConfiguration().getEnv(overrideDisableKey) != null) ? compLaunchContext.getConfiguration().getEnv(overrideDisableKey)
component.getConfiguration().getEnv(overrideDisableKey) : != null) ?
System.getenv(overrideDisableKey); compLaunchContext.getConfiguration().getEnv(
overrideDisableKey) : System.getenv(overrideDisableKey);
overrideDisable = Boolean.parseBoolean(overrideDisableValue); overrideDisable = Boolean.parseBoolean(overrideDisableValue);
return overrideDisable; return overrideDisable;
} }
@ -77,10 +80,9 @@ public class DockerProviderService extends AbstractProviderService
ContainerLaunchService.ComponentLaunchContext compLaunchContext, ContainerLaunchService.ComponentLaunchContext compLaunchContext,
Map<String, String> tokensForSubstitution) Map<String, String> tokensForSubstitution)
throws IOException, SliderException { throws IOException, SliderException {
Component component = instance.getComponent().getComponentSpec(); boolean useEntryPoint = checkUseEntryPoint(compLaunchContext);
boolean useEntryPoint = checkUseEntryPoint(component);
if (useEntryPoint) { if (useEntryPoint) {
String launchCommand = component.getLaunchCommand(); String launchCommand = compLaunchContext.getLaunchCommand();
if (!StringUtils.isEmpty(launchCommand)) { if (!StringUtils.isEmpty(launchCommand)) {
launcher.addCommand(launchCommand); launcher.addCommand(launchCommand);
} }

View File

@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
import org.apache.hadoop.yarn.service.provider.AbstractProviderService; import org.apache.hadoop.yarn.service.provider.AbstractProviderService;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem; import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher; import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher;
@ -33,9 +34,9 @@ public class TarballProviderService extends AbstractProviderService {
@Override @Override
public void processArtifact(AbstractLauncher launcher, public void processArtifact(AbstractLauncher launcher,
ComponentInstance instance, SliderFileSystem fileSystem, ComponentInstance instance, SliderFileSystem fileSystem,
Service service) Service service, ContainerLaunchService.ComponentLaunchContext
throws IOException { compLaunchCtx) throws IOException {
Path artifact = new Path(instance.getCompSpec().getArtifact().getId()); Path artifact = new Path(compLaunchCtx.getArtifact().getId());
if (!fileSystem.isFile(artifact)) { if (!fileSystem.isFile(artifact)) {
throw new IOException( throw new IOException(
"Package doesn't exist as a resource: " + artifact); "Package doesn't exist as a resource: " + artifact);

View File

@ -101,8 +101,8 @@ public class TestComponentInstance {
instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(), instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(),
ComponentInstanceEventType.BECOME_READY)); ComponentInstanceEventType.BECOME_READY));
Assert.assertEquals("instance not ready", ContainerState.READY, Assert.assertEquals("instance not ready", ContainerState.READY,
instance.getCompSpec().getContainer( component.getComponentSpec().getContainer(instance.getContainer()
instance.getContainer().getId().toString()).getState()); .getId().toString()).getState());
} }
private void upgradeComponent(Component component) { private void upgradeComponent(Component component) {

View File

@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.provider;
import com.google.common.collect.Lists;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.service.MockRunningServiceContext;
import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.ServiceTestUtils;
import org.apache.hadoop.yarn.service.TestServiceManager;
import org.apache.hadoop.yarn.service.api.records.Artifact;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.component.Component;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher;
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
import org.apache.hadoop.yarn.service.provider.docker.DockerProviderService;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
 * Unit tests for {@link AbstractProviderService}, exercised through
 * {@link DockerProviderService} as the concrete implementation.
 */
public class TestAbstractProviderService {

  /** File-system watcher rule; also supplies the file system to the tests. */
  @Rule
  public ServiceTestUtils.ServiceFSWatcher rule =
      new ServiceTestUtils.ServiceFSWatcher();

  private Service testService;
  private ServiceContext serviceContext;
  private AbstractLauncher launcher;

  /**
   * Builds the base service definition, a mock running context for it and a
   * fresh launcher, then points the application directory under
   * {@code target/}.
   */
  @Before
  public void setup() throws Exception {
    testService = TestServiceManager.createBaseDef("testService");
    serviceContext = new MockRunningServiceContext(rule, testService);
    launcher = new AbstractLauncher(serviceContext);
    Path appDir = new Path("target/testAbstractProviderService");
    rule.getFs().setAppDir(appDir);
  }

  /** Removes the application directory configured in {@link #setup()}. */
  @After
  public void teardown() throws Exception {
    String appDirPath = rule.getFs().getAppDir().toUri().getPath();
    FileUtils.deleteQuietly(new File(appDirPath));
  }

  /**
   * The launcher's command list must consist of exactly the launch command
   * carried by the component launch context.
   */
  @Test
  public void testBuildContainerLaunchCommand() throws Exception {
    AbstractProviderService provider = new DockerProviderService();
    Component comp = firstComponent();
    ContainerLaunchService.ComponentLaunchContext launchCtx =
        createEntryPointCLCFor(testService, comp);
    ComponentInstance compInstance = firstInstanceOf(comp);
    Container mockContainer = mock(Container.class);

    // No substitution tokens are supplied for this call.
    provider.buildContainerLaunchCommand(launcher, testService, compInstance,
        rule.getFs(), serviceContext.scheduler.getConfig(), mockContainer,
        launchCtx, null);

    Assert.assertEquals("commands",
        Lists.newArrayList(launchCtx.getLaunchCommand()),
        launcher.getCommands());
  }

  /**
   * The Docker image set on the launcher must be the artifact id carried by
   * the component launch context.
   */
  @Test
  public void testBuildContainerLaunchContext() throws Exception {
    AbstractProviderService provider = new DockerProviderService();
    Component comp = firstComponent();
    ContainerLaunchService.ComponentLaunchContext launchCtx =
        createEntryPointCLCFor(testService, comp);
    ComponentInstance compInstance = firstInstanceOf(comp);

    Container mockContainer = mock(Container.class);
    ApplicationId appId =
        ApplicationId.newInstance(System.currentTimeMillis(), 1);
    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
    ContainerId mockContainerId = ContainerId.newContainerId(attemptId, 1L);
    when(mockContainer.getId()).thenReturn(mockContainerId);

    provider.buildContainerLaunchContext(launcher, testService, compInstance,
        rule.getFs(), serviceContext.scheduler.getConfig(), mockContainer,
        launchCtx);

    Assert.assertEquals("artifact", launchCtx.getArtifact().getId(),
        launcher.getDockerImage());
  }

  /** Returns the first component registered with the mock scheduler. */
  private Component firstComponent() {
    return serviceContext.scheduler.getAllComponents().entrySet().iterator()
        .next().getValue();
  }

  /** Returns the first instance of the given component. */
  private static ComponentInstance firstInstanceOf(Component component) {
    return component.getAllComponentInstances().iterator().next();
  }

  /**
   * Creates a launch context for a Docker artifact whose environment sets
   * {@code YARN_CONTAINER_RUNTIME_DOCKER_RUN_OVERRIDE_DISABLE} to
   * {@code true}. The environment is also written into the component spec's
   * configuration, which is the configuration the launch context carries.
   *
   * @param service service whose version the launch context is created for
   * @param component component supplying the name and the configuration
   * @return the populated launch context
   */
  private static ContainerLaunchService.ComponentLaunchContext
      createEntryPointCLCFor(Service service, Component component) {
    Map<String, String> dockerEnv = new HashMap<>();
    dockerEnv.put("YARN_CONTAINER_RUNTIME_DOCKER_DELAYED_REMOVAL", "true");
    dockerEnv.put("YARN_CONTAINER_RUNTIME_DOCKER_RUN_OVERRIDE_DISABLE", "true");
    org.apache.hadoop.yarn.service.api.records.Configuration specConf =
        component.getComponentSpec().getConfiguration();
    specConf.setEnv(dockerEnv);

    Artifact dockerArtifact = new Artifact();
    dockerArtifact.setId("example");
    dockerArtifact.setType(Artifact.TypeEnum.DOCKER);

    ContainerLaunchService.ComponentLaunchContext launchCtx =
        new ContainerLaunchService.ComponentLaunchContext(
            component.getName(), service.getVersion());
    launchCtx.setArtifact(dockerArtifact);
    launchCtx.setConfiguration(specConf);
    launchCtx.setLaunchCommand("sleep,9000");
    return launchCtx;
  }
}

View File

@ -45,6 +45,8 @@ import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
@ -138,6 +140,8 @@ public class ContainerLaunch implements Callable<Integer> {
protected final LocalDirsHandlerService dirsHandler; protected final LocalDirsHandlerService dirsHandler;
private final Lock containerExecLock = new ReentrantLock();
public ContainerLaunch(Context context, Configuration configuration, public ContainerLaunch(Context context, Configuration configuration,
Dispatcher dispatcher, ContainerExecutor exec, Application app, Dispatcher dispatcher, ContainerExecutor exec, Application app,
Container container, LocalDirsHandlerService dirsHandler, Container container, LocalDirsHandlerService dirsHandler,
@ -492,7 +496,12 @@ public class ContainerLaunch implements Callable<Integer> {
throws IOException, ConfigurationException { throws IOException, ConfigurationException {
int launchPrep = prepareForLaunch(ctx); int launchPrep = prepareForLaunch(ctx);
if (launchPrep == 0) { if (launchPrep == 0) {
return exec.launchContainer(ctx); containerExecLock.lock();
try {
return exec.launchContainer(ctx);
} finally {
containerExecLock.unlock();
}
} }
return launchPrep; return launchPrep;
} }
@ -502,7 +511,12 @@ public class ContainerLaunch implements Callable<Integer> {
throws IOException, ConfigurationException { throws IOException, ConfigurationException {
int launchPrep = prepareForLaunch(ctx); int launchPrep = prepareForLaunch(ctx);
if (launchPrep == 0) { if (launchPrep == 0) {
return exec.relaunchContainer(ctx); containerExecLock.lock();
try {
return exec.relaunchContainer(ctx);
} finally {
containerExecLock.unlock();
}
} }
return launchPrep; return launchPrep;
} }
@ -813,18 +827,22 @@ public class ContainerLaunch implements Callable<Integer> {
lfs.delete(pidFilePath.suffix(EXIT_CODE_FILE_SUFFIX), false); lfs.delete(pidFilePath.suffix(EXIT_CODE_FILE_SUFFIX), false);
} }
} }
containerExecLock.lock();
// Reap the container try {
boolean result = exec.reapContainer( // Reap the container
new ContainerReapContext.Builder() boolean result = exec.reapContainer(
.setContainer(container) new ContainerReapContext.Builder()
.setUser(container.getUser()) .setContainer(container)
.build()); .setUser(container.getUser())
if (!result) { .build());
throw new IOException("Reap container failed for container " if (!result) {
+ containerIdStr); throw new IOException("Reap container failed for container "
+ containerIdStr);
}
cleanupContainerFiles(getContainerWorkDir());
} finally {
containerExecLock.unlock();
} }
cleanupContainerFiles(getContainerWorkDir());
} }
/** /**