From c771fe6e10bd3f8533a01026de3c33db2a983cf4 Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Wed, 12 Dec 2018 11:43:23 -0800 Subject: [PATCH] YARN-8714. [Submarine] Support files/tarballs to be localized for a training job. (Zhankun Tang via wangda) Change-Id: I845131273e52a9d81dbc813ea6d4af06b205e334 --- .../submarine/client/cli/CliConstants.java | 1 + .../yarn/submarine/client/cli/RunJobCli.java | 18 + .../client/cli/param/Localization.java | 133 +++ .../client/cli/param/RunJobParameters.java | 15 + .../client/cli/param/package-info.java | 19 + .../common/conf/SubmarineConfiguration.java | 15 + .../fs/DefaultRemoteDirectoryManager.java | 70 +- .../common/fs/RemoteDirectoryManager.java | 18 +- .../common/FSBasedSubmarineStorageImpl.java | 8 +- .../yarnservice/YarnServiceJobSubmitter.java | 259 +++++- .../src/site/markdown/QuickStart.md | 26 + .../yarnservice/TestYarnServiceRunJobCli.java | 795 +++++++++++++++++- .../submarine/common/MockClientContext.java | 8 +- .../common/fs/MockRemoteDirectoryManager.java | 90 +- 14 files changed, 1433 insertions(+), 42 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/Localization.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/package-info.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java index 2d7a4724a4e..da4253be71b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java @@ -52,6 +52,7 @@ public class CliConstants { public static final String QUICKLINK = "quicklink"; public static final String TENSORBOARD_DOCKER_IMAGE = "tensorboard_docker_image"; + public static final String LOCALIZATION = "localization"; public static final String KEYTAB = "keytab"; public static final String PRINCIPAL = "principal"; public static final String DISTRIBUTE_KEYTAB = "distribute_keytab"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/RunJobCli.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/RunJobCli.java index fc57be9f723..b764d6d8099 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/RunJobCli.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/RunJobCli.java @@ -125,6 +125,24 @@ private Options generateOptions() { + "if want to link to first worker's 7070 port, and text of quicklink " + "is Notebook_UI, user need to specify --quicklink " + "Notebook_UI=https://master-0:7070"); + options.addOption(CliConstants.LOCALIZATION, true, "Specify" + + " localization to make remote/local file/directory available to" + + " all container(Docker)." + + " Argument format is \"RemoteUri:LocalFilePath[:rw] \" (ro" + + " permission is not supported yet)" + + " The RemoteUri can be a file or directory in local or" + + " HDFS or s3 or abfs or http .etc." + + " The LocalFilePath can be absolute or relative." 
+ + " If it's a relative path, it'll be" + + " under container's implied working directory" + + " but sub directory is not supported yet." + + " This option can be set mutiple times." + + " Examples are \n" + + "-localization \"hdfs:///user/yarn/mydir2:/opt/data\"\n" + + "-localization \"s3a:///a/b/myfile1:./\"\n" + + "-localization \"https:///a/b/myfile2:./myfile\"\n" + + "-localization \"/user/yarn/mydir3:/opt/mydir3\"\n" + + "-localization \"./mydir1:.\"\n"); options.addOption(CliConstants.KEYTAB, true, "Specify keytab used by the " + "job under security environment"); options.addOption(CliConstants.PRINCIPAL, true, "Specify principal used " + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/Localization.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/Localization.java new file mode 100644 index 00000000000..fe9c6de4123 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/Localization.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.submarine.client.cli.param; + +import org.apache.commons.cli.ParseException; + +import java.util.Arrays; +import java.util.List; + +/** + * Localization parameter. + * */ +public class Localization { + + private String mountPermissionPattern = "(wr|rw)$"; + /** + * Regex for directory/file path in container. + * YARN only support absolute path for mount, but we can + * support some relative path. + * For relative path, we only allow ".", "./","./name". + * relative path like "./a/b" is not allowed. + * "." and "./" means original dir/file name in container working directory + * "./name" means use same or new "name" in container working directory + * A absolute path means same path in container filesystem + */ + private String localPathPattern = "((^\\.$)|(^\\./$)|(^\\./[^/]+)|(^/.*))"; + private String remoteUri; + private String localPath; + + // Read write by default + private String mountPermission = "rw"; + + private static final List SUPPORTED_SCHEME = Arrays.asList( + "hdfs", "oss", "s3a", "s3n", "wasb", + "wasbs", "abfs", "abfss", "adl", "har", + "ftp", "http", "https", "viewfs", "swebhdfs", + "webhdfs", "swift"); + + public void parse(String arg) throws ParseException { + String[] tokens = arg.split(":"); + int minimum = "a:b".split(":").length; + int minimumWithPermission = "a:b:rw".split(":").length; + int minimumParts = minimum; + int miniPartsWithRemoteScheme = "scheme://a:b".split(":").length; + int maximumParts = "scheme://a:b:rw".split(":").length; + // If remote uri starts with a remote scheme + if (isSupportedScheme(tokens[0])) { + minimumParts = miniPartsWithRemoteScheme; + } + if 
(tokens.length < minimumParts + || tokens.length > maximumParts) { + throw new ParseException("Invalid parameter," + + "should be \"remoteUri:localPath[:rw|:wr]\" " + + "format for --localizations"); + } + + /** + * RemoteUri starts with remote scheme. + * Merge part 0 and 1 to build a hdfs path in token[0]. + * toke[1] will be localPath to ease following logic + * */ + if (minimumParts == miniPartsWithRemoteScheme) { + tokens[0] = tokens[0] + ":" + tokens[1]; + tokens[1] = tokens[2]; + if (tokens.length == maximumParts) { + // Has permission part + mountPermission = tokens[maximumParts - 1]; + } + } + // RemoteUri starts with linux file path + if (minimumParts == minimum + && tokens.length == minimumWithPermission) { + // Has permission part + mountPermission = tokens[minimumWithPermission - 1]; + } + remoteUri = tokens[0]; + localPath = tokens[1]; + if (!localPath.matches(localPathPattern)) { + throw new ParseException("Invalid local file path:" + + localPath + + ", it only support \".\", \"./\", \"./name\" and " + + "absolute path."); + } + if (!mountPermission.matches(mountPermissionPattern)) { + throw new ParseException("Invalid mount permission (ro is not " + + "supported yet), " + mountPermission); + } + } + + public String getRemoteUri() { + return remoteUri; + } + + public void setRemoteUri(String rUti) { + this.remoteUri = rUti; + } + + public String getLocalPath() { + return localPath; + } + + public void setLocalPath(String lPath) { + this.localPath = lPath; + } + + public String getMountPermission() { + return mountPermission; + } + + public void setMountPermission(String mPermission) { + this.mountPermission = mPermission; + } + + private boolean isSupportedScheme(String scheme) { + return SUPPORTED_SCHEME.contains(scheme); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java index 6c8307f4efa..111d4ebd61a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java @@ -44,6 +44,7 @@ public class RunJobParameters extends RunParameters { private String workerLaunchCmd; private String psLaunchCmd; private List quicklinks = new ArrayList<>(); + private List localizations = new ArrayList<>(); private String psDockerImage = null; private String workerDockerImage = null; @@ -159,6 +160,16 @@ public void updateParametersByParsedCommandline(CommandLine parsedCommandLine, String psLaunchCommand = parsedCommandLine.getOptionValue( CliConstants.PS_LAUNCH_CMD); + // Localizations + String[] localizationsStr = parsedCommandLine.getOptionValues( + CliConstants.LOCALIZATION); + if (null != localizationsStr) { + for (String loc : localizationsStr) { + Localization localization = new Localization(); + localization.parse(loc); + localizations.add(localization); + } + } boolean distributeKerberosKeytab = parsedCommandLine.hasOption(CliConstants .DISTRIBUTE_KEYTAB); @@ -288,6 +299,10 @@ public List getQuicklinks() { return quicklinks; } + public List getLocalizations() { + return localizations; + } + public String getKeytab() { return keytab; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/package-info.java new file mode 100644 index 
00000000000..2df120f31c0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.submarine.client.cli.param; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/conf/SubmarineConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/conf/SubmarineConfiguration.java index c9e6b7bf6e6..7e70717a81b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/conf/SubmarineConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/conf/SubmarineConfiguration.java @@ -19,6 +19,21 @@ public class SubmarineConfiguration extends Configuration { private static final String SUBMARINE_CONFIGURATION_FILE = "submarine.xml"; + public static final String SUBMARINE_CONFIGURATION_PREFIX = "submarine."; + + public static final String SUBMARINE_LOCALIZATION_PREFIX = + SUBMARINE_CONFIGURATION_PREFIX + "localization."; + /** + * Limit the size of directory/file to be localized. 
+ * To avoid exhausting local disk space, + * this limit both remote and local file to be localized + */ + public static final String LOCALIZATION_MAX_ALLOWED_FILE_SIZE_MB = + SUBMARINE_LOCALIZATION_PREFIX + "max-allowed-file-size-mb"; + + // Default 2GB + public static final long DEFAULT_MAX_ALLOWED_REMOTE_URI_SIZE_MB = 2048; + public SubmarineConfiguration() { this(new Configuration(false), true); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java index 1d99b557c2f..bfb20da8766 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java @@ -14,22 +14,28 @@ package org.apache.hadoop.yarn.submarine.common.fs; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.submarine.client.cli.CliConstants; import org.apache.hadoop.yarn.submarine.common.ClientContext; +import java.io.File; import java.io.IOException; +import java.net.URI; /** * Manages remote directories for staging, log, etc. * TODO, need to properly handle permission / name validation, etc. 
*/ public class DefaultRemoteDirectoryManager implements RemoteDirectoryManager { - FileSystem fs; + private FileSystem fs; + private Configuration conf; public DefaultRemoteDirectoryManager(ClientContext context) { + this.conf = context.getYarnConfig(); try { this.fs = FileSystem.get(context.getYarnConfig()); } catch (IOException e) { @@ -38,7 +44,8 @@ public DefaultRemoteDirectoryManager(ClientContext context) { } @Override - public Path getJobStagingArea(String jobName, boolean create) throws IOException { + public Path getJobStagingArea(String jobName, boolean create) + throws IOException { Path staging = new Path(getJobRootFolder(jobName), "staging"); if (create) { createFolderIfNotExist(staging); @@ -61,7 +68,8 @@ public Path getJobCheckpointDir(String jobName, boolean create) } @Override - public Path getModelDir(String modelName, boolean create) throws IOException { + public Path getModelDir(String modelName, boolean create) + throws IOException { Path modelDir = new Path(new Path("submarine", "models"), modelName); if (create) { createFolderIfNotExist(modelDir); @@ -70,10 +78,15 @@ public Path getModelDir(String modelName, boolean create) throws IOException { } @Override - public FileSystem getFileSystem() { + public FileSystem getDefaultFileSystem() { return fs; } + @Override + public FileSystem getFileSystemByUri(String uri) throws IOException { + return FileSystem.get(URI.create(uri), conf); + } + @Override public Path getUserRootFolder() throws IOException { Path rootPath = new Path("submarine", "jobs"); @@ -83,6 +96,55 @@ public Path getUserRootFolder() throws IOException { return fStatus.getPath(); } + @Override + public boolean isDir(String uri) throws IOException { + if (isRemote(uri)) { + return getFileSystemByUri(uri).getFileStatus(new Path(uri)).isDirectory(); + } + return new File(uri).isDirectory(); + } + + @Override + public boolean isRemote(String uri) { + String scheme = new Path(uri).toUri().getScheme(); + if (null == scheme) { + return 
false; + } + return !scheme.startsWith("file://"); + } + + @Override + public boolean copyRemoteToLocal(String remoteUri, String localUri) + throws IOException { + // Delete old to avoid failure in FileUtil.copy + File old = new File(localUri); + if (old.exists()) { + if (!FileUtil.fullyDelete(old)) { + throw new IOException("Failed to delete dir:" + + old.getAbsolutePath()); + } + } + return FileUtil.copy(getFileSystemByUri(remoteUri), new Path(remoteUri), + new File(localUri), false, + conf); + } + + @Override + public boolean existsRemoteFile(Path url) throws IOException { + return getFileSystemByUri(url.toUri().toString()).exists(url); + } + + @Override + public FileStatus getRemoteFileStatus(Path url) throws IOException { + return getFileSystemByUri(url.toUri().toString()).getFileStatus(url); + } + + @Override + public long getRemoteFileSize(String uri) throws IOException { + return getFileSystemByUri(uri) + .getContentSummary(new Path(uri)).getSpaceConsumed(); + } + private Path getJobRootFolder(String jobName) throws IOException { Path userRoot = getUserRootFolder(); Path jobRootPath = new Path(userRoot, jobName); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/RemoteDirectoryManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/RemoteDirectoryManager.java index ad0d4280b3f..5f71d19d375 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/RemoteDirectoryManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/RemoteDirectoryManager.java @@ -14,6 +14,7 @@ package org.apache.hadoop.yarn.submarine.common.fs; +import org.apache.hadoop.fs.FileStatus; import 
org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -26,7 +27,22 @@ public interface RemoteDirectoryManager { Path getModelDir(String modelName, boolean create) throws IOException; - FileSystem getFileSystem() throws IOException; + FileSystem getDefaultFileSystem() throws IOException; + + FileSystem getFileSystemByUri(String uri) throws IOException; Path getUserRootFolder() throws IOException; + + boolean isDir(String uri) throws IOException; + + boolean isRemote(String uri) throws IOException; + + boolean copyRemoteToLocal(String remoteUri, String localUri) + throws IOException; + + boolean existsRemoteFile(Path uri) throws IOException; + + FileStatus getRemoteFileStatus(Path uri) throws IOException; + + long getRemoteFileSize(String uri) throws IOException; } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/FSBasedSubmarineStorageImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/FSBasedSubmarineStorageImpl.java index 18815107ffb..fb7c12f38a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/FSBasedSubmarineStorageImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/FSBasedSubmarineStorageImpl.java @@ -42,7 +42,7 @@ public FSBasedSubmarineStorageImpl(ClientContext clientContext) { public void addNewJob(String jobName, Map jobInfo) throws IOException { Path jobInfoPath = getJobInfoPath(jobName, true); - FSDataOutputStream fos = rdm.getFileSystem().create(jobInfoPath); + FSDataOutputStream fos = rdm.getDefaultFileSystem().create(jobInfoPath); serializeMap(fos, jobInfo); } @@ -50,7 +50,7 @@ public void 
addNewJob(String jobName, Map jobInfo) public Map getJobInfoByName(String jobName) throws IOException { Path jobInfoPath = getJobInfoPath(jobName, false); - FSDataInputStream fis = rdm.getFileSystem().open(jobInfoPath); + FSDataInputStream fis = rdm.getDefaultFileSystem().open(jobInfoPath); return deserializeMap(fis); } @@ -58,7 +58,7 @@ public Map getJobInfoByName(String jobName) public void addNewModel(String modelName, String version, Map modelInfo) throws IOException { Path modelInfoPath = getModelInfoPath(modelName, version, true); - FSDataOutputStream fos = rdm.getFileSystem().create(modelInfoPath); + FSDataOutputStream fos = rdm.getDefaultFileSystem().create(modelInfoPath); serializeMap(fos, modelInfo); } @@ -66,7 +66,7 @@ public void addNewModel(String modelName, String version, public Map getModelInfoByName(String modelName, String version) throws IOException { Path modelInfoPath = getModelInfoPath(modelName, version, false); - FSDataInputStream fis = rdm.getFileSystem().open(modelInfoPath); + FSDataInputStream fis = rdm.getDefaultFileSystem().open(modelInfoPath); return deserializeMap(fis); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java index 2e84c969b02..496ba7c43b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java @@ -18,6 +18,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileStatus; import 
org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -32,17 +33,21 @@ import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.KerberosPrincipal; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; +import org.apache.hadoop.yarn.submarine.client.cli.param.Localization; import org.apache.hadoop.yarn.submarine.client.cli.param.Quicklink; import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters; import org.apache.hadoop.yarn.submarine.common.ClientContext; import org.apache.hadoop.yarn.submarine.common.Envs; import org.apache.hadoop.yarn.submarine.common.api.TaskType; +import org.apache.hadoop.yarn.submarine.common.conf.SubmarineConfiguration; import org.apache.hadoop.yarn.submarine.common.conf.SubmarineLogs; +import org.apache.hadoop.yarn.submarine.common.fs.RemoteDirectoryManager; import org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; @@ -56,6 +61,8 @@ import java.util.Map; import java.util.Set; import java.util.StringTokenizer; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.EXIT_SUCCESS; @@ -307,7 +314,8 @@ private void uploadToRemoteFileAndLocalizeToContainerWorkDir(Path stagingDir, } private void locateRemoteFileToContainerWorkDir(String destFilename, - Component comp, Path uploadedFilePath) throws IOException { + Component comp, Path uploadedFilePath) + throws IOException { FileSystem fs = 
FileSystem.get(clientContext.getYarnConfig()); FileStatus fileStatus = fs.getFileStatus(uploadedFilePath); @@ -321,7 +329,9 @@ private void locateRemoteFileToContainerWorkDir(String destFilename, private Path uploadToRemoteFile(Path stagingDir, String fileToUpload) throws IOException { - FileSystem fs = FileSystem.get(clientContext.getYarnConfig()); + FileSystem fs = clientContext.getRemoteDirectoryManager() + .getDefaultFileSystem(); + // Upload to remote FS under staging area File localFile = new File(fileToUpload); if (!localFile.exists()) { @@ -368,6 +378,111 @@ private void handleLaunchCommand(RunJobParameters parameters, localScriptFile); } + private String getLastNameFromPath(String srcFileStr) { + return new Path(srcFileStr).getName(); + } + + /** + * May download a remote uri(file/dir) and zip. + * Skip download if local dir + * Remote uri can be a local dir(won't download) + * or remote HDFS dir, s3 dir/file .etc + * */ + private String mayDownloadAndZipIt(String remoteDir, String zipFileName, + boolean doZip) + throws IOException { + RemoteDirectoryManager rdm = clientContext.getRemoteDirectoryManager(); + //Append original modification time and size to zip file name + String suffix; + String srcDir = remoteDir; + String zipDirPath = + System.getProperty("java.io.tmpdir") + "/" + zipFileName; + boolean needDeleteTempDir = false; + if (rdm.isRemote(remoteDir)) { + //Append original modification time and size to zip file name + FileStatus status = rdm.getRemoteFileStatus(new Path(remoteDir)); + suffix = "_" + status.getModificationTime() + + "-" + rdm.getRemoteFileSize(remoteDir); + // Download them to temp dir + boolean downloaded = rdm.copyRemoteToLocal(remoteDir, zipDirPath); + if (!downloaded) { + throw new IOException("Failed to download files from " + + remoteDir); + } + LOG.info("Downloaded remote: {} to local: {}", remoteDir, zipDirPath); + srcDir = zipDirPath; + needDeleteTempDir = true; + } else { + File localDir = new File(remoteDir); + suffix = 
"_" + localDir.lastModified() + + "-" + localDir.length(); + } + if (!doZip) { + return srcDir; + } + // zip a local dir + String zipFileUri = zipDir(srcDir, zipDirPath + suffix + ".zip"); + // delete downloaded temp dir + if (needDeleteTempDir) { + deleteFiles(srcDir); + } + return zipFileUri; + } + + @VisibleForTesting + public String zipDir(String srcDir, String dstFile) throws IOException { + FileOutputStream fos = new FileOutputStream(dstFile); + ZipOutputStream zos = new ZipOutputStream(fos); + File srcFile = new File(srcDir); + LOG.info("Compressing {}", srcDir); + addDirToZip(zos, srcFile, srcFile); + // close the ZipOutputStream + zos.close(); + LOG.info("Compressed {} to {}", srcDir, dstFile); + return dstFile; + } + + private void deleteFiles(String localUri) { + boolean success = FileUtil.fullyDelete(new File(localUri)); + if (!success) { + LOG.warn("Fail to delete {}", localUri); + } + LOG.info("Deleted {}", localUri); + } + + private void addDirToZip(ZipOutputStream zos, File srcFile, File base) + throws IOException { + File[] files = srcFile.listFiles(); + if (null == files) { + return; + } + FileInputStream fis = null; + for (int i = 0; i < files.length; i++) { + // if it's directory, add recursively + if (files[i].isDirectory()) { + addDirToZip(zos, files[i], base); + continue; + } + byte[] buffer = new byte[1024]; + try { + fis = new FileInputStream(files[i]); + String name = base.toURI().relativize(files[i].toURI()).getPath(); + LOG.info(" Zip adding: " + name); + zos.putNextEntry(new ZipEntry(name)); + int length; + while ((length = fis.read(buffer)) > 0) { + zos.write(buffer, 0, length); + } + zos.flush(); + } finally { + if (fis != null) { + fis.close(); + } + zos.closeEntry(); + } + } + } + private void addWorkerComponent(Service service, RunJobParameters parameters, TaskType taskType) throws IOException { Component workerComponent = new Component(); @@ -498,6 +613,8 @@ private Service createServiceByParameters(RunJobParameters parameters) 
handleServiceEnvs(serviceSpec, parameters); + handleLocalizations(parameters); + if (parameters.getNumWorkers() > 0) { addWorkerComponents(serviceSpec, parameters); } @@ -553,6 +670,142 @@ private Service createServiceByParameters(RunJobParameters parameters) return serviceSpec; } + /** + * Localize dependencies for all containers. + * If remoteUri is a local directory, + * we'll zip it and upload it to the HDFS staging dir. + * If remoteUri is a remote directory, we'll download it, zip it and upload + * it to the HDFS staging dir. + * If localFilePath is "." or "./", we'll use remoteUri's file/dir name. + * */ + private void handleLocalizations(RunJobParameters parameters) + throws IOException { + // Handle localizations + Path stagingDir = + clientContext.getRemoteDirectoryManager().getJobStagingArea( + parameters.getName(), true); + List locs = parameters.getLocalizations(); + String remoteUri; + String containerLocalPath; + RemoteDirectoryManager rdm = clientContext.getRemoteDirectoryManager(); + + // Check to fail fast + for (Localization loc : locs) { + remoteUri = loc.getRemoteUri(); + Path resourceToLocalize = new Path(remoteUri); + // Check if remoteUri exists + if (rdm.isRemote(remoteUri)) { + // check if exists + if (!rdm.existsRemoteFile(resourceToLocalize)) { + throw new FileNotFoundException( + "File " + remoteUri + " doesn't exists."); + } + } else { + // Check if exists + File localFile = new File(remoteUri); + if (!localFile.exists()) { + throw new FileNotFoundException( + "File " + remoteUri + " doesn't exists."); + } + } + // Check file size (local or remote) against the configured limit + validFileSize(remoteUri); + } + // Download remote resources if needed and upload them to HDFS + for (Localization loc : locs) { + remoteUri = loc.getRemoteUri(); + containerLocalPath = loc.getLocalPath(); + String srcFileStr = remoteUri; + ConfigFile.TypeEnum destFileType = ConfigFile.TypeEnum.STATIC; + Path resourceToLocalize = new Path(remoteUri); + boolean needUploadToHDFS = true; + + /** + * Special handling for remoteUri directory.
+ * */ + boolean needDeleteTempFile = false; + if (rdm.isDir(remoteUri)) { + destFileType = ConfigFile.TypeEnum.ARCHIVE; + srcFileStr = mayDownloadAndZipIt( + remoteUri, getLastNameFromPath(srcFileStr), true); + } else if (rdm.isRemote(remoteUri)) { + if (!needHdfs(remoteUri)) { + // Non HDFS remote uri. Non directory, no need to zip + srcFileStr = mayDownloadAndZipIt( + remoteUri, getLastNameFromPath(srcFileStr), false); + needDeleteTempFile = true; + } else { + // HDFS file, no need to upload + needUploadToHDFS = false; + } + } + + // Upload file to HDFS + if (needUploadToHDFS) { + resourceToLocalize = uploadToRemoteFile(stagingDir, srcFileStr); + } + if (needDeleteTempFile) { + deleteFiles(srcFileStr); + } + // Strip the "_suffix.zip" tail from the zipped dir name + if (destFileType == ConfigFile.TypeEnum.ARCHIVE + && srcFileStr.endsWith(".zip")) { + // Delete local zip file + deleteFiles(srcFileStr); + int suffixIndex = srcFileStr.lastIndexOf('_'); + srcFileStr = srcFileStr.substring(0, suffixIndex); + } + // If provided, use the name of local uri + if (!containerLocalPath.equals(".") + && !containerLocalPath.equals("./")) { + // Change the YARN localized file name to what'll be used in container + srcFileStr = getLastNameFromPath(containerLocalPath); + } + String localizedName = getLastNameFromPath(srcFileStr); + LOG.info("The file/dir to be localized is {}", + resourceToLocalize.toString()); + LOG.info("Its localized file name will be {}", localizedName); + serviceSpec.getConfiguration().getFiles().add(new ConfigFile().srcFile( + resourceToLocalize.toUri().toString()).destFile(localizedName) + .type(destFileType)); + // set mounts + // if mount path is absolute, just use it.
+ // if relative, no need to mount explicitly + if (containerLocalPath.startsWith("/")) { + String mountStr = getLastNameFromPath(srcFileStr) + ":" + + containerLocalPath + ":" + loc.getMountPermission(); + LOG.info("Add bind-mount string {}", mountStr); + appendToEnv(serviceSpec, "YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS", + mountStr, ","); + } + } + } + + private void validFileSize(String uri) throws IOException { + RemoteDirectoryManager rdm = clientContext.getRemoteDirectoryManager(); + long actualSizeByte; + String locationType = "Local"; + if (rdm.isRemote(uri)) { + actualSizeByte = clientContext.getRemoteDirectoryManager() + .getRemoteFileSize(uri); + locationType = "Remote"; + } else { + actualSizeByte = FileUtil.getDU(new File(uri)); + } + long maxFileSizeMB = clientContext.getSubmarineConfig() + .getLong(SubmarineConfiguration.LOCALIZATION_MAX_ALLOWED_FILE_SIZE_MB, + SubmarineConfiguration.DEFAULT_MAX_ALLOWED_REMOTE_URI_SIZE_MB); + LOG.info("{} file/dir: {}, size(Byte):{}," + + " Allowed max file/dir size: {}", + locationType, uri, actualSizeByte, maxFileSizeMB * 1024 * 1024); + + if (actualSizeByte > maxFileSizeMB * 1024 * 1024) { + throw new IOException(uri + " size(Byte): " + + actualSizeByte + " exceeds configured max size:" + + maxFileSizeMB * 1024 * 1024); + } + } + private String generateServiceSpecFile(Service service) throws IOException { File serviceSpecFile = File.createTempFile(service.getName(), ".json"); String buffer = jsonSerDeser.toJson(service); @@ -622,8 +875,6 @@ public ApplicationId submitJob(RunJobParameters parameters) return appid; } - - @VisibleForTesting public Service getServiceSpec() { return serviceSpec; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/markdown/QuickStart.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/markdown/QuickStart.md index da4fb95a04f..8e7a9566b81 100644 ---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/markdown/QuickStart.md +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/markdown/QuickStart.md @@ -70,7 +70,33 @@ usage: job run directly used to launch the worker -worker_resources Resource of each worker, for example memory-mb=2048,vcores=2,yarn.io/gpu=2 + -localization Specify localization to make remote/local + file/directory available to all container(Docker). + Argument format is "RemoteUri:LocalFilePath[:rw]" + (ro permission is not supported yet). + The RemoteUri can be a file or directory in local + or HDFS or s3 or abfs or http .etc. + The LocalFilePath can be absolute or relative. + If relative, it'll be under container's implied + working directory. + This option can be set multiple times. + Examples are + -localization "hdfs:///user/yarn/mydir2:/opt/data" + -localization "s3a:///a/b/myfile1:./" + -localization "https:///a/b/myfile2:./myfile" + -localization "/user/yarn/mydir3:/opt/mydir3" + -localization "./mydir1:." ``` +### Submarine Configuration + +For submarine internal configuration, please create a `submarine.xml` which should be placed under `$HADOOP_CONF_DIR`. + +|Configuration Name | Description | +|:---- |:---- | +| `submarine.runtime.class` | Optional. Fully qualified class name for your runtime factory. | +| `submarine.localization.max-allowed-file-size-mb` | Optional. This sets a size limit on the file/directory to be localized in "-localization" CLI option. 2GB by default.
| + + ### Launch Standalone Tensorflow Application: diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java index 439103042a0..f3d140975fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java @@ -19,15 +19,20 @@ package org.apache.hadoop.yarn.submarine.client.cli.yarnservice; import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.api.AppAdminClient; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.ConfigFile; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.submarine.client.cli.RunJobCli; import org.apache.hadoop.yarn.submarine.common.MockClientContext; import org.apache.hadoop.yarn.submarine.common.api.TaskType; +import org.apache.hadoop.yarn.submarine.common.conf.SubmarineConfiguration; import org.apache.hadoop.yarn.submarine.common.conf.SubmarineLogs; +import org.apache.hadoop.yarn.submarine.common.fs.RemoteDirectoryManager; import org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter; import org.apache.hadoop.yarn.submarine.runtimes.common.StorageKeyConstants; import 
org.apache.hadoop.yarn.submarine.runtimes.common.SubmarineStorage; @@ -38,15 +43,22 @@ import org.junit.Before; import org.junit.Test; +import java.io.File; import java.io.IOException; import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.List; import java.util.Map; import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.EXIT_SUCCESS; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class TestYarnServiceRunJobCli { @@ -137,13 +149,13 @@ public void testBasicRunJobForDistributedTraining() throws Exception { Assert.assertFalse(SubmarineLogs.isVerbose()); runJobCli.run( - new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0", + new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0", "--input_path", "s3://input", "--checkpoint_path", "s3://output", "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd", "python run-job.py", "--worker_resources", "memory=2048M,vcores=2", "--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image", "ps.image", "--worker_docker_image", "worker.image", - "--ps_launch_cmd", "python run-ps.py", "--verbose" }); + "--ps_launch_cmd", "python run-ps.py", "--verbose"}); Service serviceSpec = getServiceSpecFromJobSubmitter( runJobCli.getJobSubmitter()); Assert.assertEquals(3, serviceSpec.getComponents().size()); @@ -162,14 +174,14 @@ public void testBasicRunJobForDistributedTrainingWithTensorboard() Assert.assertFalse(SubmarineLogs.isVerbose()); runJobCli.run( - new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0", + new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0", "--input_path", "s3://input", 
"--checkpoint_path", "s3://output", "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd", "python run-job.py", "--worker_resources", "memory=2048M,vcores=2", "--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image", "ps.image", "--worker_docker_image", "worker.image", "--tensorboard", "--ps_launch_cmd", "python run-ps.py", - "--verbose" }); + "--verbose"}); Service serviceSpec = getServiceSpecFromJobSubmitter( runJobCli.getJobSubmitter()); Assert.assertEquals(4, serviceSpec.getComponents().size()); @@ -192,10 +204,10 @@ public void testBasicRunJobForSingleNodeTraining() throws Exception { Assert.assertFalse(SubmarineLogs.isVerbose()); runJobCli.run( - new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0", + new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0", "--input_path", "s3://input", "--checkpoint_path", "s3://output", "--num_workers", "1", "--worker_launch_cmd", "python run-job.py", - "--worker_resources", "memory=2G,vcores=2", "--verbose" }); + "--worker_resources", "memory=2G,vcores=2", "--verbose"}); Service serviceSpec = getServiceSpecFromJobSubmitter( runJobCli.getJobSubmitter()); @@ -212,9 +224,9 @@ public void testTensorboardOnlyService() throws Exception { Assert.assertFalse(SubmarineLogs.isVerbose()); runJobCli.run( - new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0", + new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0", "--input_path", "s3://input", "--checkpoint_path", "s3://output", - "--num_workers", "0", "--tensorboard", "--verbose" }); + "--num_workers", "0", "--tensorboard", "--verbose"}); Service serviceSpec = getServiceSpecFromJobSubmitter( runJobCli.getJobSubmitter()); @@ -233,11 +245,11 @@ public void testTensorboardOnlyServiceWithCustomizedDockerImageAndResourceCkptPa Assert.assertFalse(SubmarineLogs.isVerbose()); runJobCli.run( - new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0", + new String[]{"--name", "my-job", "--docker_image", 
"tf-docker:1.1.0", "--input_path", "s3://input", "--checkpoint_path", "s3://output", "--num_workers", "0", "--tensorboard", "--verbose", "--tensorboard_resources", "memory=2G,vcores=2", - "--tensorboard_docker_image", "tb_docker_image:001" }); + "--tensorboard_docker_image", "tb_docker_image:001"}); Service serviceSpec = getServiceSpecFromJobSubmitter( runJobCli.getJobSubmitter()); @@ -256,10 +268,10 @@ public void testTensorboardOnlyServiceWithCustomizedDockerImageAndResource() Assert.assertFalse(SubmarineLogs.isVerbose()); runJobCli.run( - new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0", + new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0", "--num_workers", "0", "--tensorboard", "--verbose", "--tensorboard_resources", "memory=2G,vcores=2", - "--tensorboard_docker_image", "tb_docker_image:001" }); + "--tensorboard_docker_image", "tb_docker_image:001"}); Service serviceSpec = getServiceSpecFromJobSubmitter( runJobCli.getJobSubmitter()); @@ -307,7 +319,7 @@ private void verifyTensorboardComponent(RunJobCli runJobCli, Assert.assertEquals( runJobCli.getRunJobParameters().getTensorboardDockerImage(), tensorboardComp.getArtifact().getId()); - } else{ + } else { Assert.assertNull(tensorboardComp.getArtifact()); } @@ -352,11 +364,11 @@ public void testBasicRunJobForSingleNodeTrainingWithTensorboard() Assert.assertFalse(SubmarineLogs.isVerbose()); runJobCli.run( - new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0", + new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0", "--input_path", "s3://input", "--checkpoint_path", "s3://output", "--num_workers", "1", "--worker_launch_cmd", "python run-job.py", "--worker_resources", "memory=2G,vcores=2", "--tensorboard", - "--verbose" }); + "--verbose"}); Service serviceSpec = getServiceSpecFromJobSubmitter( runJobCli.getJobSubmitter()); @@ -376,10 +388,10 @@ public void testBasicRunJobForSingleNodeTrainingWithGeneratedCheckpoint() 
Assert.assertFalse(SubmarineLogs.isVerbose()); runJobCli.run( - new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0", + new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0", "--input_path", "s3://input", "--num_workers", "1", "--worker_launch_cmd", "python run-job.py", "--worker_resources", - "memory=2G,vcores=2", "--tensorboard", "--verbose" }); + "memory=2G,vcores=2", "--tensorboard", "--verbose"}); Service serviceSpec = getServiceSpecFromJobSubmitter( runJobCli.getJobSubmitter()); @@ -398,11 +410,11 @@ public void testParameterStorageForTrainingJob() throws Exception { Assert.assertFalse(SubmarineLogs.isVerbose()); runJobCli.run( - new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0", + new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0", "--input_path", "s3://input", "--checkpoint_path", "s3://output", "--num_workers", "1", "--worker_launch_cmd", "python run-job.py", "--worker_resources", "memory=2G,vcores=2", "--tensorboard", "true", - "--verbose" }); + "--verbose"}); SubmarineStorage storage = mockClientContext.getRuntimeFactory().getSubmarineStorage(); Map jobInfo = storage.getJobInfoByName("my-job"); @@ -419,7 +431,7 @@ public void testAddQuicklinksWithoutTensorboard() throws Exception { Assert.assertFalse(SubmarineLogs.isVerbose()); runJobCli.run( - new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0", + new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0", "--input_path", "s3://input", "--checkpoint_path", "s3://output", "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd", "python run-job.py", "--worker_resources", "memory=2048M,vcores=2", @@ -427,7 +439,7 @@ public void testAddQuicklinksWithoutTensorboard() throws Exception { "ps.image", "--worker_docker_image", "worker.image", "--ps_launch_cmd", "python run-ps.py", "--verbose", "--quicklink", "AAA=http://master-0:8321", "--quicklink", - "BBB=http://worker-0:1234" }); + 
"BBB=http://worker-0:1234"}); Service serviceSpec = getServiceSpecFromJobSubmitter( runJobCli.getJobSubmitter()); Assert.assertEquals(3, serviceSpec.getComponents().size()); @@ -447,7 +459,7 @@ public void testAddQuicklinksWithTensorboard() throws Exception { Assert.assertFalse(SubmarineLogs.isVerbose()); runJobCli.run( - new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0", + new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0", "--input_path", "s3://input", "--checkpoint_path", "s3://output", "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd", "python run-job.py", "--worker_resources", "memory=2048M,vcores=2", @@ -455,7 +467,7 @@ public void testAddQuicklinksWithTensorboard() throws Exception { "ps.image", "--worker_docker_image", "worker.image", "--ps_launch_cmd", "python run-ps.py", "--verbose", "--quicklink", "AAA=http://master-0:8321", "--quicklink", - "BBB=http://worker-0:1234", "--tensorboard" }); + "BBB=http://worker-0:1234", "--tensorboard"}); Service serviceSpec = getServiceSpecFromJobSubmitter( runJobCli.getJobSubmitter()); Assert.assertEquals(4, serviceSpec.getComponents().size()); @@ -468,4 +480,741 @@ public void testAddQuicklinksWithTensorboard() throws Exception { YarnServiceJobSubmitter.TENSORBOARD_QUICKLINK_LABEL, "http://tensorboard-0.my-job.username.null:6006")); } + + /** + * Basic test. + * In one hand, create local temp file/dir for hdfs URI in + * local staging dir. + * In the other hand, use MockRemoteDirectoryManager mock + * implementation when check FileStatus or exists of HDFS file/dir + * --localization hdfs:///user/yarn/script1.py:. 
+ * --localization /temp/script2.py:./ + * --localization /temp/script2.py:/opt/script.py + */ + @Test + public void testRunJobWithBasicLocalization() throws Exception { + String remoteUrl = "hdfs:///user/yarn/script1.py"; + String containerLocal1 = "."; + String localUrl = "/temp/script2.py"; + String containerLocal2 = "./"; + String containerLocal3 = "/opt/script.py"; + String fakeLocalDir = System.getProperty("java.io.tmpdir"); + // create local file, we need to put it under local temp dir + File localFile1 = new File(fakeLocalDir, + new Path(localUrl).getName()); + localFile1.createNewFile(); + + + MockClientContext mockClientContext = + YarnServiceCliTestUtils.getMockClientContext(); + RunJobCli runJobCli = new RunJobCli(mockClientContext); + Assert.assertFalse(SubmarineLogs.isVerbose()); + + RemoteDirectoryManager spyRdm = + spy(mockClientContext.getRemoteDirectoryManager()); + mockClientContext.setRemoteDirectoryMgr(spyRdm); + + // create remote file in local staging dir to simulate HDFS + Path stagingDir = mockClientContext.getRemoteDirectoryManager() + .getJobStagingArea("my-job", true); + File remoteFile1 = new File(stagingDir.toUri().getPath() + + "/" + new Path(remoteUrl).getName()); + remoteFile1.createNewFile(); + + Assert.assertTrue(localFile1.exists()); + Assert.assertTrue(remoteFile1.exists()); + + runJobCli.run( + new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0", + "--input_path", "s3://input", "--checkpoint_path", "s3://output", + "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd", + "python run-job.py", "--worker_resources", "memory=2048M,vcores=2", + "--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image", + "ps.image", "--worker_docker_image", "worker.image", + "--ps_launch_cmd", "python run-ps.py", "--verbose", + "--localization", + remoteUrl + ":" + containerLocal1, + "--localization", + localFile1.getAbsolutePath() + ":" + containerLocal2, + "--localization", + localFile1.getAbsolutePath() + ":" + 
containerLocal3}); + Service serviceSpec = getServiceSpecFromJobSubmitter( + runJobCli.getJobSubmitter()); + Assert.assertEquals(3, serviceSpec.getComponents().size()); + + // No remote dir and hdfs file exists. Ensure download 0 times + verify(spyRdm, times(0)).copyRemoteToLocal( + anyString(), anyString()); + // Ensure local original files are not deleted + Assert.assertTrue(localFile1.exists()); + + List files = serviceSpec.getConfiguration().getFiles(); + Assert.assertEquals(3, files.size()); + ConfigFile file = files.get(0); + Assert.assertEquals(ConfigFile.TypeEnum.STATIC, file.getType()); + String expectedSrcLocalization = remoteUrl; + Assert.assertEquals(expectedSrcLocalization, + file.getSrcFile()); + String expectedDstFileName = new Path(remoteUrl).getName(); + Assert.assertEquals(expectedDstFileName, file.getDestFile()); + + file = files.get(1); + Assert.assertEquals(ConfigFile.TypeEnum.STATIC, file.getType()); + expectedSrcLocalization = stagingDir.toUri().getPath() + + "/" + new Path(localUrl).getName(); + Assert.assertEquals(expectedSrcLocalization, + new Path(file.getSrcFile()).toUri().getPath()); + expectedDstFileName = new Path(localUrl).getName(); + Assert.assertEquals(expectedDstFileName, + file.getDestFile()); + + file = files.get(2); + Assert.assertEquals(ConfigFile.TypeEnum.STATIC, file.getType()); + expectedSrcLocalization = stagingDir.toUri().getPath() + + "/" + new Path(localUrl).getName(); + Assert.assertEquals(expectedSrcLocalization, + new Path(file.getSrcFile()).toUri().getPath()); + expectedDstFileName = new Path(containerLocal3).getName(); + Assert.assertEquals(expectedDstFileName, + file.getDestFile()); + + // Ensure the bind-mount env value is correct + String env = serviceSpec.getConfiguration().getEnv() + .get("YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS"); + String expectedMounts = new Path(containerLocal3).getName() + + ":" + containerLocal3 + ":rw"; +
Assert.assertTrue(env.contains(expectedMounts)); + + remoteFile1.delete(); + localFile1.delete(); + } + + /** + * Non HDFS remote URI test. + * --localization https://a/b/1.patch:. + * --localization s3a://a/s3dir:/opt/mys3dir + */ + @Test + public void testRunJobWithNonHDFSRemoteLocalization() throws Exception { + String remoteUri1 = "https://a/b/1.patch"; + String containerLocal1 = "."; + String remoteUri2 = "s3a://a/s3dir"; + String containerLocal2 = "/opt/mys3dir"; + + MockClientContext mockClientContext = + YarnServiceCliTestUtils.getMockClientContext(); + RunJobCli runJobCli = new RunJobCli(mockClientContext); + Assert.assertFalse(SubmarineLogs.isVerbose()); + + RemoteDirectoryManager spyRdm = + spy(mockClientContext.getRemoteDirectoryManager()); + mockClientContext.setRemoteDirectoryMgr(spyRdm); + + // create remote file in local staging dir to simulate HDFS + Path stagingDir = mockClientContext.getRemoteDirectoryManager() + .getJobStagingArea("my-job", true); + File remoteFile1 = new File(stagingDir.toUri().getPath() + + "/" + new Path(remoteUri1).getName()); + remoteFile1.createNewFile(); + + File remoteDir1 = new File(stagingDir.toUri().getPath() + + "/" + new Path(remoteUri2).getName()); + remoteDir1.mkdir(); + File remoteDir1File1 = new File(remoteDir1, "afile"); + remoteDir1File1.createNewFile(); + + Assert.assertTrue(remoteFile1.exists()); + Assert.assertTrue(remoteDir1.exists()); + Assert.assertTrue(remoteDir1File1.exists()); + + String suffix1 = "_" + remoteDir1.lastModified() + + "-" + mockClientContext.getRemoteDirectoryManager() + .getRemoteFileSize(remoteUri2); + runJobCli.run( + new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0", + "--input_path", "s3://input", "--checkpoint_path", "s3://output", + "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd", + "python run-job.py", "--worker_resources", "memory=2048M,vcores=2", + "--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image", + "ps.image",
"--worker_docker_image", "worker.image", + "--ps_launch_cmd", "python run-ps.py", "--verbose", + "--localization", + remoteUri1 + ":" + containerLocal1, + "--localization", + remoteUri2 + ":" + containerLocal2}); + Service serviceSpec = getServiceSpecFromJobSubmitter( + runJobCli.getJobSubmitter()); + Assert.assertEquals(3, serviceSpec.getComponents().size()); + + // Ensure download happens 2 times (one remote file, one remote dir) + verify(spyRdm, times(2)).copyRemoteToLocal( + anyString(), anyString()); + + // Ensure downloaded temp files are deleted + Assert.assertFalse(new File(System.getProperty("java.io.tmpdir") + + "/" + new Path(remoteUri1).getName()).exists()); + Assert.assertFalse(new File(System.getProperty("java.io.tmpdir") + + "/" + new Path(remoteUri2).getName()).exists()); + + // Ensure zip file is deleted (suffix1 already starts with "_") + Assert.assertFalse(new File(System.getProperty("java.io.tmpdir") + + "/" + new Path(remoteUri2).getName() + + suffix1 + ".zip").exists()); + + List files = serviceSpec.getConfiguration().getFiles(); + Assert.assertEquals(2, files.size()); + ConfigFile file = files.get(0); + Assert.assertEquals(ConfigFile.TypeEnum.STATIC, file.getType()); + String expectedSrcLocalization = stagingDir.toUri().getPath() + + "/" + new Path(remoteUri1).getName(); + Assert.assertEquals(expectedSrcLocalization, + new Path(file.getSrcFile()).toUri().getPath()); + String expectedDstFileName = new Path(remoteUri1).getName(); + Assert.assertEquals(expectedDstFileName, file.getDestFile()); + + file = files.get(1); + Assert.assertEquals(ConfigFile.TypeEnum.ARCHIVE, file.getType()); + expectedSrcLocalization = stagingDir.toUri().getPath() + + "/" + new Path(remoteUri2).getName() + suffix1 + ".zip"; + Assert.assertEquals(expectedSrcLocalization, + new Path(file.getSrcFile()).toUri().getPath()); + + expectedDstFileName = new Path(containerLocal2).getName(); + Assert.assertEquals(expectedDstFileName, + file.getDestFile()); + + // Ensure env value is correct + String env =
serviceSpec.getConfiguration().getEnv() + .get("YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS"); + String expectedMounts = new Path(containerLocal2).getName() + + ":" + containerLocal2 + ":rw"; + Assert.assertTrue(env.contains(expectedMounts)); + + remoteDir1File1.delete(); + remoteFile1.delete(); + remoteDir1.delete(); + } + + /** + * Test HDFS dir localization. + * --localization hdfs:///user/yarn/mydir:./mydir1 + * --localization hdfs:///user/yarn/mydir2:/opt/dir2:rw + * --localization hdfs:///user/yarn/mydir:. + * --localization hdfs:///user/yarn/mydir2:./ + */ + @Test + public void testRunJobWithHdfsDirLocalization() throws Exception { + String remoteUrl = "hdfs:///user/yarn/mydir"; + String containerPath = "./mydir1"; + String remoteUrl2 = "hdfs:///user/yarn/mydir2"; + String containPath2 = "/opt/dir2"; + String containerPath3 = "."; + String containerPath4 = "./"; + MockClientContext mockClientContext = + YarnServiceCliTestUtils.getMockClientContext(); + RunJobCli runJobCli = new RunJobCli(mockClientContext); + Assert.assertFalse(SubmarineLogs.isVerbose()); + + RemoteDirectoryManager spyRdm = + spy(mockClientContext.getRemoteDirectoryManager()); + mockClientContext.setRemoteDirectoryMgr(spyRdm); + // create remote file in local staging dir to simulate HDFS + Path stagingDir = mockClientContext.getRemoteDirectoryManager() + .getJobStagingArea("my-job", true); + File remoteDir1 = new File(stagingDir.toUri().getPath().toString() + + "/" + new Path(remoteUrl).getName()); + remoteDir1.mkdir(); + File remoteFile1 = new File(remoteDir1.getAbsolutePath() + "/1.py"); + File remoteFile2 = new File(remoteDir1.getAbsolutePath() + "/2.py"); + remoteFile1.createNewFile(); + remoteFile2.createNewFile(); + + File remoteDir2 = new File(stagingDir.toUri().getPath().toString() + + "/" + new Path(remoteUrl2).getName()); + remoteDir2.mkdir(); + File remoteFile3 = new File(remoteDir2.getAbsolutePath() + "/3.py"); + File remoteFile4 = new File(remoteDir2.getAbsolutePath() + "/4.py"); +
remoteFile3.createNewFile(); + remoteFile4.createNewFile(); + + Assert.assertTrue(remoteDir1.exists()); + Assert.assertTrue(remoteDir2.exists()); + + String suffix1 = "_" + remoteDir1.lastModified() + + "-" + mockClientContext.getRemoteDirectoryManager() + .getRemoteFileSize(remoteUrl); + String suffix2 = "_" + remoteDir2.lastModified() + + "-" + mockClientContext.getRemoteDirectoryManager() + .getRemoteFileSize(remoteUrl2); + runJobCli.run( + new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0", + "--input_path", "s3://input", "--checkpoint_path", "s3://output", + "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd", + "python run-job.py", "--worker_resources", "memory=2048M,vcores=2", + "--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image", + "ps.image", "--worker_docker_image", "worker.image", + "--ps_launch_cmd", "python run-ps.py", "--verbose", + "--localization", + remoteUrl + ":" + containerPath, + "--localization", + remoteUrl2 + ":" + containPath2 + ":rw", + "--localization", + remoteUrl + ":" + containerPath3, + "--localization", + remoteUrl2 + ":" + containerPath4}); + Service serviceSpec = getServiceSpecFromJobSubmitter( + runJobCli.getJobSubmitter()); + Assert.assertEquals(3, serviceSpec.getComponents().size()); + + // Ensure download remote dir 4 times + verify(spyRdm, times(4)).copyRemoteToLocal( + anyString(), anyString()); + + // Ensure downloaded temp files are deleted + Assert.assertFalse(new File(System.getProperty("java.io.tmpdir") + + "/" + new Path(remoteUrl).getName()).exists()); + Assert.assertFalse(new File(System.getProperty("java.io.tmpdir") + + "/" + new Path(remoteUrl2).getName()).exists()); + // Ensure zip file are deleted + Assert.assertFalse(new File(System.getProperty("java.io.tmpdir") + + "/" + new Path(remoteUrl).getName() + + suffix1 + ".zip").exists()); + Assert.assertFalse(new File(System.getProperty("java.io.tmpdir") + + "/" + new Path(remoteUrl2).getName() + + suffix2 + ".zip").exists()); + + 
// Ensure files will be localized + List files = serviceSpec.getConfiguration().getFiles(); + Assert.assertEquals(4, files.size()); + ConfigFile file = files.get(0); + // The hdfs dir should be download and compress and let YARN to uncompress + Assert.assertEquals(ConfigFile.TypeEnum.ARCHIVE, file.getType()); + String expectedSrcLocalization = stagingDir.toUri().getPath() + + "/" + new Path(remoteUrl).getName() + suffix1 + ".zip"; + Assert.assertEquals(expectedSrcLocalization, + new Path(file.getSrcFile()).toUri().getPath()); + + // Relative path in container, but not "." or "./". Use its own name + String expectedDstFileName = new Path(containerPath).getName(); + Assert.assertEquals(expectedDstFileName, file.getDestFile()); + + file = files.get(1); + Assert.assertEquals(ConfigFile.TypeEnum.ARCHIVE, file.getType()); + expectedSrcLocalization = stagingDir.toUri().getPath() + + "/" + new Path(remoteUrl2).getName() + suffix2 + ".zip"; + Assert.assertEquals(expectedSrcLocalization, + new Path(file.getSrcFile()).toUri().getPath()); + + expectedDstFileName = new Path(containPath2).getName(); + Assert.assertEquals(expectedDstFileName, file.getDestFile()); + + file = files.get(2); + Assert.assertEquals(ConfigFile.TypeEnum.ARCHIVE, file.getType()); + expectedSrcLocalization = stagingDir.toUri().getPath() + + "/" + new Path(remoteUrl).getName() + suffix1 + ".zip"; + Assert.assertEquals(expectedSrcLocalization, + new Path(file.getSrcFile()).toUri().getPath()); + // Relative path in container ".", use remote path name + expectedDstFileName = new Path(remoteUrl).getName(); + Assert.assertEquals(expectedDstFileName, file.getDestFile()); + + file = files.get(3); + Assert.assertEquals(ConfigFile.TypeEnum.ARCHIVE, file.getType()); + expectedSrcLocalization = stagingDir.toUri().getPath() + + "/" + new Path(remoteUrl2).getName() + suffix2 + ".zip"; + Assert.assertEquals(expectedSrcLocalization, + new Path(file.getSrcFile()).toUri().getPath()); + // Relative path in container "./", 
use remote path name + expectedDstFileName = new Path(remoteUrl2).getName(); + Assert.assertEquals(expectedDstFileName, file.getDestFile()); + + // Ensure mounts env value is correct. Add one mount string + String env = serviceSpec.getConfiguration().getEnv() + .get("YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS"); + + String expectedMounts = + new Path(containPath2).getName() + ":" + containPath2 + ":rw"; + Assert.assertTrue(env.contains(expectedMounts)); + + remoteFile1.delete(); + remoteFile2.delete(); + remoteFile3.delete(); + remoteFile4.delete(); + remoteDir1.delete(); + remoteDir2.delete(); + } +
+  /**
+   * Test if file/dir to be localized whose size exceeds limit.
+   * Max 10MB in configuration. The mock remote directory manager reports
+   * 5 bytes for https://a/b/1.patch and 100MB for any other URI, so:
+   * <ul>
+   *   <li>a job localizing only the small remote file is accepted and the
+   *   file is downloaded exactly once;</li>
+   *   <li>a job that also localizes the s3a dir must fail with an
+   *   IOException before anything is downloaded (fail fast);</li>
+   *   <li>a job localizing only a local file must not be impacted.</li>
+   * </ul>
+   *
+   * --localization https://a/b/1.patch:.
+   * --localization s3a://a/s3dir:/opt/mys3dir
+   * --localization /temp/script2:./
+   */
+  @Test
+  public void testRunJobRemoteUriExceedLocalizationSize() throws Exception {
+    String remoteUri1 = "https://a/b/1.patch";
+    String containerLocal1 = ".";
+    String remoteUri2 = "s3a://a/s3dir";
+    String containerLocal2 = "/opt/mys3dir";
+    String localUri1 = "/temp/script2";
+    String containerLocal3 = "./";
+
+    MockClientContext mockClientContext =
+        YarnServiceCliTestUtils.getMockClientContext();
+    SubmarineConfiguration submarineConf = new SubmarineConfiguration();
+    RemoteDirectoryManager spyRdm =
+        spy(mockClientContext.getRemoteDirectoryManager());
+    mockClientContext.setRemoteDirectoryMgr(spyRdm);
+    // Max 10MB; the mock remote reports 100MB for remoteUri2.
+    submarineConf.set(
+        SubmarineConfiguration.LOCALIZATION_MAX_ALLOWED_FILE_SIZE_MB,
+        "10");
+    mockClientContext.setSubmarineConfig(submarineConf);
+
+    RunJobCli runJobCli = new RunJobCli(mockClientContext);
+    Assert.assertFalse(SubmarineLogs.isVerbose());
+
+    // remote file/dir are simulated inside the local staging dir
+    Path stagingDir = mockClientContext.getRemoteDirectoryManager()
+        .getJobStagingArea("my-job", true);
+    File remoteFile1 = new File(stagingDir.toUri().getPath()
+        + "/" + new Path(remoteUri1).getName());
+    File remoteDir1 = new File(stagingDir.toUri().getPath()
+        + "/" + new Path(remoteUri2).getName());
+    File remoteDir1File1 = new File(remoteDir1, "afile");
+    // the local file has to live under the local temp dir
+    File localFile1 = new File(System.getProperty("java.io.tmpdir"),
+        new Path(localUri1).getName());
+
+    try {
+      remoteFile1.createNewFile();
+      remoteDir1.mkdir();
+      remoteDir1File1.createNewFile();
+      localFile1.createNewFile();
+
+      Assert.assertTrue(remoteFile1.exists());
+      Assert.assertTrue(remoteDir1.exists());
+      Assert.assertTrue(remoteDir1File1.exists());
+
+      // Case 1: only the small remote file, which is within the limit.
+      // An IOException here propagates and fails the test with its cause.
+      runJobCli.run(
+          new String[]{"--name", "my-job", "--docker_image",
+              "tf-docker:1.1.0", "--input_path", "s3://input",
+              "--checkpoint_path", "s3://output", "--num_workers", "3",
+              "--num_ps", "2", "--worker_launch_cmd", "python run-job.py",
+              "--worker_resources", "memory=2048M,vcores=2",
+              "--ps_resources", "memory=4096M,vcores=4",
+              "--ps_docker_image", "ps.image",
+              "--worker_docker_image", "worker.image",
+              "--ps_launch_cmd", "python run-ps.py", "--verbose",
+              "--localization",
+              remoteUri1 + ":" + containerLocal1});
+      // the accepted remote file must have been downloaded exactly once
+      verify(spyRdm, times(1)).copyRemoteToLocal(
+          anyString(), anyString());
+
+      // Case 2: one localization exceeds the limit, the job must be
+      // rejected. Forget the download recorded by case 1 first.
+      reset(spyRdm);
+      runJobCli = new RunJobCli(mockClientContext);
+      try {
+        runJobCli.run(
+            new String[]{"--name", "my-job", "--docker_image",
+                "tf-docker:1.1.0", "--input_path", "s3://input",
+                "--checkpoint_path", "s3://output", "--num_workers", "3",
+                "--num_ps", "2", "--worker_launch_cmd", "python run-job.py",
+                "--worker_resources", "memory=2048M,vcores=2",
+                "--ps_resources", "memory=4096M,vcores=4",
+                "--ps_docker_image", "ps.image",
+                "--worker_docker_image", "worker.image",
+                "--ps_launch_cmd", "python run-ps.py", "--verbose",
+                "--localization",
+                remoteUri1 + ":" + containerLocal1,
+                "--localization",
+                remoteUri2 + ":" + containerLocal2,
+                "--localization",
+                localFile1.getAbsolutePath() + ":" + containerLocal3});
+        // Without this the test would silently pass when no limit is
+        // enforced at all.
+        Assert.fail("Expected an IOException because " + remoteUri2
+            + " exceeds the configured max localization size");
+      } catch (IOException e) {
+        Assert.assertTrue(e.getMessage()
+            .contains("104857600 exceeds configured max size:10485760"));
+      }
+      // we shouldn't do any download because fail fast
+      verify(spyRdm, times(0)).copyRemoteToLocal(
+          anyString(), anyString());
+
+      // Case 3: only a local file, which must not be impacted by the
+      // limit, so no exception is expected and nothing is downloaded.
+      runJobCli = new RunJobCli(mockClientContext);
+      runJobCli.run(
+          new String[]{"--name", "my-job", "--docker_image",
+              "tf-docker:1.1.0", "--input_path", "s3://input",
+              "--checkpoint_path", "s3://output", "--num_workers", "3",
+              "--num_ps", "2", "--worker_launch_cmd", "python run-job.py",
+              "--worker_resources", "memory=2048M,vcores=2",
+              "--ps_resources", "memory=4096M,vcores=4",
+              "--ps_docker_image", "ps.image",
+              "--worker_docker_image", "worker.image",
+              "--ps_launch_cmd", "python run-ps.py", "--verbose",
+              "--localization",
+              localFile1.getAbsolutePath() + ":" + containerLocal3});
+      verify(spyRdm, times(0)).copyRemoteToLocal(
+          anyString(), anyString());
+    } finally {
+      // always clean up, even when an assertion above failed
+      localFile1.delete();
+      remoteDir1File1.delete();
+      remoteFile1.delete();
+      remoteDir1.delete();
+    }
+  }
+
+  /**
+   * Test remote Uri doesn't exist.
+   * Neither the remote file nor the local path is created, so both
+   * submissions are expected to be rejected with an IOException whose
+   * message contains "doesn't exists".
+   * */
+  @Test
+  public void testRunJobWithNonExistRemoteUri() throws Exception {
+    String remoteUri1 = "hdfs:///a/b/1.patch";
+    String containerLocal1 = ".";
+    String localUri1 = "/a/b/c";
+    String containerLocal2 = "./";
+    MockClientContext mockClientContext =
+        YarnServiceCliTestUtils.getMockClientContext();
+
+    RunJobCli runJobCli = new RunJobCli(mockClientContext);
+    Assert.assertFalse(SubmarineLogs.isVerbose());
+
+    // missing remote file
+    try {
+      runJobCli.run(
+          new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+              "--input_path", "s3://input", "--checkpoint_path", "s3://output",
+              "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
+              "python run-job.py", "--worker_resources",
+              "memory=2048M,vcores=2",
+              "--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image",
+              "ps.image", "--worker_docker_image", "worker.image",
+              "--ps_launch_cmd", "python run-ps.py", "--verbose",
+              "--localization",
+              remoteUri1 + ":" + containerLocal1});
+      // a missing exception must fail the test instead of passing silently
+      Assert.fail("Expected an IOException because " + remoteUri1
+          + " doesn't exist");
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage()
+          .contains("doesn't exists"));
+    }
+
+    // missing local file
+    try {
+      runJobCli = new RunJobCli(mockClientContext);
+      runJobCli.run(
+          new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+              "--input_path", "s3://input", "--checkpoint_path", "s3://output",
+              "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
+              "python run-job.py", "--worker_resources",
+              "memory=2048M,vcores=2",
+              "--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image",
+              "ps.image", "--worker_docker_image", "worker.image",
+              "--ps_launch_cmd", "python run-ps.py", "--verbose",
+              "--localization",
+              localUri1 + ":" + containerLocal2});
+      Assert.fail("Expected an IOException because " + localUri1
+          + " doesn't exist");
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage()
+          .contains("doesn't exists"));
+    }
+  }
+
+ /** + * Test local dir + * --localization /user/yarn/mydir:./mydir1 + * --localization /user/yarn/mydir2:/opt/dir2:rw + * --localization /user/yarn/mydir2:.
+   *
+   * Each local dir is expected to show up in the service spec as an
+   * ARCHIVE config file whose source is a zip under the job staging dir,
+   * without any call to copyRemoteToLocal.
+   */
+  @Test
+  public void testRunJobWithLocalDirLocalization() throws Exception {
+    String fakeLocalDir = System.getProperty("java.io.tmpdir");
+    String localUrl = "/user/yarn/mydir";
+    String containerPath = "./mydir1";
+    String localUrl2 = "/user/yarn/mydir2";
+    String containPath2 = "/opt/dir2";
+    String containerPath3 = ".";
+
+    MockClientContext mockClientContext =
+        YarnServiceCliTestUtils.getMockClientContext();
+    RunJobCli runJobCli = new RunJobCli(mockClientContext);
+    Assert.assertFalse(SubmarineLogs.isVerbose());
+
+    RemoteDirectoryManager spyRdm =
+        spy(mockClientContext.getRemoteDirectoryManager());
+    mockClientContext.setRemoteDirectoryMgr(spyRdm);
+    // create first local dir with two files
+    File localDir1 = new File(fakeLocalDir,
+        localUrl);
+    localDir1.mkdirs();
+    File temp1 = new File(localDir1.getAbsolutePath() + "/1.py");
+    File temp2 = new File(localDir1.getAbsolutePath() + "/2.py");
+    temp1.createNewFile();
+    temp2.createNewFile();
+
+    // create second local dir; its files must live in localDir2 (not in
+    // localDir1) so that the second dir is zipped with real content
+    File localDir2 = new File(fakeLocalDir,
+        localUrl2);
+    localDir2.mkdirs();
+    File temp3 = new File(localDir2.getAbsolutePath() + "/3.py");
+    File temp4 = new File(localDir2.getAbsolutePath() + "/4.py");
+    temp3.createNewFile();
+    temp4.createNewFile();
+
+    Assert.assertTrue(localDir1.exists());
+    Assert.assertTrue(localDir2.exists());
+
+    // Suffixes are taken after all files were created, because creating a
+    // file changes the modification time of its parent dir.
+    String suffix1 = "_" + localDir1.lastModified()
+        + "-" + localDir1.length();
+    String suffix2 = "_" + localDir2.lastModified()
+        + "-" + localDir2.length();
+
+    runJobCli.run(
+        new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+            "--input_path", "s3://input", "--checkpoint_path", "s3://output",
+            "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
+            "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
+            "--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image",
+            "ps.image", "--worker_docker_image", "worker.image",
+            "--ps_launch_cmd", "python run-ps.py", "--verbose",
+            "--localization",
+            fakeLocalDir + localUrl + ":" + containerPath,
+            "--localization",
+            fakeLocalDir + localUrl2 + ":" + containPath2 + ":rw",
+            "--localization",
+            fakeLocalDir + localUrl2 + ":" + containerPath3});
+
+    Service serviceSpec = getServiceSpecFromJobSubmitter(
+        runJobCli.getJobSubmitter());
+    Assert.assertEquals(3, serviceSpec.getComponents().size());
+
+    // we shouldn't do any download
+    verify(spyRdm, times(0)).copyRemoteToLocal(
+        anyString(), anyString());
+
+    // Ensure local original files are not deleted
+    Assert.assertTrue(localDir1.exists());
+    Assert.assertTrue(localDir2.exists());
+
+    // Ensure zip file are deleted
+    Assert.assertFalse(new File(System.getProperty("java.io.tmpdir")
+        + "/" + new Path(localUrl).getName()
+        + suffix1 + ".zip").exists());
+    Assert.assertFalse(new File(System.getProperty("java.io.tmpdir")
+        + "/" + new Path(localUrl2).getName()
+        + suffix2 + ".zip").exists());
+
+    // Ensure dirs will be zipped and localized
+    List<ConfigFile> files = serviceSpec.getConfiguration().getFiles();
+    Assert.assertEquals(3, files.size());
+    ConfigFile file = files.get(0);
+    Path stagingDir = mockClientContext.getRemoteDirectoryManager()
+        .getJobStagingArea("my-job", true);
+    Assert.assertEquals(ConfigFile.TypeEnum.ARCHIVE, file.getType());
+    String expectedSrcLocalization = stagingDir.toUri().getPath()
+        + "/" + new Path(localUrl).getName() + suffix1 + ".zip";
+    Assert.assertEquals(expectedSrcLocalization,
+        new Path(file.getSrcFile()).toUri().getPath());
+    // Relative container path other than "." or "./": use its own name
+    String expectedDstFileName = new Path(containerPath).getName();
+    Assert.assertEquals(expectedDstFileName, file.getDestFile());
+
+    file = files.get(1);
+    Assert.assertEquals(ConfigFile.TypeEnum.ARCHIVE, file.getType());
+    expectedSrcLocalization = stagingDir.toUri().getPath()
+        + "/" + new Path(localUrl2).getName() + suffix2 + ".zip";
+    Assert.assertEquals(expectedSrcLocalization,
+        new Path(file.getSrcFile()).toUri().getPath());
+    expectedDstFileName = new Path(containPath2).getName();
+    Assert.assertEquals(expectedDstFileName, file.getDestFile());
+
+    file = files.get(2);
+    Assert.assertEquals(ConfigFile.TypeEnum.ARCHIVE, file.getType());
+    expectedSrcLocalization = stagingDir.toUri().getPath()
+        + "/" + new Path(localUrl2).getName() + suffix2 + ".zip";
+    Assert.assertEquals(expectedSrcLocalization,
+        new Path(file.getSrcFile()).toUri().getPath());
+    // Container path ".": the name of the source dir is used
+    expectedDstFileName = new Path(localUrl2).getName();
+    Assert.assertEquals(expectedDstFileName, file.getDestFile());
+
+    // Ensure mounts env value is correct
+    String env = serviceSpec.getConfiguration().getEnv()
+        .get("YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS");
+    String expectedMounts = new Path(containPath2).getName()
+        + ":" + containPath2 + ":rw";
+
+    Assert.assertTrue(env.contains(expectedMounts));
+
+    temp1.delete();
+    temp2.delete();
+    temp3.delete();
+    temp4.delete();
+    localDir2.delete();
+    localDir1.delete();
+  }
+
+  /**
+   * Test zip function.
+   * A dir "/user/yarn/mydir" has two files and one subdir.
+   * The dir is zipped with the submitter, unzipped into a scratch dir and
+   * the unzipped tree is checked for both files, the subdir and the file
+   * inside the subdir.
+   * */
+  @Test
+  public void testYarnServiceSubmitterZipFunction()
+      throws Exception {
+    MockClientContext mockClientContext =
+        YarnServiceCliTestUtils.getMockClientContext();
+    // NOTE(review): the instance is not used below; kept because the
+    // constructor may initialize state on the context - confirm.
+    RunJobCli runJobCli = new RunJobCli(mockClientContext);
+    YarnServiceJobSubmitter submitter =
+        (YarnServiceJobSubmitter)mockClientContext
+            .getRuntimeFactory().getJobSubmitterInstance();
+    String fakeLocalDir = System.getProperty("java.io.tmpdir");
+    String localUrl = "/user/yarn/mydir";
+    String localSubDirName = "subdir1";
+    // create local file
+    File localDir1 = new File(fakeLocalDir,
+        localUrl);
+    localDir1.mkdirs();
+    File temp1 = new File(localDir1.getAbsolutePath() + "/1.py");
+    File temp2 = new File(localDir1.getAbsolutePath() + "/2.py");
+    temp1.createNewFile();
+    temp2.createNewFile();
+
+    File localSubDir = new File(localDir1.getAbsolutePath(), localSubDirName);
+    localSubDir.mkdir();
+    File temp3 = new File(localSubDir.getAbsolutePath(), "3.py");
+    temp3.createNewFile();
+
+    File zipFile = null;
+    File unzipTargetDir = new File(fakeLocalDir, "unzipDir");
+    try {
+      // A stale unzipDir left by an earlier run would let the assertions
+      // below pass without the zip being correct.
+      FileUtil.fullyDelete(unzipTargetDir);
+
+      String zipFilePath = submitter.zipDir(localDir1.getAbsolutePath(),
+          fakeLocalDir + "/user/yarn/mydir.zip");
+      zipFile = new File(zipFilePath);
+      FileUtil.unZip(zipFile, unzipTargetDir);
+      Assert.assertTrue(
+          new File(fakeLocalDir + "/unzipDir/1.py").exists());
+      Assert.assertTrue(
+          new File(fakeLocalDir + "/unzipDir/2.py").exists());
+      Assert.assertTrue(
+          new File(fakeLocalDir + "/unzipDir/subdir1").exists());
+      Assert.assertTrue(
+          new File(fakeLocalDir + "/unzipDir/subdir1/3.py").exists());
+    } finally {
+      if (zipFile != null) {
+        zipFile.delete();
+      }
+      // File#delete() cannot remove a non-empty dir; delete recursively
+      FileUtil.fullyDelete(unzipTargetDir);
+      temp1.delete();
+      temp2.delete();
+      temp3.delete();
+      localSubDir.delete();
+      localDir1.delete();
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/MockClientContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/MockClientContext.java index b59c01e6e21..23c45d20e34 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/MockClientContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/MockClientContext.java @@ -31,7 +31,8 @@ import static org.mockito.Mockito.when; public class MockClientContext extends ClientContext { - private MockRemoteDirectoryManager remoteDirectoryMgr = + + private RemoteDirectoryManager remoteDirectoryMgr = new MockRemoteDirectoryManager(); @Override @@ -39,6 +40,11 @@ public RemoteDirectoryManager getRemoteDirectoryManager() { return remoteDirectoryMgr; } + public void setRemoteDirectoryMgr( + RemoteDirectoryManager remoteDirectoryMgr) { + this.remoteDirectoryMgr = remoteDirectoryMgr; + } + @Override public synchronized YarnClient getOrCreateYarnClient() { YarnClient client = mock(YarnClient.class); diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/fs/MockRemoteDirectoryManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/fs/MockRemoteDirectoryManager.java index b637036b671..43342932b22 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/fs/MockRemoteDirectoryManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/fs/MockRemoteDirectoryManager.java @@ -19,7 +19,9 @@ package org.apache.hadoop.yarn.submarine.common.fs; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import java.io.File; @@ -29,6 +31,7 @@ public class MockRemoteDirectoryManager implements RemoteDirectoryManager { private File jobsParentDir = null; private File modelParentDir = null; + private File jobDir = null; @Override public Path getJobStagingArea(String jobName, boolean create) throws IOException { @@ -41,10 +44,11 @@ public Path getJobStagingArea(String jobName, boolean create) } } - File jobDir = new File(jobsParentDir.getAbsolutePath(), jobName); + this.jobDir = new File(jobsParentDir.getAbsolutePath(), jobName); if (create && !jobDir.exists()) { if (!jobDir.mkdirs()) { - throw new IOException("Failed to mkdirs for " + jobDir.getAbsolutePath()); + throw new IOException("Failed to mkdirs for " + + jobDir.getAbsolutePath()); } } return new Path(jobDir.getAbsolutePath()); @@ -57,7 +61,8 @@ public Path getJobCheckpointDir(String jobName, boolean create) } @Override - public Path getModelDir(String modelName, boolean create) throws IOException { + public Path getModelDir(String 
modelName, boolean create) + throws IOException { if (modelParentDir == null && create) { modelParentDir = new File( "target/_models_" + System.currentTimeMillis()); @@ -70,19 +75,94 @@ public Path getModelDir(String modelName, boolean create) throws IOException { File modelDir = new File(modelParentDir.getAbsolutePath(), modelName); if (create) { if (!modelDir.exists() && !modelDir.mkdirs()) { - throw new IOException("Failed to mkdirs for " + modelDir.getAbsolutePath()); + throw new IOException("Failed to mkdirs for " + + modelDir.getAbsolutePath()); } } return new Path(modelDir.getAbsolutePath()); } @Override - public FileSystem getFileSystem() throws IOException { + public FileSystem getDefaultFileSystem() throws IOException { return FileSystem.getLocal(new Configuration()); } + @Override + public FileSystem getFileSystemByUri(String uri) throws IOException { + return getDefaultFileSystem(); + } + @Override public Path getUserRootFolder() throws IOException { return new Path("s3://generated_root_dir"); }
+
+  /**
+   * Checks on the local file system whether {@code uri} is a directory.
+   * A remote uri is first mapped to the entry of the same name inside the
+   * mock job staging dir.
+   *
+   * @param uri local path or remote uri
+   * @return true if the resolved path is a directory
+   * @throws IOException if the file status cannot be read
+   */
+  @Override
+  public boolean isDir(String uri) throws IOException {
+    return getDefaultFileSystem().getFileStatus(
+        new Path(convertToStagingPath(uri))).isDirectory();
+  }
+
+  /**
+   * Tells whether {@code uri} points outside the local file system.
+   *
+   * @param uri local path or remote uri
+   * @return false if the uri has no scheme or uses the "file" scheme,
+   *     true for any other scheme
+   */
+  @Override
+  public boolean isRemote(String uri) throws IOException {
+    String scheme = new Path(uri).toUri().getScheme();
+    // URI#getScheme() returns the bare scheme name ("file", "hdfs", ...)
+    // without "://", so comparing against "file://" can never match.
+    return scheme != null && !"file".equalsIgnoreCase(scheme);
+  }
+
+  /**
+   * Maps a remote uri to the path of the same-named entry inside the mock
+   * job staging dir; a local uri is returned unchanged.
+   *
+   * @param uri local path or remote uri
+   * @return the local path that stands in for {@code uri}
+   * @throws IOException if {@code uri} is remote but the job staging area
+   *     has not been created through getJobStagingArea yet
+   */
+  private String convertToStagingPath(String uri) throws IOException {
+    if (!isRemote(uri)) {
+      return uri;
+    }
+    if (this.jobDir == null) {
+      // fail with a clear message instead of a NullPointerException
+      throw new IOException("Job staging area is not created yet,"
+          + " cannot resolve remote uri:" + uri);
+    }
+    return this.jobDir.getAbsolutePath() + "/" + new Path(uri).getName();
+  }
+
+ /** + * We use staging dir as mock HDFS dir.
+   * The remote uri is resolved to the same-named entry inside the staging
+   * dir and copied to {@code localUri} with the local file system. An
+   * existing directory at the destination is removed first.
+   *
+   * @param remoteUri uri of the (mock) remote file or dir
+   * @param localUri local destination path
+   * @return the result of FileUtil.copy
+   * @throws IOException if an existing destination dir cannot be deleted
+   *     or the copy fails
+   * */
+  @Override
+  public boolean copyRemoteToLocal(String remoteUri, String localUri)
+      throws IOException {
+    // mock the copy from HDFS into a local copy
+    Path remoteToLocalDir = new Path(convertToStagingPath(remoteUri));
+    File old = new File(convertToStagingPath(localUri));
+    // isDirectory() is only true for an existing dir
+    if (old.isDirectory() && !FileUtil.fullyDelete(old)) {
+      throw new IOException("Cannot delete temp dir:"
+          + old.getAbsolutePath());
+    }
+    FileSystem localFs = getDefaultFileSystem();
+    return FileUtil.copy(localFs, remoteToLocalDir,
+        new File(localUri), false,
+        localFs.getConf());
+  }
+
+  /**
+   * Checks whether a file named like the last component of {@code uri}
+   * exists inside the mock job staging dir.
+   *
+   * @param uri remote path to look up
+   * @return true if the same-named entry exists in the staging dir; false
+   *     if it does not or if no staging dir has been created yet
+   */
+  @Override
+  public boolean existsRemoteFile(Path uri) throws IOException {
+    if (this.jobDir == null) {
+      // nothing can be staged before the staging area exists
+      return false;
+    }
+    String fakeLocalFilePath = this.jobDir.getAbsolutePath()
+        + "/" + uri.getName();
+    return new File(fakeLocalFilePath).exists();
+  }
+
+  /**
+   * Returns the local file status of the entry that stands in for
+   * {@code p} (see the staging dir mapping used by this mock).
+   *
+   * @param p local or remote path
+   * @return file status read from the local file system
+   * @throws IOException if the status cannot be read
+   */
+  @Override
+  public FileStatus getRemoteFileStatus(Path p) throws IOException {
+    return getDefaultFileSystem().getFileStatus(new Path(
+        convertToStagingPath(p.toUri().toString())));
+  }
+
+  /**
+   * Returns a canned size: 5 bytes for https://a/b/1.patch and 100MB for
+   * every other uri, regardless of what is actually stored.
+   *
+   * @param uri uri whose size is requested
+   * @return the canned size in bytes
+   */
+  @Override
+  public long getRemoteFileSize(String uri) throws IOException {
+    // 5 byte for this file to test
+    if ("https://a/b/1.patch".equals(uri)) {
+      return 5;
+    }
+    return 100L * 1024 * 1024;
+  }
+
 }