diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/Cli.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/Cli.java index b4c5e4c3df5..69189f40389 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/Cli.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/Cli.java @@ -16,7 +16,6 @@ package org.apache.hadoop.yarn.submarine.client.cli; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.submarine.common.ClientContext; -import org.apache.hadoop.yarn.submarine.common.fs.DefaultRemoteDirectoryManager; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.submarine.runtimes.RuntimeFactory; import org.slf4j.Logger; @@ -43,8 +42,6 @@ public class Cli { Configuration conf = new YarnConfiguration(); ClientContext clientContext = new ClientContext(); clientContext.setConfiguration(conf); - clientContext.setRemoteDirectoryManager( - new DefaultRemoteDirectoryManager(clientContext)); RuntimeFactory runtimeFactory = RuntimeFactory.getRuntimeFactory( clientContext); clientContext.setRuntimeFactory(runtimeFactory); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java index 454ff1cc1be..2d7a4724a4e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java @@ -52,4 +52,7 @@ public class CliConstants { public static final String QUICKLINK = "quicklink"; public static final String TENSORBOARD_DOCKER_IMAGE = "tensorboard_docker_image"; + public static final String KEYTAB = "keytab"; + public static final String PRINCIPAL = "principal"; + public static final String DISTRIBUTE_KEYTAB = "distribute_keytab"; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java index bfdfa9a59af..194e8dfb28e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java @@ -14,22 +14,33 @@ package org.apache.hadoop.yarn.submarine.client.cli; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceTypeInfo; import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters; +import org.apache.hadoop.yarn.submarine.common.exception.SubmarineRuntimeException; import org.apache.hadoop.yarn.submarine.common.fs.RemoteDirectoryManager; import org.apache.hadoop.yarn.util.UnitsConversionUtil; import org.apache.hadoop.yarn.util.resource.ResourceUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.File; import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.hadoop.yarn.submarine.client.cli.CliConstants.KEYTAB; +import static org.apache.hadoop.yarn.submarine.client.cli.CliConstants.PRINCIPAL; + public class CliUtils { + private static final Logger LOG = + LoggerFactory.getLogger(CliUtils.class); private final static String RES_PATTERN = "^[^=]+=\\d+\\s?\\w*$"; /** * Replace patterns inside cli @@ -163,4 +174,49 @@ public class CliUtils { return false; } + + public static void doLoginIfSecure(String keytab, String principal) throws + IOException { + if (!UserGroupInformation.isSecurityEnabled()) { + return; + } + + if (StringUtils.isEmpty(keytab) || StringUtils.isEmpty(principal)) { + if (StringUtils.isNotEmpty(keytab)) { + SubmarineRuntimeException e = new SubmarineRuntimeException("The " + + "parameter of " + PRINCIPAL + " is missing."); + LOG.error(e.getMessage(), e); + throw e; + } + + if (StringUtils.isNotEmpty(principal)) { + SubmarineRuntimeException e = new SubmarineRuntimeException("The " + + "parameter of " + KEYTAB + " is missing."); + LOG.error(e.getMessage(), e); + throw e; + } + + UserGroupInformation user = UserGroupInformation.getCurrentUser(); + if(user == null || user.getAuthenticationMethod() == + UserGroupInformation.AuthenticationMethod.SIMPLE) { + SubmarineRuntimeException e = new SubmarineRuntimeException("Failed " + + "to authenticate in secure environment. Please run kinit " + + "command in advance or use " + "--" + KEYTAB + "/--" + PRINCIPAL + + " parameters"); + LOG.error(e.getMessage(), e); + throw e; + } + LOG.info("Submarine job is submitted by user: " + user.getUserName()); + return; + } + + File keytabFile = new File(keytab); + if (!keytabFile.exists()) { + SubmarineRuntimeException e = new SubmarineRuntimeException("No " + + "keytab localized at " + keytab); + LOG.error(e.getMessage(), e); + throw e; + } + UserGroupInformation.loginUserFromKeytab(principal, keytab); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/RunJobCli.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/RunJobCli.java index 5054a94c7bb..fc57be9f723 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/RunJobCli.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/RunJobCli.java @@ -125,6 +125,15 @@ public class RunJobCli extends AbstractCli { + "if want to link to first worker's 7070 port, and text of quicklink " + "is Notebook_UI, user need to specify --quicklink " + "Notebook_UI=https://master-0:7070"); + options.addOption(CliConstants.KEYTAB, true, "Specify keytab used by the " + + "job under security environment"); + options.addOption(CliConstants.PRINCIPAL, true, "Specify principal used " + + "by the job under security environment"); + options.addOption(CliConstants.DISTRIBUTE_KEYTAB, false, "Distribute " + + "local keytab to cluster machines for service authentication. If not " + + "sepcified, pre-destributed keytab of which path specified by" + + " parameter" + CliConstants.KEYTAB + " on cluster machines will be " + + "used"); options.addOption("h", "help", false, "Print help"); return options; } @@ -153,7 +162,8 @@ public class RunJobCli extends AbstractCli { // Do parsing GnuParser parser = new GnuParser(); CommandLine cli = parser.parse(options, args); - parameters.updateParametersByParsedCommandline(cli, options, clientContext); + parameters.updateParametersByParsedCommandline(cli, options, + clientContext); } catch (ParseException e) { LOG.error("Exception in parse:", e.getMessage()); printUsages(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java index d923e0f106b..6c8307f4efa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java @@ -51,6 +51,10 @@ public class RunJobParameters extends RunParameters { private boolean waitJobFinish = false; private boolean distributed = false; + private String keytab; + private String principal; + private boolean distributeKeytab = false; + @Override public void updateParametersByParsedCommandline(CommandLine parsedCommandLine, Options options, ClientContext clientContext) @@ -85,6 +89,12 @@ public class RunJobParameters extends RunParameters { + "please double check."); } + String kerberosKeytab = parsedCommandLine.getOptionValue( + CliConstants.KEYTAB); + String kerberosPrincipal = parsedCommandLine.getOptionValue( + CliConstants.PRINCIPAL); + CliUtils.doLoginIfSecure(kerberosKeytab, kerberosPrincipal); + workerResource = null; if (nWorkers > 0) { String workerResourceStr = parsedCommandLine.getOptionValue( @@ -149,10 +159,16 @@ public class RunJobParameters extends RunParameters { String psLaunchCommand = parsedCommandLine.getOptionValue( CliConstants.PS_LAUNCH_CMD); + boolean distributeKerberosKeytab = parsedCommandLine.hasOption(CliConstants + .DISTRIBUTE_KEYTAB); + this.setInputPath(input).setCheckpointPath(jobDir).setNumPS(nPS).setNumWorkers(nWorkers) .setPSLaunchCmd(psLaunchCommand).setWorkerLaunchCmd(workerLaunchCmd) .setPsResource(psResource) - .setTensorboardEnabled(tensorboard); + .setTensorboardEnabled(tensorboard) + .setKeytab(kerberosKeytab) + .setPrincipal(kerberosPrincipal) + .setDistributeKeytab(distributeKerberosKeytab); super.updateParametersByParsedCommandline(parsedCommandLine, options, clientContext); @@ -271,4 +287,32 @@ public class RunJobParameters extends RunParameters { public List getQuicklinks() { return quicklinks; } + + public String getKeytab() { + return keytab; + } + + public RunJobParameters setKeytab(String kerberosKeytab) { + this.keytab = kerberosKeytab; + return this; + } + + public String getPrincipal() { + return principal; + } + + public RunJobParameters setPrincipal(String kerberosPrincipal) { + this.principal = kerberosPrincipal; + return this; + } + + public boolean isDistributeKeytab() { + return distributeKeytab; + } + + public RunJobParameters setDistributeKeytab( + boolean distributeKerberosKeytab) { + this.distributeKeytab = distributeKerberosKeytab; + return this; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/ClientContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/ClientContext.java index 31a8b1b3268..055b3c65376 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/ClientContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/ClientContext.java @@ -18,13 +18,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.submarine.common.conf.SubmarineConfiguration; +import org.apache.hadoop.yarn.submarine.common.fs.DefaultRemoteDirectoryManager; import org.apache.hadoop.yarn.submarine.common.fs.RemoteDirectoryManager; import org.apache.hadoop.yarn.submarine.runtimes.RuntimeFactory; public class ClientContext { private Configuration yarnConf = new YarnConfiguration(); - private RemoteDirectoryManager remoteDirectoryManager; + private volatile RemoteDirectoryManager remoteDirectoryManager; private YarnClient yarnClient; private Configuration submarineConfig; private RuntimeFactory runtimeFactory; @@ -51,14 +52,16 @@ public class ClientContext { } public RemoteDirectoryManager getRemoteDirectoryManager() { + if(remoteDirectoryManager == null) { + synchronized (this) { + if(remoteDirectoryManager == null) { + remoteDirectoryManager = new DefaultRemoteDirectoryManager(this); + } + } + } return remoteDirectoryManager; } - public void setRemoteDirectoryManager( - RemoteDirectoryManager remoteDirectoryManager) { - this.remoteDirectoryManager = remoteDirectoryManager; - } - public Configuration getSubmarineConfig() { return submarineConfig; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java index b2e2b410434..1d99b557c2f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java @@ -84,7 +84,8 @@ public class DefaultRemoteDirectoryManager implements RemoteDirectoryManager { } private Path getJobRootFolder(String jobName) throws IOException { - Path jobRootPath = getUserRootFolder(); + Path userRoot = getUserRootFolder(); + Path jobRootPath = new Path(userRoot, jobName); createFolderIfNotExist(jobRootPath); // Get a file status to make sure it is a absolute path. FileStatus fStatus = fs.getFileStatus(jobRootPath); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java index d9a88a56174..4b4961fd911 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java @@ -15,9 +15,11 @@ package org.apache.hadoop.yarn.submarine.runtimes.yarnservice; import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.client.api.AppAdminClient; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -28,6 +30,7 @@ import org.apache.hadoop.yarn.service.api.records.ConfigFile; import org.apache.hadoop.yarn.service.api.records.Resource; import org.apache.hadoop.yarn.service.api.records.ResourceInformation; import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.api.records.KerberosPrincipal; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.submarine.client.cli.param.Quicklink; import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters; @@ -300,8 +303,26 @@ public class YarnServiceJobSubmitter implements JobSubmitter { private void uploadToRemoteFileAndLocalizeToContainerWorkDir(Path stagingDir, String fileToUpload, String destFilename, Component comp) throws IOException { + Path uploadedFilePath = uploadToRemoteFile(stagingDir, fileToUpload); + locateRemoteFileToContainerWorkDir(destFilename, comp, uploadedFilePath); + } + + private void locateRemoteFileToContainerWorkDir(String destFilename, + Component comp, Path uploadedFilePath) throws IOException { FileSystem fs = FileSystem.get(clientContext.getYarnConfig()); + FileStatus fileStatus = fs.getFileStatus(uploadedFilePath); + LOG.info("Uploaded file path = " + fileStatus.getPath()); + + // Set it to component's files list + comp.getConfiguration().getFiles().add(new ConfigFile().srcFile( + fileStatus.getPath().toUri().toString()).destFile(destFilename) + .type(ConfigFile.TypeEnum.STATIC)); + } + + private Path uploadToRemoteFile(Path stagingDir, String fileToUpload) throws + IOException { + FileSystem fs = FileSystem.get(clientContext.getYarnConfig()); // Upload to remote FS under staging area File localFile = new File(fileToUpload); if (!localFile.exists()) { @@ -320,14 +341,13 @@ public class YarnServiceJobSubmitter implements JobSubmitter { fs.copyFromLocalFile(new Path(fileToUpload), uploadedFilePath); uploadedFiles.add(uploadedFilePath); } + return uploadedFilePath; + } - FileStatus fileStatus = fs.getFileStatus(uploadedFilePath); - LOG.info("Uploaded file path = " + fileStatus.getPath()); - - // Set it to component's files list - comp.getConfiguration().getFiles().add(new ConfigFile().srcFile( - fileStatus.getPath().toUri().toString()).destFile(destFilename) - .type(ConfigFile.TypeEnum.STATIC)); + private void setPermission(Path destPath, FsPermission permission) throws + IOException { + FileSystem fs = FileSystem.get(clientContext.getYarnConfig()); + fs.setPermission(destPath, new FsPermission(permission)); } private void handleLaunchCommand(RunJobParameters parameters, @@ -475,6 +495,7 @@ public class YarnServiceJobSubmitter implements JobSubmitter { serviceSpec.setName(parameters.getName()); serviceSpec.setVersion(String.valueOf(System.currentTimeMillis())); serviceSpec.setArtifact(getDockerArtifact(parameters.getDockerImageName())); + handleKerberosPrincipal(parameters); handleServiceEnvs(serviceSpec, parameters); @@ -547,6 +568,32 @@ public class YarnServiceJobSubmitter implements JobSubmitter { return serviceSpecFile.getAbsolutePath(); } + private void handleKerberosPrincipal(RunJobParameters parameters) throws + IOException { + if(StringUtils.isNotBlank(parameters.getKeytab()) && StringUtils + .isNotBlank(parameters.getPrincipal())) { + String keytab = parameters.getKeytab(); + String principal = parameters.getPrincipal(); + if(parameters.isDistributeKeytab()) { + Path stagingDir = + clientContext.getRemoteDirectoryManager().getJobStagingArea( + parameters.getName(), true); + Path remoteKeytabPath = uploadToRemoteFile(stagingDir, keytab); + //only the owner has read access + setPermission(remoteKeytabPath, + FsPermission.createImmutable((short)Integer.parseInt("400", 8))); + serviceSpec.setKerberosPrincipal(new KerberosPrincipal().keytab( + remoteKeytabPath.toString()).principalName(principal)); + } else { + if(!keytab.startsWith("file")) { + keytab = "file://" + keytab; + } + serviceSpec.setKerberosPrincipal(new KerberosPrincipal().keytab( + keytab).principalName(principal)); + } + } + } + /** * {@inheritDoc} */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java index 240de064b2f..184d53d7a01 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java @@ -92,7 +92,9 @@ public class TestRunJobCliParsing { "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd", "python run-job.py", "--worker_resources", "memory=2048M,vcores=2", "--ps_resources", "memory=4G,vcores=4", "--tensorboard", "true", - "--ps_launch_cmd", "python run-ps.py", "--verbose" }); + "--ps_launch_cmd", "python run-ps.py", "--keytab", "/keytab/path", + "--principal", "user/_HOST@domain.com", "--distribute_keytab", + "--verbose" }); RunJobParameters jobRunParameters = runJobCli.getRunJobParameters(); @@ -108,6 +110,11 @@ public class TestRunJobCliParsing { jobRunParameters.getWorkerResource()); Assert.assertEquals(jobRunParameters.getDockerImageName(), "tf-docker:1.1.0"); + Assert.assertEquals(jobRunParameters.getKeytab(), + "/keytab/path"); + Assert.assertEquals(jobRunParameters.getPrincipal(), + "user/_HOST@domain.com"); + Assert.assertTrue(jobRunParameters.isDistributeKeytab()); Assert.assertTrue(SubmarineLogs.isVerbose()); }