YARN-7996. Allow user supplied Docker client configurations with YARN native services. Contributed by Shane Kumpf
(cherry picked from commit 1d6e43da51
)
This commit is contained in:
parent
8f300c9ce1
commit
42aef3b655
|
@ -251,6 +251,9 @@ definitions:
|
||||||
kerberos_principal:
|
kerberos_principal:
|
||||||
description: The Kerberos Principal of the service
|
description: The Kerberos Principal of the service
|
||||||
$ref: '#/definitions/KerberosPrincipal'
|
$ref: '#/definitions/KerberosPrincipal'
|
||||||
|
docker_client_config:
|
||||||
|
type: string
|
||||||
|
description: URI of the file containing the docker client configuration (e.g. hdfs:///tmp/config.json).
|
||||||
ResourceInformation:
|
ResourceInformation:
|
||||||
description:
|
description:
|
||||||
ResourceInformation determines unit/value of resource types in addition to memory and vcores. It will be part of Resource object.
|
ResourceInformation determines unit/value of resource types in addition to memory and vcores. It will be part of Resource object.
|
||||||
|
|
|
@ -19,6 +19,9 @@ package org.apache.hadoop.yarn.service;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
import java.io.BufferedWriter;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileWriter;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
@ -27,6 +30,7 @@ import javax.ws.rs.Path;
|
||||||
import javax.ws.rs.core.Response;
|
import javax.ws.rs.core.Response;
|
||||||
import javax.ws.rs.core.Response.Status;
|
import javax.ws.rs.core.Response.Status;
|
||||||
|
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.service.api.records.Artifact;
|
import org.apache.hadoop.yarn.service.api.records.Artifact;
|
||||||
import org.apache.hadoop.yarn.service.api.records.Artifact.TypeEnum;
|
import org.apache.hadoop.yarn.service.api.records.Artifact.TypeEnum;
|
||||||
|
@ -90,7 +94,19 @@ public class TestApiServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGoodCreateService() {
|
public void testGoodCreateService() throws Exception {
|
||||||
|
String json = "{\"auths\": "
|
||||||
|
+ "{\"https://index.docker.io/v1/\": "
|
||||||
|
+ "{\"auth\": \"foobarbaz\"},"
|
||||||
|
+ "\"registry.example.com\": "
|
||||||
|
+ "{\"auth\": \"bazbarfoo\"}}}";
|
||||||
|
File dockerTmpDir = new File("target", "docker-tmp");
|
||||||
|
FileUtils.deleteQuietly(dockerTmpDir);
|
||||||
|
dockerTmpDir.mkdirs();
|
||||||
|
String dockerConfig = dockerTmpDir + "/config.json";
|
||||||
|
BufferedWriter bw = new BufferedWriter(new FileWriter(dockerConfig));
|
||||||
|
bw.write(json);
|
||||||
|
bw.close();
|
||||||
Service service = new Service();
|
Service service = new Service();
|
||||||
service.setName("jenkins");
|
service.setName("jenkins");
|
||||||
service.setVersion("v1");
|
service.setVersion("v1");
|
||||||
|
@ -115,6 +131,33 @@ public class TestApiServer {
|
||||||
actual.getStatus());
|
actual.getStatus());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInternalServerErrorDockerClientConfigMissingCreateService() {
|
||||||
|
Service service = new Service();
|
||||||
|
service.setName("jenkins");
|
||||||
|
service.setVersion("v1");
|
||||||
|
service.setDockerClientConfig("/does/not/exist/config.json");
|
||||||
|
Artifact artifact = new Artifact();
|
||||||
|
artifact.setType(TypeEnum.DOCKER);
|
||||||
|
artifact.setId("jenkins:latest");
|
||||||
|
Resource resource = new Resource();
|
||||||
|
resource.setCpus(1);
|
||||||
|
resource.setMemory("2048");
|
||||||
|
List<Component> components = new ArrayList<>();
|
||||||
|
Component c = new Component();
|
||||||
|
c.setName("jenkins");
|
||||||
|
c.setNumberOfContainers(1L);
|
||||||
|
c.setArtifact(artifact);
|
||||||
|
c.setLaunchCommand("");
|
||||||
|
c.setResource(resource);
|
||||||
|
components.add(c);
|
||||||
|
service.setComponents(components);
|
||||||
|
final Response actual = apiServer.createService(request, service);
|
||||||
|
assertEquals("Create service is ",
|
||||||
|
Response.status(Status.BAD_REQUEST).build().getStatus(),
|
||||||
|
actual.getStatus());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBadGetService() {
|
public void testBadGetService() {
|
||||||
final Response actual = apiServer.getService(request, "no-jenkins");
|
final Response actual = apiServer.getService(request, "no-jenkins");
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.service;
|
package org.apache.hadoop.yarn.service;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.commons.cli.CommandLine;
|
import org.apache.commons.cli.CommandLine;
|
||||||
import org.apache.commons.cli.Options;
|
import org.apache.commons.cli.Options;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
@ -89,8 +90,8 @@ public class ServiceMaster extends CompositeService {
|
||||||
fs.setAppDir(appDir);
|
fs.setAppDir(appDir);
|
||||||
loadApplicationJson(context, fs);
|
loadApplicationJson(context, fs);
|
||||||
|
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
|
||||||
context.tokens = recordTokensForContainers();
|
context.tokens = recordTokensForContainers();
|
||||||
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
doSecureLogin();
|
doSecureLogin();
|
||||||
}
|
}
|
||||||
// Take yarn config from YarnFile and merge them into YarnConfiguration
|
// Take yarn config from YarnFile and merge them into YarnConfiguration
|
||||||
|
@ -128,15 +129,10 @@ public class ServiceMaster extends CompositeService {
|
||||||
|
|
||||||
// Record the tokens and use them for launching containers.
|
// Record the tokens and use them for launching containers.
|
||||||
// e.g. localization requires the hdfs delegation tokens
|
// e.g. localization requires the hdfs delegation tokens
|
||||||
private ByteBuffer recordTokensForContainers() throws IOException {
|
@VisibleForTesting
|
||||||
|
protected ByteBuffer recordTokensForContainers() throws IOException {
|
||||||
Credentials copy = new Credentials(UserGroupInformation.getCurrentUser()
|
Credentials copy = new Credentials(UserGroupInformation.getCurrentUser()
|
||||||
.getCredentials());
|
.getCredentials());
|
||||||
DataOutputBuffer dob = new DataOutputBuffer();
|
|
||||||
try {
|
|
||||||
copy.writeTokenStorageToStream(dob);
|
|
||||||
} finally {
|
|
||||||
dob.close();
|
|
||||||
}
|
|
||||||
// Now remove the AM->RM token so that task containers cannot access it.
|
// Now remove the AM->RM token so that task containers cannot access it.
|
||||||
Iterator<Token<?>> iter = copy.getAllTokens().iterator();
|
Iterator<Token<?>> iter = copy.getAllTokens().iterator();
|
||||||
while (iter.hasNext()) {
|
while (iter.hasNext()) {
|
||||||
|
@ -146,6 +142,12 @@ public class ServiceMaster extends CompositeService {
|
||||||
iter.remove();
|
iter.remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
DataOutputBuffer dob = new DataOutputBuffer();
|
||||||
|
try {
|
||||||
|
copy.writeTokenStorageToStream(dob);
|
||||||
|
} finally {
|
||||||
|
dob.close();
|
||||||
|
}
|
||||||
return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -73,6 +73,7 @@ public class Service extends BaseResource {
|
||||||
private KerberosPrincipal kerberosPrincipal = new KerberosPrincipal();
|
private KerberosPrincipal kerberosPrincipal = new KerberosPrincipal();
|
||||||
private String version = null;
|
private String version = null;
|
||||||
private String description = null;
|
private String description = null;
|
||||||
|
private String dockerClientConfig = null;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A unique service name.
|
* A unique service name.
|
||||||
|
@ -370,6 +371,27 @@ public class Service extends BaseResource {
|
||||||
this.kerberosPrincipal = kerberosPrincipal;
|
this.kerberosPrincipal = kerberosPrincipal;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@JsonProperty("docker_client_config")
|
||||||
|
@XmlElement(name = "docker_client_config")
|
||||||
|
@SuppressWarnings("checkstyle:hiddenfield")
|
||||||
|
public Service dockerClientConfig(String dockerClientConfig) {
|
||||||
|
this.dockerClientConfig = dockerClientConfig;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The Docker client config for the service.
|
||||||
|
* @return dockerClientConfig
|
||||||
|
*/
|
||||||
|
@ApiModelProperty(value = "The Docker client config for the service")
|
||||||
|
public String getDockerClientConfig() {
|
||||||
|
return dockerClientConfig;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDockerClientConfig(String dockerClientConfig) {
|
||||||
|
this.dockerClientConfig = dockerClientConfig;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(java.lang.Object o) {
|
public boolean equals(java.lang.Object o) {
|
||||||
if (this == o) {
|
if (this == o) {
|
||||||
|
@ -414,6 +436,8 @@ public class Service extends BaseResource {
|
||||||
sb.append(" queue: ").append(toIndentedString(queue)).append("\n");
|
sb.append(" queue: ").append(toIndentedString(queue)).append("\n");
|
||||||
sb.append(" kerberosPrincipal: ")
|
sb.append(" kerberosPrincipal: ")
|
||||||
.append(toIndentedString(kerberosPrincipal)).append("\n");
|
.append(toIndentedString(kerberosPrincipal)).append("\n");
|
||||||
|
sb.append(" dockerClientConfig: ")
|
||||||
|
.append(toIndentedString(dockerClientConfig)).append("\n");
|
||||||
sb.append("}");
|
sb.append("}");
|
||||||
return sb.toString();
|
return sb.toString();
|
||||||
}
|
}
|
||||||
|
|
|
@ -79,6 +79,7 @@ import org.apache.hadoop.yarn.service.utils.ServiceRegistryUtils;
|
||||||
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
|
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
|
||||||
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
|
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
|
||||||
import org.apache.hadoop.yarn.service.utils.ZookeeperUtils;
|
import org.apache.hadoop.yarn.service.utils.ZookeeperUtils;
|
||||||
|
import org.apache.hadoop.yarn.util.DockerClientConfigHandler;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
import org.apache.hadoop.yarn.util.Times;
|
import org.apache.hadoop.yarn.util.Times;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -710,7 +711,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
||||||
amLaunchContext.setCommands(Collections.singletonList(cmdStr));
|
amLaunchContext.setCommands(Collections.singletonList(cmdStr));
|
||||||
amLaunchContext.setEnvironment(env);
|
amLaunchContext.setEnvironment(env);
|
||||||
amLaunchContext.setLocalResources(localResources);
|
amLaunchContext.setLocalResources(localResources);
|
||||||
addHdfsDelegationTokenIfSecure(amLaunchContext);
|
addCredentials(amLaunchContext, app);
|
||||||
submissionContext.setAMContainerSpec(amLaunchContext);
|
submissionContext.setAMContainerSpec(amLaunchContext);
|
||||||
yarnClient.submitApplication(submissionContext);
|
yarnClient.submitApplication(submissionContext);
|
||||||
return submissionContext.getApplicationId();
|
return submissionContext.getApplicationId();
|
||||||
|
@ -933,28 +934,37 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
||||||
return appDir;
|
return appDir;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addHdfsDelegationTokenIfSecure(ContainerLaunchContext amContext)
|
private void addCredentials(ContainerLaunchContext amContext, Service app)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (!UserGroupInformation.isSecurityEnabled()) {
|
Credentials allCreds = new Credentials();
|
||||||
return;
|
// HDFS DT
|
||||||
}
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
Credentials credentials = new Credentials();
|
|
||||||
String tokenRenewer = YarnClientUtils.getRmPrincipal(getConfig());
|
String tokenRenewer = YarnClientUtils.getRmPrincipal(getConfig());
|
||||||
if (StringUtils.isEmpty(tokenRenewer)) {
|
if (StringUtils.isEmpty(tokenRenewer)) {
|
||||||
throw new IOException(
|
throw new IOException(
|
||||||
"Can't get Master Kerberos principal for the RM to use as renewer");
|
"Can't get Master Kerberos principal for the RM to use as renewer");
|
||||||
}
|
}
|
||||||
// Get hdfs dt
|
|
||||||
final org.apache.hadoop.security.token.Token<?>[] tokens =
|
final org.apache.hadoop.security.token.Token<?>[] tokens =
|
||||||
fs.getFileSystem().addDelegationTokens(tokenRenewer, credentials);
|
fs.getFileSystem().addDelegationTokens(tokenRenewer, allCreds);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
if (tokens != null && tokens.length != 0) {
|
if (tokens != null && tokens.length != 0) {
|
||||||
for (Token<?> token : tokens) {
|
for (Token<?> token : tokens) {
|
||||||
LOG.debug("Got DT: " + token);
|
LOG.debug("Got DT: " + token);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!StringUtils.isEmpty(app.getDockerClientConfig())) {
|
||||||
|
allCreds.addAll(DockerClientConfigHandler.readCredentialsFromConfigFile(
|
||||||
|
new Path(app.getDockerClientConfig()), getConfig(), app.getName()));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (allCreds.numberOfTokens() > 0) {
|
||||||
DataOutputBuffer dob = new DataOutputBuffer();
|
DataOutputBuffer dob = new DataOutputBuffer();
|
||||||
credentials.writeTokenStorageToStream(dob);
|
allCreds.writeTokenStorageToStream(dob);
|
||||||
ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
ByteBuffer tokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
||||||
amContext.setTokens(fsTokens);
|
amContext.setTokens(tokens);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -116,6 +116,13 @@ public class ServiceApiUtil {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Validate the Docker client config.
|
||||||
|
try {
|
||||||
|
validateDockerClientConfiguration(service, conf);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new IllegalArgumentException(e);
|
||||||
|
}
|
||||||
|
|
||||||
// Validate there are no component name collisions (collisions are not
|
// Validate there are no component name collisions (collisions are not
|
||||||
// currently supported) and add any components from external services
|
// currently supported) and add any components from external services
|
||||||
Configuration globalConf = service.getConfiguration();
|
Configuration globalConf = service.getConfiguration();
|
||||||
|
@ -214,6 +221,20 @@ public class ServiceApiUtil {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void validateDockerClientConfiguration(Service service,
|
||||||
|
org.apache.hadoop.conf.Configuration conf) throws IOException {
|
||||||
|
String dockerClientConfig = service.getDockerClientConfig();
|
||||||
|
if (!StringUtils.isEmpty(dockerClientConfig)) {
|
||||||
|
Path dockerClientConfigPath = new Path(dockerClientConfig);
|
||||||
|
FileSystem fs = dockerClientConfigPath.getFileSystem(conf);
|
||||||
|
if (!fs.exists(dockerClientConfigPath)) {
|
||||||
|
throw new IOException(
|
||||||
|
"The supplied Docker client config does not exist: "
|
||||||
|
+ dockerClientConfig);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static void validateComponent(Component comp, FileSystem fs,
|
private static void validateComponent(Component comp, FileSystem fs,
|
||||||
org.apache.hadoop.conf.Configuration conf)
|
org.apache.hadoop.conf.Configuration conf)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
|
@ -21,11 +21,13 @@ package org.apache.hadoop.yarn.service;
|
||||||
import com.google.common.base.Supplier;
|
import com.google.common.base.Supplier;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
import org.apache.hadoop.registry.client.api.RegistryOperations;
|
import org.apache.hadoop.registry.client.api.RegistryOperations;
|
||||||
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
|
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
|
||||||
import org.apache.hadoop.registry.client.types.ServiceRecord;
|
import org.apache.hadoop.registry.client.types.ServiceRecord;
|
||||||
import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
|
import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
|
||||||
import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
|
import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
|
||||||
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||||
|
@ -60,6 +62,7 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
@ -96,11 +99,19 @@ public class MockServiceAM extends ServiceMaster {
|
||||||
private Map<ContainerId, ContainerStatus> containerStatuses =
|
private Map<ContainerId, ContainerStatus> containerStatuses =
|
||||||
new ConcurrentHashMap<>();
|
new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
private Credentials amCreds;
|
||||||
|
|
||||||
public MockServiceAM(Service service) {
|
public MockServiceAM(Service service) {
|
||||||
super(service.getName());
|
super(service.getName());
|
||||||
this.service = service;
|
this.service = service;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public MockServiceAM(Service service, Credentials amCreds) {
|
||||||
|
super(service.getName());
|
||||||
|
this.service = service;
|
||||||
|
this.amCreds = amCreds;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ContainerId getAMContainerId()
|
protected ContainerId getAMContainerId()
|
||||||
throws BadClusterStateException {
|
throws BadClusterStateException {
|
||||||
|
@ -385,4 +396,18 @@ public class MockServiceAM extends ServiceMaster {
|
||||||
containerStatuses.put(container.getId(), status);
|
containerStatuses.put(container.getId(), status);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ByteBuffer recordTokensForContainers()
|
||||||
|
throws IOException {
|
||||||
|
DataOutputBuffer dob = new DataOutputBuffer();
|
||||||
|
if (amCreds == null) {
|
||||||
|
return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
amCreds.writeTokenStorageToStream(dob);
|
||||||
|
} finally {
|
||||||
|
dob.close();
|
||||||
|
}
|
||||||
|
return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,10 @@ package org.apache.hadoop.yarn.service;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.curator.test.TestingCluster;
|
import org.apache.curator.test.TestingCluster;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.security.Credentials;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
@ -29,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
|
||||||
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
||||||
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
|
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.security.DockerCredentialTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.service.api.records.Component;
|
import org.apache.hadoop.yarn.service.api.records.Component;
|
||||||
import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
|
import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
|
||||||
import org.apache.hadoop.yarn.service.api.records.Service;
|
import org.apache.hadoop.yarn.service.api.records.Service;
|
||||||
|
@ -36,6 +41,7 @@ import org.apache.hadoop.yarn.service.component.ComponentState;
|
||||||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
|
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
|
||||||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState;
|
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState;
|
||||||
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
|
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
|
||||||
|
import org.apache.hadoop.yarn.util.DockerClientConfigHandler;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -44,14 +50,18 @@ import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.BufferedWriter;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.FileWriter;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM;
|
import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
public class TestServiceAM extends ServiceTestUtils{
|
public class TestServiceAM extends ServiceTestUtils{
|
||||||
|
|
||||||
|
@ -294,4 +304,44 @@ public class TestServiceAM extends ServiceTestUtils{
|
||||||
|
|
||||||
am.stop();
|
am.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRecordTokensForContainers() throws Exception {
|
||||||
|
ApplicationId applicationId = ApplicationId.newInstance(123456, 1);
|
||||||
|
Service exampleApp = new Service();
|
||||||
|
exampleApp.setId(applicationId.toString());
|
||||||
|
exampleApp.setName("testContainerCompleted");
|
||||||
|
exampleApp.addComponent(createComponent("compa", 1, "pwd"));
|
||||||
|
|
||||||
|
String json = "{\"auths\": "
|
||||||
|
+ "{\"https://index.docker.io/v1/\": "
|
||||||
|
+ "{\"auth\": \"foobarbaz\"},"
|
||||||
|
+ "\"registry.example.com\": "
|
||||||
|
+ "{\"auth\": \"bazbarfoo\"}}}";
|
||||||
|
File dockerTmpDir = new File("target", "docker-tmp");
|
||||||
|
FileUtils.deleteQuietly(dockerTmpDir);
|
||||||
|
dockerTmpDir.mkdirs();
|
||||||
|
String dockerConfig = dockerTmpDir + "/config.json";
|
||||||
|
BufferedWriter bw = new BufferedWriter(new FileWriter(dockerConfig));
|
||||||
|
bw.write(json);
|
||||||
|
bw.close();
|
||||||
|
Credentials dockerCred =
|
||||||
|
DockerClientConfigHandler.readCredentialsFromConfigFile(
|
||||||
|
new Path(dockerConfig), conf, applicationId.toString());
|
||||||
|
|
||||||
|
|
||||||
|
MockServiceAM am = new MockServiceAM(exampleApp, dockerCred);
|
||||||
|
ByteBuffer amCredBuffer = am.recordTokensForContainers();
|
||||||
|
Credentials amCreds =
|
||||||
|
DockerClientConfigHandler.getCredentialsFromTokensByteBuffer(
|
||||||
|
amCredBuffer);
|
||||||
|
|
||||||
|
assertEquals(2, amCreds.numberOfTokens());
|
||||||
|
for (Token<? extends TokenIdentifier> tk : amCreds.getAllTokens()) {
|
||||||
|
Assert.assertTrue(
|
||||||
|
tk.getKind().equals(DockerCredentialTokenIdentifier.KIND));
|
||||||
|
}
|
||||||
|
|
||||||
|
am.stop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -119,7 +119,8 @@ public final class DockerClientConfigHandler {
|
||||||
credentials.addToken(
|
credentials.addToken(
|
||||||
new Text(registryUrl + "-" + applicationId), token);
|
new Text(registryUrl + "-" + applicationId), token);
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Added token: " + token.toString());
|
LOG.debug("Token read from Docker client configuration file: "
|
||||||
|
+ token.toString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -142,7 +143,7 @@ public final class DockerClientConfigHandler {
|
||||||
tokens.rewind();
|
tokens.rewind();
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
for (Token token : credentials.getAllTokens()) {
|
for (Token token : credentials.getAllTokens()) {
|
||||||
LOG.debug("Added token: " + token.toString());
|
LOG.debug("Token read from token storage: " + token.toString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return credentials;
|
return credentials;
|
||||||
|
@ -161,9 +162,11 @@ public final class DockerClientConfigHandler {
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
ObjectNode rootNode = mapper.createObjectNode();
|
ObjectNode rootNode = mapper.createObjectNode();
|
||||||
ObjectNode registryUrlNode = mapper.createObjectNode();
|
ObjectNode registryUrlNode = mapper.createObjectNode();
|
||||||
|
boolean foundDockerCred = false;
|
||||||
if (credentials.numberOfTokens() > 0) {
|
if (credentials.numberOfTokens() > 0) {
|
||||||
for (Token<? extends TokenIdentifier> tk : credentials.getAllTokens()) {
|
for (Token<? extends TokenIdentifier> tk : credentials.getAllTokens()) {
|
||||||
if (tk.getKind().equals(DockerCredentialTokenIdentifier.KIND)) {
|
if (tk.getKind().equals(DockerCredentialTokenIdentifier.KIND)) {
|
||||||
|
foundDockerCred = true;
|
||||||
DockerCredentialTokenIdentifier ti =
|
DockerCredentialTokenIdentifier ti =
|
||||||
(DockerCredentialTokenIdentifier) tk.decodeIdentifier();
|
(DockerCredentialTokenIdentifier) tk.decodeIdentifier();
|
||||||
ObjectNode registryCredNode = mapper.createObjectNode();
|
ObjectNode registryCredNode = mapper.createObjectNode();
|
||||||
|
@ -176,9 +179,11 @@ public final class DockerClientConfigHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (foundDockerCred) {
|
||||||
rootNode.put(CONFIG_AUTHS_KEY, registryUrlNode);
|
rootNode.put(CONFIG_AUTHS_KEY, registryUrlNode);
|
||||||
String json =
|
String json =
|
||||||
mapper.writerWithDefaultPrettyPrinter().writeValueAsString(rootNode);
|
mapper.writerWithDefaultPrettyPrinter().writeValueAsString(rootNode);
|
||||||
FileUtils.writeStringToFile(outConfigFile, json, StandardCharsets.UTF_8);
|
FileUtils.writeStringToFile(outConfigFile, json, StandardCharsets.UTF_8);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
|
@ -403,6 +403,7 @@ a service resource has the following attributes.
|
||||||
|quicklinks|A blob of key-value pairs of quicklinks to be exported for a service.|false|object||
|
|quicklinks|A blob of key-value pairs of quicklinks to be exported for a service.|false|object||
|
||||||
|queue|The YARN queue that this service should be submitted to.|false|string||
|
|queue|The YARN queue that this service should be submitted to.|false|string||
|
||||||
|kerberos_principal | The principal info of the user who launches the service|false|KerberosPrincipal||
|
|kerberos_principal | The principal info of the user who launches the service|false|KerberosPrincipal||
|
||||||
|
|docker_client_config|URI of the file containing the docker client configuration (e.g. hdfs:///tmp/config.json)|false|string||
|
||||||
|
|
||||||
### ServiceState
|
### ServiceState
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue