YARN-8569. Create an interface to provide cluster information to application. Contributed by Eric Yang

Billie Rinaldi 2018-10-25 09:55:05 -07:00
parent 4f10d7e23f
commit d07e873b7d
23 changed files with 745 additions and 11 deletions

View File

@@ -273,7 +273,14 @@ public interface ApplicationConstants {
* Final, Docker run support ENTRY_POINT.
*/
YARN_CONTAINER_RUNTIME_DOCKER_RUN_OVERRIDE_DISABLE(
"YARN_CONTAINER_RUNTIME_DOCKER_RUN_OVERRIDE_DISABLE");
"YARN_CONTAINER_RUNTIME_DOCKER_RUN_OVERRIDE_DISABLE"),
/**
* $YARN_CONTAINER_RUNTIME_YARN_SYSFS_ENABLE
* Final, expose cluster information to the container.
*/
YARN_CONTAINER_RUNTIME_YARN_SYSFS_ENABLE(
"YARN_CONTAINER_RUNTIME_YARN_SYSFS_ENABLE");
private final String variable;
private Environment(String variable) {
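A component opts into this variable through its configuration env, as the new test in this commit does. A minimal sketch using the service records API (the class name and component name below are illustrative):

```java
import org.apache.hadoop.yarn.service.api.records.Component;

public class SysFsOptInExample {
  public static void main(String[] args) {
    // Enable YARN sysfs for a single component of a service.
    Component component = new Component().name("compa"); // hypothetical name
    component.getConfiguration().getEnv().put(
        "YARN_CONTAINER_RUNTIME_YARN_SYSFS_ENABLE", "true");
    System.out.println(component.getConfiguration().getEnv());
  }
}
```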

View File

@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.exceptions.BadClusterStateException;
import org.apache.hadoop.yarn.service.monitor.ServiceMonitor;
@@ -302,6 +303,12 @@ public class ServiceMaster extends CompositeService {
LOG.info("Service state changed from {} -> {}", curState,
scheduler.getApp().getState());
}
populateYarnSysFS(scheduler);
}
private static void populateYarnSysFS(ServiceScheduler scheduler) {
Service service = scheduler.getApp();
scheduler.syncSysFs(service);
}
private void printSystemEnv() {

View File

@@ -21,6 +21,9 @@ package org.apache.hadoop.yarn.service;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource.Builder;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -38,6 +41,7 @@ import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
import org.apache.hadoop.security.HadoopKerberosName;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -77,6 +81,7 @@ import org.apache.hadoop.yarn.service.provider.ProviderUtils;
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
import org.apache.hadoop.yarn.service.timelineservice.ServiceMetricsSink;
import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
import org.apache.hadoop.yarn.service.utils.HttpUtil;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.service.utils.ServiceRegistryUtils;
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
@@ -90,6 +95,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.text.MessageFormat;
import java.util.Collection;
@@ -1027,4 +1033,65 @@ public class ServiceScheduler extends CompositeService {
public ServiceUtils.ProcessTerminationHandler getTerminationHandler() {
return terminationHandler;
}
public void syncSysFs(Service yarnApp) {
boolean success = true;
Configuration conf = getConfig();
String spec;
boolean useKerberos = UserGroupInformation.isSecurityEnabled();
boolean printSyncResult = false;
try {
String port = conf.get("yarn.nodemanager.webapp.address").split(":")[1];
spec = ServiceApiUtil.jsonSerDeser.toJson(yarnApp);
for (org.apache.hadoop.yarn.service.api.records.Component c :
yarnApp.getComponents()) {
Set<String> nodes = new HashSet<String>();
boolean update = Boolean.parseBoolean(c.getConfiguration()
.getEnv(ApplicationConstants.Environment
.YARN_CONTAINER_RUNTIME_YARN_SYSFS_ENABLE.name()));
if (!update) {
continue;
}
printSyncResult = true;
for (org.apache.hadoop.yarn.service.api.records.Container container :
c.getContainers()) {
String bareHost = container.getBareHost();
nodes.add(bareHost);
}
for (String bareHost : nodes) {
StringBuilder requestPath = new StringBuilder();
if (YarnConfiguration.useHttps(conf)) {
requestPath.append("https://");
} else {
requestPath.append("http://");
}
requestPath.append(bareHost);
requestPath.append(":");
requestPath.append(port);
requestPath.append("/ws/v1/node/yarn/sysfs/");
requestPath.append(UserGroupInformation.getCurrentUser()
.getShortUserName());
requestPath.append("/");
requestPath.append(yarnApp.getId());
if (!useKerberos) {
requestPath.append("?user.name=");
requestPath.append(UserGroupInformation.getCurrentUser()
.getShortUserName());
}
Builder builder = HttpUtil.connect(requestPath.toString());
ClientResponse response = builder.put(ClientResponse.class, spec);
if (response.getStatus() != ClientResponse.Status.OK.getStatusCode()) {
LOG.warn("Error synchronizing YARN sysfs: " +
response.getEntity(String.class));
success = false;
}
}
}
if (printSyncResult && success) {
LOG.info("YARN sysfs synchronized.");
}
} catch (IOException | URISyntaxException | InterruptedException e) {
LOG.error("Fail to sync service spec: {}", e);
}
}
}
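For reference, the PUT request that syncSysFs assembles has the following shape; the host, user, and application ID below are illustrative, the port comes from yarn.nodemanager.webapp.address (8042 by default), and the user.name query parameter is appended only when Kerberos is off:

```
PUT http://nm-host:8042/ws/v1/node/yarn/sysfs/user1/application_1540166081337_0001?user.name=user1
```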

View File

@@ -20,10 +20,14 @@ package org.apache.hadoop.yarn.service.client;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.shaded.com.google.common.io.Files;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -73,6 +77,8 @@ import org.apache.hadoop.yarn.service.api.records.ComponentContainers;
import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.ConfigFile;
import org.apache.hadoop.yarn.service.api.records.ConfigFile.TypeEnum;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.conf.SliderExitCodes;
@@ -97,12 +103,18 @@ import org.apache.hadoop.yarn.util.Times;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@@ -929,6 +941,8 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
addJarResource(serviceName, localResources);
// add keytab if in secure env
addKeytabResourceIfSecure(fs, localResources, app);
// add yarn sysfs to localResources
addYarnSysFs(appRootDir, localResources, app);
if (LOG.isDebugEnabled()) {
printLocalResources(localResources);
}
@@ -938,8 +952,8 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
String cmdStr = buildCommandLine(app, conf, appRootDir, hasAMLog4j);
submissionContext.setResource(Resource.newInstance(YarnServiceConf
.getLong(YarnServiceConf.AM_RESOURCE_MEM,
YarnServiceConf.DEFAULT_KEY_AM_RESOURCE_MEM, app.getConfiguration(),
conf), 1));
YarnServiceConf.DEFAULT_KEY_AM_RESOURCE_MEM,
app.getConfiguration(), conf), 1));
String queue = app.getQueue();
if (StringUtils.isEmpty(queue)) {
queue = conf.get(YARN_QUEUE, DEFAULT_YARN_QUEUE);
@@ -963,6 +977,128 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
return submissionContext.getApplicationId();
}
/**
* Compress (tar) the input files to the output file.
*
* @param files The files to compress
* @param output The resulting output tar file
* @param bundleRoot path prefix stripped from nested entries in the archive
* @throws IOException
*/
public static File compressFiles(Collection<File> files, File output,
String bundleRoot) throws IOException {
try (FileOutputStream fos = new FileOutputStream(output);
TarArchiveOutputStream taos = new TarArchiveOutputStream(
new BufferedOutputStream(fos))) {
taos.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
for (File f : files) {
addFilesToCompression(taos, f, "sysfs", bundleRoot);
}
}
return output;
}
/**
* Compile the file list for compression, recursing into
* nested directories.
*
* @param taos The archive
* @param file The file to add to the archive
* @param dir The directory that should serve as
* the parent directory in the archive
* @throws IOException
*/
private static void addFilesToCompression(TarArchiveOutputStream taos,
File file, String dir, String bundleRoot) throws IOException {
if (!file.isHidden()) {
// Create an entry for the file
if (!dir.equals(".")) {
if (File.separator.equals("\\")) {
dir = dir.replaceAll("\\\\", "/");
}
}
taos.putArchiveEntry(
new TarArchiveEntry(file, dir + "/" + file.getName()));
if (file.isFile()) {
// Add the file to the archive
try (FileInputStream input = new FileInputStream(file)) {
IOUtils.copy(input, taos);
taos.closeArchiveEntry();
}
} else if (file.isDirectory()) {
// close the archive entry
if (!dir.equals(".")) {
taos.closeArchiveEntry();
}
// go through all the files in the directory and using recursion, add
// them to the archive
File[] allFiles = file.listFiles();
if (allFiles != null) {
for (File childFile : allFiles) {
addFilesToCompression(taos, childFile,
file.getPath().substring(bundleRoot.length()), bundleRoot);
}
}
}
}
}
private void addYarnSysFs(Path path,
Map<String, LocalResource> localResources, Service app)
throws IOException {
List<Component> componentsWithYarnSysFS = new ArrayList<Component>();
for(Component c : app.getComponents()) {
boolean enabled = Boolean.parseBoolean(c.getConfiguration()
.getEnv(ApplicationConstants.Environment
.YARN_CONTAINER_RUNTIME_YARN_SYSFS_ENABLE.name()));
if (enabled) {
componentsWithYarnSysFS.add(c);
}
}
if(componentsWithYarnSysFS.size() == 0) {
return;
}
String buffer = ServiceApiUtil.jsonSerDeser.toJson(app);
File tmpDir = Files.createTempDir();
if (tmpDir.exists()) {
String serviceJsonPath = tmpDir.getAbsolutePath() + "/app.json";
File localFile = new File(serviceJsonPath);
if (localFile.createNewFile()) {
try (Writer writer = new OutputStreamWriter(
new FileOutputStream(localFile), StandardCharsets.UTF_8)) {
writer.write(buffer);
}
} else {
throw new IOException("Fail to write app.json to temp directory");
}
File destinationFile = new File(tmpDir.getAbsolutePath() + "/sysfs.tar");
if (!destinationFile.createNewFile()) {
throw new IOException("Fail to localize sysfs.tar.");
}
List<File> files = new ArrayList<File>();
files.add(localFile);
compressFiles(files, destinationFile, "sysfs");
LocalResource localResource =
fs.submitFile(destinationFile, path, ".", "sysfs.tar");
Path serviceJson = new Path(path, "sysfs.tar");
for (Component c : componentsWithYarnSysFS) {
ConfigFile e = new ConfigFile();
e.type(TypeEnum.ARCHIVE);
e.srcFile(serviceJson.toString());
e.destFile("/hadoop/yarn");
if (!c.getConfiguration().getFiles().contains(e)) {
c.getConfiguration().getFiles().add(e);
}
}
localResources.put("sysfs", localResource);
if (!tmpDir.delete()) {
LOG.warn("Failed to delete temp file: " + tmpDir.getAbsolutePath());
}
} else {
throw new IOException("Fail to localize sysfs resource.");
}
}
private void setLogAggregationContext(Service app, Configuration conf,
ApplicationSubmissionContext submissionContext) {
LogAggregationContext context = Records.newRecord(LogAggregationContext
@@ -1565,4 +1701,5 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
this.principalName = principalName;
}
}
}
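A hedged sketch of how addYarnSysFs drives compressFiles: serialize the spec to app.json in a temporary directory, then bundle it under a sysfs/ archive root. The class name and paths are illustrative, and java.nio is used here instead of the shaded Guava Files helper:

```java
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Collections;
import org.apache.hadoop.yarn.service.client.ServiceClient;

public class SysFsBundleExample {
  public static void main(String[] args) throws Exception {
    // Write the serialized Service spec, then tar it as sysfs/app.json.
    File tmpDir = Files.createTempDirectory("sysfs").toFile();
    File appJson = new File(tmpDir, "app.json");
    Files.write(appJson.toPath(), "{}".getBytes(StandardCharsets.UTF_8));
    File tar = ServiceClient.compressFiles(
        Collections.singletonList(appJson),
        new File(tmpDir, "sysfs.tar"), "sysfs");
    System.out.println("Bundled: " + tar);
  }
}
```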

View File

@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.utils;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authentication.util.KerberosUtil;
import org.ietf.jgss.GSSContext;
import org.ietf.jgss.GSSException;
import org.ietf.jgss.GSSManager;
import org.ietf.jgss.GSSName;
import org.ietf.jgss.Oid;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.WebResource.Builder;
/**
* Http connection utilities.
*
*/
public class HttpUtil {
private static final Logger LOG =
LoggerFactory.getLogger(HttpUtil.class);
private static final Base64 BASE_64_CODEC = new Base64(0);
protected HttpUtil() {
// prevents calls from subclass
throw new UnsupportedOperationException();
}
/**
* Generate SPNEGO challenge request token.
*
* @param server - hostname to contact
* @return Base64-encoded SPNEGO challenge token
* @throws IOException
* @throws InterruptedException
*/
public static String generateToken(String server) throws
IOException, InterruptedException {
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
LOG.debug("The user credential is {}", currentUser);
String challenge = currentUser
.doAs(new PrivilegedExceptionAction<String>() {
@Override
public String run() throws Exception {
try {
// This is the Oid for the Kerberos GSS-API mechanism.
Oid mechOid = KerberosUtil.getOidInstance("GSS_KRB5_MECH_OID");
GSSManager manager = GSSManager.getInstance();
// GSS name for server
GSSName serverName = manager.createName("HTTP@" + server,
GSSName.NT_HOSTBASED_SERVICE);
// Create a GSSContext for authentication with the service.
// We're passing client credentials as null since we want them to
// be read from the Subject.
GSSContext gssContext = manager.createContext(
serverName.canonicalize(mechOid), mechOid, null,
GSSContext.DEFAULT_LIFETIME);
gssContext.requestMutualAuth(true);
gssContext.requestCredDeleg(true);
// Establish context
byte[] inToken = new byte[0];
byte[] outToken = gssContext.initSecContext(inToken, 0,
inToken.length);
gssContext.dispose();
// Base64 encoded and stringified token for server
LOG.debug("Got valid challenge for host {}", serverName);
return new String(BASE_64_CODEC.encode(outToken),
StandardCharsets.US_ASCII);
} catch (GSSException | IllegalAccessException
| NoSuchFieldException | ClassNotFoundException e) {
LOG.error("Error: {}", e);
throw new AuthenticationException(e);
}
}
});
return challenge;
}
public static Builder connect(String url) throws URISyntaxException,
IOException, InterruptedException {
boolean useKerberos = UserGroupInformation.isSecurityEnabled();
URI resource = new URI(url);
Client client = Client.create();
Builder builder = client
.resource(url).type(MediaType.APPLICATION_JSON);
if (useKerberos) {
String challenge = generateToken(resource.getHost());
builder.header(HttpHeaders.AUTHORIZATION, "Negotiate " +
challenge);
LOG.debug("Authorization: Negotiate {}", challenge);
}
return builder;
}
}
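Putting HttpUtil together with the node manager endpoint, a caller can sync a spec the same way ServiceScheduler.syncSysFs does. A minimal sketch, with an illustrative class name, URL, and payload:

```java
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource.Builder;
import org.apache.hadoop.yarn.service.utils.HttpUtil;

public class SysFsPutExample {
  public static void main(String[] args) throws Exception {
    // connect() attaches a SPNEGO Negotiate header when security is enabled.
    Builder builder = HttpUtil.connect(
        "http://nm-host:8042/ws/v1/node/yarn/sysfs/user1/app-1"); // hypothetical
    ClientResponse response = builder.put(ClientResponse.class, "{}");
    if (response.getStatus() != ClientResponse.Status.OK.getStatusCode()) {
      System.err.println(response.getEntity(String.class));
    }
  }
}
```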

View File

@@ -95,8 +95,6 @@ public class ServiceApiUtil {
private static final PatternValidator userNamePattern
= new PatternValidator("[a-z][a-z0-9-.]*");
@VisibleForTesting
public static void setJsonSerDeser(JsonSerDeser jsd) {
jsonSerDeser = jsd;

View File

@@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.service;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingCluster;
@@ -426,4 +425,35 @@ public class TestServiceAM extends ServiceTestUtils{
am.getComponent("compa").getPendingInstances().size());
am.stop();
}
@Test(timeout = 30000)
public void testSyncSysFS() {
ApplicationId applicationId = ApplicationId.newInstance(
System.currentTimeMillis(), 1);
Service exampleApp = new Service();
exampleApp.setId(applicationId.toString());
exampleApp.setVersion("v1");
exampleApp.setName("tensorflow");
Component compA = createComponent("compa", 1, "pwd");
compA.getConfiguration().getEnv().put(
"YARN_CONTAINER_RUNTIME_YARN_SYSFS_ENABLE", "true");
Artifact artifact = new Artifact();
artifact.setType(Artifact.TypeEnum.TARBALL);
compA.artifact(artifact);
exampleApp.addComponent(compA);
try {
MockServiceAM am = new MockServiceAM(exampleApp);
am.init(conf);
am.start();
ServiceScheduler scheduler = am.context.scheduler;
scheduler.syncSysFs(exampleApp);
scheduler.close();
am.stop();
am.close();
} catch (Exception e) {
LOG.error("Fail to sync sysfs: {}", e);
Assert.fail("Fail to sync sysfs.");
}
}
}

View File

@@ -258,6 +258,18 @@ public abstract class ContainerExecutor implements Configurable {
public abstract boolean isContainerAlive(ContainerLivenessContext ctx)
throws IOException;
/**
* Update cluster information inside the container.
*
* @param ctx ContainerRuntimeContext
* @param user Owner of the application
* @param appId YARN application ID
* @param spec Service specification
* @throws IOException if there is a failure while writing spec to disk
*/
public abstract void updateYarnSysFS(Context ctx, String user,
String appId, String spec) throws IOException;
/**
* Recover an already existing container. This is a blocking call and returns
* only when the container exits. Note that the container must have been

View File

@@ -46,6 +46,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.CommandExecutor;
import org.apache.hadoop.util.Shell.ExitCodeException;
@@ -1038,4 +1039,10 @@ public class DefaultContainerExecutor extends ContainerExecutor {
}
return paths;
}
@Override
public void updateYarnSysFS(Context ctx, String user,
String appId, String spec) throws IOException {
throw new ServiceStateException("Implementation unavailable");
}
}

View File

@@ -67,6 +67,7 @@ import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
@@ -996,4 +997,46 @@ public class LinuxContainerExecutor extends ContainerExecutor {
"containerId: {}. Exception: ", containerId, e);
}
}
@Override
public synchronized void updateYarnSysFS(Context ctx, String user,
String appId, String spec) throws IOException {
LocalDirsHandlerService dirsHandler = nmContext.getLocalDirsHandler();
Path sysFSPath = dirsHandler.getLocalPathForWrite(
"nmPrivate/" + appId + "/sysfs/app.json");
File file = new File(sysFSPath.toString());
List<String> localDirs = dirsHandler.getLocalDirs();
if (file.exists()) {
if (!file.delete()) {
LOG.warn("Unable to delete " + sysFSPath.toString());
}
}
if (file.createNewFile()) {
FileOutputStream output = new FileOutputStream(file);
try {
output.write(spec.getBytes("UTF-8"));
} finally {
output.close();
}
}
PrivilegedOperation privOp = new PrivilegedOperation(
PrivilegedOperation.OperationType.SYNC_YARN_SYSFS);
String runAsUser = getRunAsUser(user);
privOp.appendArgs(runAsUser,
user,
Integer.toString(PrivilegedOperation.RunAsUserCommand
.SYNC_YARN_SYSFS.getValue()),
appId, StringUtils.join(PrivilegedOperation
.LINUX_FILE_PATH_SEPARATOR, localDirs));
privOp.disableFailureLogging();
PrivilegedOperationExecutor privilegedOperationExecutor =
PrivilegedOperationExecutor.getInstance(nmContext.getConf());
try {
privilegedOperationExecutor.executePrivilegedOperation(null,
privOp, null, null, false, false);
} catch (PrivilegedOperationException e) {
throw new IOException(e);
}
}
}
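Under the hood this builds a run-as-user invocation of container-executor; assuming the %-joined local dirs and command code 6 (RunAsUserCommand.SYNC_YARN_SYSFS) shown in this commit, with illustrative users and paths, the resulting command line looks roughly like:

```
container-executor nobody user1 6 application_1540166081337_0001 /local/dir1%/local/dir2
```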

View File

@@ -56,7 +56,8 @@ public class PrivilegedOperation {
LIST_AS_USER(""), // no CLI switch supported yet.
ADD_NUMA_PARAMS(""), // no CLI switch supported yet.
REMOVE_DOCKER_CONTAINER("--remove-docker-container"),
INSPECT_DOCKER_CONTAINER("--inspect-docker-container");
INSPECT_DOCKER_CONTAINER("--inspect-docker-container"),
SYNC_YARN_SYSFS("");
private final String option;
@@ -153,7 +154,8 @@
SIGNAL_CONTAINER(2),
DELETE_AS_USER(3),
LAUNCH_DOCKER_CONTAINER(4),
LIST_AS_USER(5);
LIST_AS_USER(5),
SYNC_YARN_SYSFS(6);
private int value;
RunAsUserCommand(int value) {

View File

@@ -179,6 +179,12 @@ import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.r
* This feature is disabled by default. When this feature is disabled or set
* to false, the container will be removed as soon as it exits.
* </li>
* <li>
{@code YARN_CONTAINER_RUNTIME_YARN_SYSFS_ENABLE} allows exporting the
YARN service JSON to the Docker container. This feature is disabled by
default. When this feature is enabled, app.json will be available at
/hadoop/yarn/sysfs/app.json.
* </li>
* </ul>
*/
@InterfaceAudience.Private
@@ -231,6 +237,11 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
@InterfaceAudience.Private
public static final String ENV_DOCKER_CONTAINER_DELAYED_REMOVAL =
"YARN_CONTAINER_RUNTIME_DOCKER_DELAYED_REMOVAL";
@InterfaceAudience.Private
public static final String ENV_DOCKER_CONTAINER_YARN_SYSFS =
"YARN_CONTAINER_RUNTIME_YARN_SYSFS_ENABLE";
public static final String YARN_SYSFS_PATH =
"/hadoop/yarn/sysfs";
private Configuration conf;
private Context nmContext;
private DockerClient dockerClient;
@@ -964,6 +975,12 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
addCGroupParentIfRequired(resourcesOpts, containerIdStr, runCommand);
if(environment.containsKey(ENV_DOCKER_CONTAINER_YARN_SYSFS) &&
Boolean.parseBoolean(environment
.get(ENV_DOCKER_CONTAINER_YARN_SYSFS))) {
runCommand.setYarnSysFS(true);
}
if (useEntryPoint) {
runCommand.setOverrideDisabled(true);
runCommand.addEnv(environment);
@@ -1438,4 +1455,5 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
}
}
}
}

View File

@@ -217,4 +217,10 @@ public class DockerRunCommand extends DockerCommand {
public final void addEnv(Map<String, String> environment) {
userEnv.putAll(environment);
}
public DockerRunCommand setYarnSysFS(boolean toggle) {
String value = Boolean.toString(toggle);
super.addCommandArguments("use-yarn-sysfs", value);
return this;
}
}

View File

@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.GET;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
@@ -56,6 +57,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.http.JettyUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -554,6 +556,31 @@ public class NMWebServices {
return new NMResourceInfo();
}
@PUT
@Path("/yarn/sysfs/{user}/{appId}")
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
public Response syncYarnSysFS(@javax.ws.rs.core.Context
HttpServletRequest req,
@PathParam("user") String user,
@PathParam("appId") String appId,
String spec) {
if (UserGroupInformation.isSecurityEnabled()) {
if (!req.getRemoteUser().equals(user)) {
return Response.status(Status.FORBIDDEN).build();
}
}
try {
nmContext.getContainerExecutor().updateYarnSysFS(nmContext, user, appId,
spec);
} catch (IOException | ServiceStateException e) {
LOG.error("Fail to sync yarn sysfs for application ID: {}, reason: ",
appId, e);
return Response.status(Status.INTERNAL_SERVER_ERROR).entity(e).build();
}
return Response.ok().build();
}
private long parseLongParam(String bytes) {
if (bytes == null || bytes.isEmpty()) {
return Long.MAX_VALUE;

View File

@@ -76,6 +76,7 @@ static const char* DEFAULT_BANNED_USERS[] = {"yarn", "mapred", "hdfs", "bin", 0}
static const int DEFAULT_DOCKER_SUPPORT_ENABLED = 0;
static const int DEFAULT_TC_SUPPORT_ENABLED = 0;
static const int DEFAULT_MOUNT_CGROUP_SUPPORT_ENABLED = 0;
static const int DEFAULT_YARN_SYSFS_SUPPORT_ENABLED = 0;
static const char* PROC_PATH = "/proc";
@@ -506,6 +507,11 @@ int is_mount_cgroups_support_enabled() {
&executor_cfg);
}
int is_yarn_sysfs_support_enabled() {
return is_feature_enabled(YARN_SYSFS_SUPPORT_ENABLED_KEY,
DEFAULT_YARN_SYSFS_SUPPORT_ENABLED, &executor_cfg);
}
/**
* Utility function to concatenate argB to argA using the concat_pattern.
*/
@@ -1778,6 +1784,27 @@ int create_user_filecache_dirs(const char * user, char* const* local_dirs) {
return rc;
}
int create_yarn_sysfs(const char* user, const char *app_id,
const char *container_id, const char *work_dir, char* const* local_dirs) {
int result = OUT_OF_MEMORY;
const mode_t perms = S_IRWXU | S_IXGRP;
char* const* local_dir_ptr;
for(local_dir_ptr = local_dirs; *local_dir_ptr != NULL; ++local_dir_ptr) {
char *container_dir = get_container_work_directory(*local_dir_ptr, user, app_id,
container_id);
if (container_dir == NULL) {
return OUT_OF_MEMORY;
}
char *yarn_sysfs_dir = make_string("%s/%s", container_dir, "sysfs");
if (mkdir(yarn_sysfs_dir, perms) == 0) {
result = 0;
}
free(yarn_sysfs_dir);
free(container_dir);
}
return result;
}
int launch_docker_container_as_user(const char * user, const char *app_id,
const char *container_id, const char *work_dir,
const char *script_name, const char *cred_file,
@@ -1834,6 +1861,14 @@ int launch_docker_container_as_user(const char * user, const char *app_id,
goto cleanup;
}
exit_code = create_yarn_sysfs(user, app_id, container_id, work_dir, local_dirs);
if (exit_code != 0) {
fprintf(ERRORFILE, "Could not create user yarn sysfs directory");
fflush(ERRORFILE);
exit(-1);
goto cleanup;
}
docker_command = construct_docker_command(command_file);
docker_binary = get_docker_binary(&CFG);
@@ -2799,6 +2834,68 @@ struct configuration* get_cfg() {
return &CFG;
}
char *locate_sysfs_path(const char *src) {
char *result = NULL;
DIR *dir;
struct dirent *entry;
if (!(dir = opendir(src))) {
return NULL;
}
while ((entry = readdir(dir)) != NULL) {
if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0) {
continue;
}
char *new_src = make_string("%s/%s", src, entry->d_name);
if (str_ends_with(new_src, "/sysfs.tar/sysfs")) {
result = new_src;
goto cleanup;
}
result = locate_sysfs_path(new_src);
// new_src itself did not match; any match is a deeper allocation,
// so free it before checking the result to avoid a per-entry leak.
free(new_src);
if (result != NULL) {
goto cleanup;
}
}
cleanup:
closedir(dir);
return result;
}
int sync_yarn_sysfs(char* const* local_dir, const char *running_user, const char *end_user, const char *app_id) {
int result = OUT_OF_MEMORY;
char *src = NULL;
char *dest = NULL;
char* const* local_dir_ptr;
for(local_dir_ptr = local_dir; *local_dir_ptr != NULL; ++local_dir_ptr) {
char *appcache_dir = make_string("%s/usercache/%s/appcache/%s", *local_dir_ptr, end_user, app_id);
char *sysfs_dir = locate_sysfs_path(appcache_dir);
char *nm_private_app_dir = make_string("%s/nmPrivate/%s/sysfs", *local_dir_ptr, app_id);
if (sysfs_dir == NULL) {
return OUT_OF_MEMORY;
}
src = make_string("%s/%s", nm_private_app_dir, "app.json");
dest = make_string("%s/%s", sysfs_dir, "app.json");
// open up the spec file
int spec_file = open_file_as_nm(src);
if (spec_file == -1) {
continue;
}
delete_path(dest, 0);
if (copy_file(spec_file, src, dest, S_IRWXU | S_IRGRP | S_IXGRP) == 0) {
result = 0;
}
// continue on to create other work directories
free(sysfs_dir);
free(src);
free(dest);
if (result == 0) {
break;
}
}
return result;
}
/**
* Flatten docker launch command
*/

View File

@@ -32,7 +32,8 @@ enum command {
SIGNAL_CONTAINER = 2,
DELETE_AS_USER = 3,
LAUNCH_DOCKER_CONTAINER = 4,
LIST_AS_USER = 5
LIST_AS_USER = 5,
SYNC_YARN_SYSFS = 6
};
enum operations {
@@ -49,7 +50,8 @@ enum operations {
RUN_DOCKER = 11,
RUN_AS_USER_LIST = 12,
REMOVE_DOCKER_CONTAINER = 13,
INSPECT_DOCKER_CONTAINER = 14
INSPECT_DOCKER_CONTAINER = 14,
RUN_AS_USER_SYNC_YARN_SYSFS = 15
};
#define NM_GROUP_KEY "yarn.nodemanager.linux-container-executor.group"
@@ -67,6 +69,7 @@ enum operations {
#define DOCKER_SUPPORT_ENABLED_KEY "feature.docker.enabled"
#define TC_SUPPORT_ENABLED_KEY "feature.tc.enabled"
#define MOUNT_CGROUP_SUPPORT_ENABLED_KEY "feature.mount-cgroup.enabled"
#define YARN_SYSFS_SUPPORT_ENABLED_KEY "feature.yarn.sysfs.enabled"
#define TMP_DIR "tmp"
extern struct passwd *user_detail;
@@ -293,6 +296,21 @@ int run_docker_with_pty(const char *command_file);
*/
int exec_docker_command(char *docker_command, char **argv, int argc);
/** Check if yarn sysfs is enabled in configuration. */
int is_yarn_sysfs_support_enabled();
/**
* Create YARN SysFS
*/
int create_yarn_sysfs(const char* user, const char *app_id,
const char *container_id, const char *work_dir, char* const* local_dirs);
/**
* Sync YARN SysFS
*/
int sync_yarn_sysfs(char* const* local_dirs, const char *running_user,
const char *end_user, const char *app_id);
/*
* Compile the regex_str and determine if the input string matches.
* Return 0 on match, 1 on non-match.

View File

@@ -99,11 +99,21 @@ static void display_usage(FILE *stream) {
fprintf(stream, "\n");
}
fprintf(stream,
fprintf(stream,
" signal container: %2d container-pid signal\n"
" delete as user: %2d relative-path\n"
" list as user: %2d relative-path\n",
SIGNAL_CONTAINER, DELETE_AS_USER, LIST_AS_USER);
if(is_yarn_sysfs_support_enabled()) {
fprintf(stream,
" sync yarn sysfs: %2d app-id nm-local-dirs\n",
SYNC_YARN_SYSFS);
} else {
fprintf(stream,
"[DISABLED] sync yarn sysfs: %2d app-id nm-local-dirs\n",
SYNC_YARN_SYSFS);
}
}
/* Sets up log files for normal/error logging */
@@ -566,6 +576,11 @@ static int validate_run_as_user_commands(int argc, char **argv, int *operation)
cmd_input.target_dir = argv[optind++];
*operation = RUN_AS_USER_LIST;
return 0;
case SYNC_YARN_SYSFS:
cmd_input.app_id = argv[optind++];
cmd_input.local_dirs = argv[optind++];
*operation = RUN_AS_USER_SYNC_YARN_SYSFS;
return 0;
default:
fprintf(ERRORFILE, "Invalid command %d not supported.",command);
fflush(ERRORFILE);
@@ -723,6 +738,19 @@ int main(int argc, char **argv) {
exit_code = list_as_user(cmd_input.target_dir);
break;
case RUN_AS_USER_SYNC_YARN_SYSFS:
exit_code = set_user(cmd_input.run_as_user_name);
if (exit_code != 0) {
break;
}
if (is_yarn_sysfs_support_enabled()) {
exit_code = sync_yarn_sysfs(split(cmd_input.local_dirs),
cmd_input.run_as_user_name, cmd_input.yarn_user_name,
cmd_input.app_id);
} else {
exit_code = FEATURE_DISABLED;
}
break;
}
flush_and_close_log_files();

View File

@@ -17,6 +17,9 @@
*/
#include "util.h"
#include <unistd.h>
#include <sys/types.h>
#include <dirent.h>
#include <limits.h>
#include <errno.h>
#include <strings.h>
@@ -180,3 +183,9 @@ char *make_string(const char *fmt, ...) {
}
return buf;
}
int str_ends_with(const char *s, const char *suffix) {
size_t slen = strlen(s);
size_t suffix_len = strlen(suffix);
return suffix_len <= slen && !strcmp(s + slen - suffix_len, suffix);
}

View File

@@ -38,4 +38,10 @@ int get_numbers_split_by_comma(const char* input, int** numbers, size_t* n_numbe
* String format utility
*/
char *make_string(const char *fmt, ...);
/*
* Check whether a string ends with the given suffix.
* Returns 1 on match, 0 otherwise.
*/
int str_ends_with(const char *s, const char *suffix);
#endif

View File

@@ -105,6 +105,7 @@ int write_config_file(char *file_name, int banned) {
fprintf(file, "min.user.id=0\n");
}
fprintf(file, "allowed.system.users=allowedUser,daemon\n");
fprintf(file, "feature.yarn.sysfs.enabled=1\n");
fclose(file);
return 0;
}
@@ -524,6 +525,63 @@ void test_is_feature_enabled() {
free_configuration(&exec_cfg);
}
void test_yarn_sysfs() {
char *app_id = "app-1";
char *container_id = "container-1";
// Test create sysfs without container.
int result = create_yarn_sysfs(username, app_id, container_id, "work", local_dirs);
if (result == 0) {
printf("Should not be able to create yarn sysfs without container directories.\n");
exit(1);
}
result = sync_yarn_sysfs(local_dirs, username, username, app_id);
if (result == 0) {
printf("sync_yarn_sysfs failed.\n");
exit(1);
}
// Create container directories and init app.json
char* const* local_dir_ptr;
for (local_dir_ptr = local_dirs; *local_dir_ptr != 0; ++local_dir_ptr) {
char *user_dir = make_string("%s/usercache/%s", *local_dir_ptr, username);
if (mkdirs(user_dir, 0750) != 0) {
printf("Can not make user directories: %s\n", user_dir);
exit(1);
}
free(user_dir);
char *app_dir = make_string("%s/usercache/%s/appcache/%s", *local_dir_ptr, username, app_id);
if (mkdirs(app_dir, 0750) != 0) {
printf("Can not make app directories: %s\n", app_dir);
exit(1);
}
free(app_dir);
// Simulate distributed cache created directory structures.
char *cache_dir = make_string("%s/usercache/%s/appcache/%s/filecache/%s/sysfs.tar/sysfs", *local_dir_ptr, username, app_id, container_id);
if (mkdirs(cache_dir, 0750) != 0) {
printf("Can not make container directories: %s\n", cache_dir);
exit(1);
}
free(cache_dir);
char *nm_dir = make_string("%s/nmPrivate/%s/sysfs", *local_dir_ptr, app_id);
if (mkdirs(nm_dir, 0750) != 0) {
printf("Can not make nmPrivate directories: %s\n", nm_dir);
exit(1);
}
char *sysfs_path = make_string("%s/%s", nm_dir, "app.json");
FILE *file = fopen(sysfs_path, "w");
fprintf(file, "{}\n");
fclose(file);
free(nm_dir);
}
result = sync_yarn_sysfs(local_dirs, username, username, app_id);
if (result != 0) {
printf("sync_yarn_sysfs failed.\n");
exit(1);
}
}
void test_delete_user() {
printf("\nTesting delete_user\n");
char* app_dir = get_app_directory(TEST_ROOT "/local-1", yarn_username, "app_3");
@@ -1551,6 +1609,9 @@ int main(int argc, char **argv) {
printf("\nTesting is_feature_enabled()\n");
test_is_feature_enabled();
printf("\nTesting yarn sysfs\n");
test_yarn_sysfs();
test_check_user(0);
test_cleaning_docker_cgroups();

View File

@@ -706,6 +706,17 @@ public class TestLinuxContainerExecutor {
verify(lce, times(1)).execContainer(ctx);
}
@Test
public void testUpdateYarnSysFS() throws Exception {
String user = System.getProperty("user.name");
String appId="app-1";
String spec="";
Context ctx = mock(Context.class);
LinuxContainerExecutor lce = mock(LinuxContainerExecutor.class);
lce.updateYarnSysFS(ctx, user, appId, spec);
verify(lce, times(1)).updateYarnSysFS(ctx, user, appId, spec);
}
private static class TestResourceHandler implements LCEResourcesHandler {
static Set<ContainerId> postExecContainers = new HashSet<ContainerId>();

View File

@@ -128,6 +128,10 @@ public class TestContainersMonitorResourceChange {
throws IOException {
return true;
}
@Override
public void updateYarnSysFS(Context ctx, String user, String appId,
String spec) throws IOException {
}
}
private static class MockContainerEventHandler implements

View File

@@ -287,6 +287,7 @@ The following properties are optional:
| `banned.users` | A comma-separated list of usernames who should not be allowed to launch applications. The default setting is: yarn, mapred, hdfs, and bin. |
| `allowed.system.users` | A comma-separated list of usernames who should be allowed to launch applications even if their UIDs are below the configured minimum. If a user appears in allowed.system.users and banned.users, the user will be considered banned. |
| `feature.tc.enabled` | Must be "true" or "false". "false" means traffic control commands are disabled. "true" means traffic control commands are allowed. |
| `feature.yarn.sysfs.enabled` | Must be "true" or "false". See YARN sysfs support for details. The default setting is disabled. |
Part of a container-executor.cfg which allows Docker containers to be launched is below:
@@ -369,6 +370,7 @@ environment variables in the application's environment:
| `YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS` | Adds additional volume mounts to the Docker container. The value of the environment variable should be a comma-separated list of mounts. All such mounts must be given as `source:dest[:mode]` and the mode must be "ro" (read-only) or "rw" (read-write) to specify the type of access being requested. If neither is specified, read-write will be assumed. The mode may include a bind propagation option. In that case, the mode should either be of the form `[option]`, `rw+[option]`, or `ro+[option]`. Valid bind propagation options are shared, rshared, slave, rslave, private, and rprivate. The requested mounts will be validated by container-executor based on the values set in container-executor.cfg for `docker.allowed.ro-mounts` and `docker.allowed.rw-mounts`. |
| `YARN_CONTAINER_RUNTIME_DOCKER_TMPFS_MOUNTS` | Adds additional tmpfs mounts to the Docker container. The value of the environment variable should be a comma-separated list of absolute mount points within the container. |
| `YARN_CONTAINER_RUNTIME_DOCKER_DELAYED_REMOVAL` | Allows a user to request delayed deletion of the Docker container on a per container basis. If true, Docker containers will not be removed until the duration defined by yarn.nodemanager.delete.debug-delay-sec has elapsed. Administrators can disable this feature through the yarn-site property yarn.nodemanager.runtime.linux.docker.delayed-removal.allowed. This feature is disabled by default. When this feature is disabled or set to false, the container will be removed as soon as it exits. |
| `YARN_CONTAINER_RUNTIME_YARN_SYSFS_ENABLE` | Enables mounting of the container working directory's sysfs sub-directory into the Docker container at /hadoop/yarn/sysfs. This is useful for populating cluster information into the container. |
The first two are required. The remainder can be set as needed. While
controlling the container type through environment variables is somewhat less
@@ -767,3 +769,17 @@ In yarn-env.sh, define:
```
export YARN_CONTAINER_RUNTIME_DOCKER_RUN_OVERRIDE_DISABLE=true
```
Docker Container YARN SysFS Support
-----------------------------------
YARN sysfs is a pseudo file system provided by the YARN framework that
exports cluster information to Docker containers. Cluster information is
exported under the /hadoop/yarn/sysfs path. This API allows application
developers to obtain cluster information without external service
dependencies. A custom application master can populate cluster
information by calling the node manager REST API. The YARN service
framework automatically populates cluster information to
/hadoop/yarn/sysfs/app.json. For more information about YARN service,
see: [YARN Service](./yarn-service/Overview.html).
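From inside a container launched with this flag, consuming the populated spec is just a file read; a short sketch (class name illustrative, path per the description above):

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

public class ReadSysFsExample {
  public static void main(String[] args) throws Exception {
    // Read the service spec that YARN synced into the sysfs mount.
    String spec = new String(
        Files.readAllBytes(Paths.get("/hadoop/yarn/sysfs/app.json")),
        StandardCharsets.UTF_8);
    System.out.println(spec);
  }
}
```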