diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java
index 2d7a4724a4e..da4253be71b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliConstants.java
@@ -52,6 +52,7 @@ public class CliConstants {
public static final String QUICKLINK = "quicklink";
public static final String TENSORBOARD_DOCKER_IMAGE =
"tensorboard_docker_image";
+ public static final String LOCALIZATION = "localization";
public static final String KEYTAB = "keytab";
public static final String PRINCIPAL = "principal";
public static final String DISTRIBUTE_KEYTAB = "distribute_keytab";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/RunJobCli.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/RunJobCli.java
index fc57be9f723..b764d6d8099 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/RunJobCli.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/RunJobCli.java
@@ -125,6 +125,24 @@ public class RunJobCli extends AbstractCli {
+ "if want to link to first worker's 7070 port, and text of quicklink "
+ "is Notebook_UI, user need to specify --quicklink "
+ "Notebook_UI=https://master-0:7070");
+ options.addOption(CliConstants.LOCALIZATION, true, "Specify"
+ + " localization to make remote/local file/directory available to"
+ + " all container(Docker)."
+ + " Argument format is \"RemoteUri:LocalFilePath[:rw] \" (ro"
+ + " permission is not supported yet)"
+ + " The RemoteUri can be a file or directory in local or"
+ + " HDFS or s3 or abfs or http .etc."
+ + " The LocalFilePath can be absolute or relative."
+ + " If it's a relative path, it'll be"
+ + " under container's implied working directory"
+ + " but sub directory is not supported yet."
+ + " This option can be set mutiple times."
+ + " Examples are \n"
+ + "-localization \"hdfs:///user/yarn/mydir2:/opt/data\"\n"
+ + "-localization \"s3a:///a/b/myfile1:./\"\n"
+ + "-localization \"https:///a/b/myfile2:./myfile\"\n"
+ + "-localization \"/user/yarn/mydir3:/opt/mydir3\"\n"
+ + "-localization \"./mydir1:.\"\n");
options.addOption(CliConstants.KEYTAB, true, "Specify keytab used by the " +
"job under security environment");
options.addOption(CliConstants.PRINCIPAL, true, "Specify principal used " +
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/Localization.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/Localization.java
new file mode 100644
index 00000000000..fe9c6de4123
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/Localization.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.submarine.client.cli.param;
+
+import org.apache.commons.cli.ParseException;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Localization parameter.
+ * */
+public class Localization {
+
+ private String mountPermissionPattern = "(wr|rw)$";
+ /**
+ * Regex for directory/file path in container.
+ * YARN only support absolute path for mount, but we can
+ * support some relative path.
+ * For relative path, we only allow ".", "./","./name".
+ * relative path like "./a/b" is not allowed.
+ * "." and "./" means original dir/file name in container working directory
+ * "./name" means use same or new "name" in container working directory
+ * A absolute path means same path in container filesystem
+ */
+ private String localPathPattern = "((^\\.$)|(^\\./$)|(^\\./[^/]+)|(^/.*))";
+ private String remoteUri;
+ private String localPath;
+
+ // Read write by default
+ private String mountPermission = "rw";
+
+ private static final List SUPPORTED_SCHEME = Arrays.asList(
+ "hdfs", "oss", "s3a", "s3n", "wasb",
+ "wasbs", "abfs", "abfss", "adl", "har",
+ "ftp", "http", "https", "viewfs", "swebhdfs",
+ "webhdfs", "swift");
+
+ public void parse(String arg) throws ParseException {
+ String[] tokens = arg.split(":");
+ int minimum = "a:b".split(":").length;
+ int minimumWithPermission = "a:b:rw".split(":").length;
+ int minimumParts = minimum;
+ int miniPartsWithRemoteScheme = "scheme://a:b".split(":").length;
+ int maximumParts = "scheme://a:b:rw".split(":").length;
+ // If remote uri starts with a remote scheme
+ if (isSupportedScheme(tokens[0])) {
+ minimumParts = miniPartsWithRemoteScheme;
+ }
+ if (tokens.length < minimumParts
+ || tokens.length > maximumParts) {
+ throw new ParseException("Invalid parameter,"
+ + "should be \"remoteUri:localPath[:rw|:wr]\" "
+ + "format for --localizations");
+ }
+
+ /**
+ * RemoteUri starts with remote scheme.
+ * Merge part 0 and 1 to build a hdfs path in token[0].
+ * toke[1] will be localPath to ease following logic
+ * */
+ if (minimumParts == miniPartsWithRemoteScheme) {
+ tokens[0] = tokens[0] + ":" + tokens[1];
+ tokens[1] = tokens[2];
+ if (tokens.length == maximumParts) {
+ // Has permission part
+ mountPermission = tokens[maximumParts - 1];
+ }
+ }
+ // RemoteUri starts with linux file path
+ if (minimumParts == minimum
+ && tokens.length == minimumWithPermission) {
+ // Has permission part
+ mountPermission = tokens[minimumWithPermission - 1];
+ }
+ remoteUri = tokens[0];
+ localPath = tokens[1];
+ if (!localPath.matches(localPathPattern)) {
+ throw new ParseException("Invalid local file path:"
+ + localPath
+ + ", it only support \".\", \"./\", \"./name\" and "
+ + "absolute path.");
+ }
+ if (!mountPermission.matches(mountPermissionPattern)) {
+ throw new ParseException("Invalid mount permission (ro is not "
+ + "supported yet), " + mountPermission);
+ }
+ }
+
+ public String getRemoteUri() {
+ return remoteUri;
+ }
+
+ public void setRemoteUri(String rUti) {
+ this.remoteUri = rUti;
+ }
+
+ public String getLocalPath() {
+ return localPath;
+ }
+
+ public void setLocalPath(String lPath) {
+ this.localPath = lPath;
+ }
+
+ public String getMountPermission() {
+ return mountPermission;
+ }
+
+ public void setMountPermission(String mPermission) {
+ this.mountPermission = mPermission;
+ }
+
+ private boolean isSupportedScheme(String scheme) {
+ return SUPPORTED_SCHEME.contains(scheme);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java
index 6c8307f4efa..111d4ebd61a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/RunJobParameters.java
@@ -44,6 +44,7 @@ public class RunJobParameters extends RunParameters {
private String workerLaunchCmd;
private String psLaunchCmd;
private List quicklinks = new ArrayList<>();
+ private List localizations = new ArrayList<>();
private String psDockerImage = null;
private String workerDockerImage = null;
@@ -159,6 +160,16 @@ public class RunJobParameters extends RunParameters {
String psLaunchCommand = parsedCommandLine.getOptionValue(
CliConstants.PS_LAUNCH_CMD);
+ // Localizations
+ String[] localizationsStr = parsedCommandLine.getOptionValues(
+ CliConstants.LOCALIZATION);
+ if (null != localizationsStr) {
+ for (String loc : localizationsStr) {
+ Localization localization = new Localization();
+ localization.parse(loc);
+ localizations.add(localization);
+ }
+ }
boolean distributeKerberosKeytab = parsedCommandLine.hasOption(CliConstants
.DISTRIBUTE_KEYTAB);
@@ -288,6 +299,10 @@ public class RunJobParameters extends RunParameters {
return quicklinks;
}
+ public List getLocalizations() {
+ return localizations;
+ }
+
public String getKeytab() {
return keytab;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/package-info.java
new file mode 100644
index 00000000000..2df120f31c0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/param/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.submarine.client.cli.param;
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/conf/SubmarineConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/conf/SubmarineConfiguration.java
index c9e6b7bf6e6..7e70717a81b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/conf/SubmarineConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/conf/SubmarineConfiguration.java
@@ -19,6 +19,21 @@ import org.apache.hadoop.conf.Configuration;
public class SubmarineConfiguration extends Configuration {
private static final String SUBMARINE_CONFIGURATION_FILE = "submarine.xml";
+ public static final String SUBMARINE_CONFIGURATION_PREFIX = "submarine.";
+
+ public static final String SUBMARINE_LOCALIZATION_PREFIX =
+ SUBMARINE_CONFIGURATION_PREFIX + "localization.";
+ /**
+ * Limit the size of directory/file to be localized.
+ * To avoid exhausting local disk space,
+ * this limit both remote and local file to be localized
+ */
+ public static final String LOCALIZATION_MAX_ALLOWED_FILE_SIZE_MB =
+ SUBMARINE_LOCALIZATION_PREFIX + "max-allowed-file-size-mb";
+
+ // Default 2GB
+ public static final long DEFAULT_MAX_ALLOWED_REMOTE_URI_SIZE_MB = 2048;
+
public SubmarineConfiguration() {
this(new Configuration(false), true);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java
index 1d99b557c2f..bfb20da8766 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/DefaultRemoteDirectoryManager.java
@@ -14,22 +14,28 @@
package org.apache.hadoop.yarn.submarine.common.fs;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.submarine.client.cli.CliConstants;
import org.apache.hadoop.yarn.submarine.common.ClientContext;
+import java.io.File;
import java.io.IOException;
+import java.net.URI;
/**
* Manages remote directories for staging, log, etc.
* TODO, need to properly handle permission / name validation, etc.
*/
public class DefaultRemoteDirectoryManager implements RemoteDirectoryManager {
- FileSystem fs;
+ private FileSystem fs;
+ private Configuration conf;
public DefaultRemoteDirectoryManager(ClientContext context) {
+ this.conf = context.getYarnConfig();
try {
this.fs = FileSystem.get(context.getYarnConfig());
} catch (IOException e) {
@@ -38,7 +44,8 @@ public class DefaultRemoteDirectoryManager implements RemoteDirectoryManager {
}
@Override
- public Path getJobStagingArea(String jobName, boolean create) throws IOException {
+ public Path getJobStagingArea(String jobName, boolean create)
+ throws IOException {
Path staging = new Path(getJobRootFolder(jobName), "staging");
if (create) {
createFolderIfNotExist(staging);
@@ -61,7 +68,8 @@ public class DefaultRemoteDirectoryManager implements RemoteDirectoryManager {
}
@Override
- public Path getModelDir(String modelName, boolean create) throws IOException {
+ public Path getModelDir(String modelName, boolean create)
+ throws IOException {
Path modelDir = new Path(new Path("submarine", "models"), modelName);
if (create) {
createFolderIfNotExist(modelDir);
@@ -70,10 +78,15 @@ public class DefaultRemoteDirectoryManager implements RemoteDirectoryManager {
}
@Override
- public FileSystem getFileSystem() {
+ public FileSystem getDefaultFileSystem() {
return fs;
}
+ @Override
+ public FileSystem getFileSystemByUri(String uri) throws IOException {
+ return FileSystem.get(URI.create(uri), conf);
+ }
+
@Override
public Path getUserRootFolder() throws IOException {
Path rootPath = new Path("submarine", "jobs");
@@ -83,6 +96,55 @@ public class DefaultRemoteDirectoryManager implements RemoteDirectoryManager {
return fStatus.getPath();
}
+ @Override
+ public boolean isDir(String uri) throws IOException {
+ if (isRemote(uri)) {
+ return getFileSystemByUri(uri).getFileStatus(new Path(uri)).isDirectory();
+ }
+ return new File(uri).isDirectory();
+ }
+
+ @Override
+ public boolean isRemote(String uri) {
+ String scheme = new Path(uri).toUri().getScheme();
+ if (null == scheme) {
+ return false;
+ }
+ return !scheme.startsWith("file://");
+ }
+
+ @Override
+ public boolean copyRemoteToLocal(String remoteUri, String localUri)
+ throws IOException {
+ // Delete old to avoid failure in FileUtil.copy
+ File old = new File(localUri);
+ if (old.exists()) {
+ if (!FileUtil.fullyDelete(old)) {
+ throw new IOException("Failed to delete dir:"
+ + old.getAbsolutePath());
+ }
+ }
+ return FileUtil.copy(getFileSystemByUri(remoteUri), new Path(remoteUri),
+ new File(localUri), false,
+ conf);
+ }
+
+ @Override
+ public boolean existsRemoteFile(Path url) throws IOException {
+ return getFileSystemByUri(url.toUri().toString()).exists(url);
+ }
+
+ @Override
+ public FileStatus getRemoteFileStatus(Path url) throws IOException {
+ return getFileSystemByUri(url.toUri().toString()).getFileStatus(url);
+ }
+
+ @Override
+ public long getRemoteFileSize(String uri) throws IOException {
+ return getFileSystemByUri(uri)
+ .getContentSummary(new Path(uri)).getSpaceConsumed();
+ }
+
private Path getJobRootFolder(String jobName) throws IOException {
Path userRoot = getUserRootFolder();
Path jobRootPath = new Path(userRoot, jobName);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/RemoteDirectoryManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/RemoteDirectoryManager.java
index ad0d4280b3f..5f71d19d375 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/RemoteDirectoryManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/common/fs/RemoteDirectoryManager.java
@@ -14,6 +14,7 @@
package org.apache.hadoop.yarn.submarine.common.fs;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -26,7 +27,22 @@ public interface RemoteDirectoryManager {
Path getModelDir(String modelName, boolean create) throws IOException;
- FileSystem getFileSystem() throws IOException;
+ FileSystem getDefaultFileSystem() throws IOException;
+
+ FileSystem getFileSystemByUri(String uri) throws IOException;
Path getUserRootFolder() throws IOException;
+
+ boolean isDir(String uri) throws IOException;
+
+ boolean isRemote(String uri) throws IOException;
+
+ boolean copyRemoteToLocal(String remoteUri, String localUri)
+ throws IOException;
+
+ boolean existsRemoteFile(Path uri) throws IOException;
+
+ FileStatus getRemoteFileStatus(Path uri) throws IOException;
+
+ long getRemoteFileSize(String uri) throws IOException;
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/FSBasedSubmarineStorageImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/FSBasedSubmarineStorageImpl.java
index 18815107ffb..fb7c12f38a7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/FSBasedSubmarineStorageImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/common/FSBasedSubmarineStorageImpl.java
@@ -42,7 +42,7 @@ public class FSBasedSubmarineStorageImpl extends SubmarineStorage {
public void addNewJob(String jobName, Map jobInfo)
throws IOException {
Path jobInfoPath = getJobInfoPath(jobName, true);
- FSDataOutputStream fos = rdm.getFileSystem().create(jobInfoPath);
+ FSDataOutputStream fos = rdm.getDefaultFileSystem().create(jobInfoPath);
serializeMap(fos, jobInfo);
}
@@ -50,7 +50,7 @@ public class FSBasedSubmarineStorageImpl extends SubmarineStorage {
public Map getJobInfoByName(String jobName)
throws IOException {
Path jobInfoPath = getJobInfoPath(jobName, false);
- FSDataInputStream fis = rdm.getFileSystem().open(jobInfoPath);
+ FSDataInputStream fis = rdm.getDefaultFileSystem().open(jobInfoPath);
return deserializeMap(fis);
}
@@ -58,7 +58,7 @@ public class FSBasedSubmarineStorageImpl extends SubmarineStorage {
public void addNewModel(String modelName, String version,
Map modelInfo) throws IOException {
Path modelInfoPath = getModelInfoPath(modelName, version, true);
- FSDataOutputStream fos = rdm.getFileSystem().create(modelInfoPath);
+ FSDataOutputStream fos = rdm.getDefaultFileSystem().create(modelInfoPath);
serializeMap(fos, modelInfo);
}
@@ -66,7 +66,7 @@ public class FSBasedSubmarineStorageImpl extends SubmarineStorage {
public Map getModelInfoByName(String modelName,
String version) throws IOException {
Path modelInfoPath = getModelInfoPath(modelName, version, false);
- FSDataInputStream fis = rdm.getFileSystem().open(modelInfoPath);
+ FSDataInputStream fis = rdm.getDefaultFileSystem().open(modelInfoPath);
return deserializeMap(fis);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java
index 2e84c969b02..496ba7c43b0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java
@@ -18,6 +18,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -32,17 +33,21 @@ import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.KerberosPrincipal;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
+import org.apache.hadoop.yarn.submarine.client.cli.param.Localization;
import org.apache.hadoop.yarn.submarine.client.cli.param.Quicklink;
import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters;
import org.apache.hadoop.yarn.submarine.common.ClientContext;
import org.apache.hadoop.yarn.submarine.common.Envs;
import org.apache.hadoop.yarn.submarine.common.api.TaskType;
+import org.apache.hadoop.yarn.submarine.common.conf.SubmarineConfiguration;
import org.apache.hadoop.yarn.submarine.common.conf.SubmarineLogs;
+import org.apache.hadoop.yarn.submarine.common.fs.RemoteDirectoryManager;
import org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -56,6 +61,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.EXIT_SUCCESS;
@@ -307,7 +314,8 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
}
private void locateRemoteFileToContainerWorkDir(String destFilename,
- Component comp, Path uploadedFilePath) throws IOException {
+ Component comp, Path uploadedFilePath)
+ throws IOException {
FileSystem fs = FileSystem.get(clientContext.getYarnConfig());
FileStatus fileStatus = fs.getFileStatus(uploadedFilePath);
@@ -321,7 +329,9 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
private Path uploadToRemoteFile(Path stagingDir, String fileToUpload) throws
IOException {
- FileSystem fs = FileSystem.get(clientContext.getYarnConfig());
+ FileSystem fs = clientContext.getRemoteDirectoryManager()
+ .getDefaultFileSystem();
+
// Upload to remote FS under staging area
File localFile = new File(fileToUpload);
if (!localFile.exists()) {
@@ -368,6 +378,111 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
localScriptFile);
}
+ private String getLastNameFromPath(String srcFileStr) {
+ return new Path(srcFileStr).getName();
+ }
+
+ /**
+ * May download a remote uri(file/dir) and zip.
+ * Skip download if local dir
+ * Remote uri can be a local dir(won't download)
+ * or remote HDFS dir, s3 dir/file .etc
+ * */
+ private String mayDownloadAndZipIt(String remoteDir, String zipFileName,
+ boolean doZip)
+ throws IOException {
+ RemoteDirectoryManager rdm = clientContext.getRemoteDirectoryManager();
+ //Append original modification time and size to zip file name
+ String suffix;
+ String srcDir = remoteDir;
+ String zipDirPath =
+ System.getProperty("java.io.tmpdir") + "/" + zipFileName;
+ boolean needDeleteTempDir = false;
+ if (rdm.isRemote(remoteDir)) {
+ //Append original modification time and size to zip file name
+ FileStatus status = rdm.getRemoteFileStatus(new Path(remoteDir));
+ suffix = "_" + status.getModificationTime()
+ + "-" + rdm.getRemoteFileSize(remoteDir);
+ // Download them to temp dir
+ boolean downloaded = rdm.copyRemoteToLocal(remoteDir, zipDirPath);
+ if (!downloaded) {
+ throw new IOException("Failed to download files from "
+ + remoteDir);
+ }
+ LOG.info("Downloaded remote: {} to local: {}", remoteDir, zipDirPath);
+ srcDir = zipDirPath;
+ needDeleteTempDir = true;
+ } else {
+ File localDir = new File(remoteDir);
+ suffix = "_" + localDir.lastModified()
+ + "-" + localDir.length();
+ }
+ if (!doZip) {
+ return srcDir;
+ }
+ // zip a local dir
+ String zipFileUri = zipDir(srcDir, zipDirPath + suffix + ".zip");
+ // delete downloaded temp dir
+ if (needDeleteTempDir) {
+ deleteFiles(srcDir);
+ }
+ return zipFileUri;
+ }
+
+ @VisibleForTesting
+ public String zipDir(String srcDir, String dstFile) throws IOException {
+ FileOutputStream fos = new FileOutputStream(dstFile);
+ ZipOutputStream zos = new ZipOutputStream(fos);
+ File srcFile = new File(srcDir);
+ LOG.info("Compressing {}", srcDir);
+ addDirToZip(zos, srcFile, srcFile);
+ // close the ZipOutputStream
+ zos.close();
+ LOG.info("Compressed {} to {}", srcDir, dstFile);
+ return dstFile;
+ }
+
+ private void deleteFiles(String localUri) {
+ boolean success = FileUtil.fullyDelete(new File(localUri));
+ if (!success) {
+ LOG.warn("Fail to delete {}", localUri);
+ }
+ LOG.info("Deleted {}", localUri);
+ }
+
+ private void addDirToZip(ZipOutputStream zos, File srcFile, File base)
+ throws IOException {
+ File[] files = srcFile.listFiles();
+ if (null == files) {
+ return;
+ }
+ FileInputStream fis = null;
+ for (int i = 0; i < files.length; i++) {
+ // if it's directory, add recursively
+ if (files[i].isDirectory()) {
+ addDirToZip(zos, files[i], base);
+ continue;
+ }
+ byte[] buffer = new byte[1024];
+ try {
+ fis = new FileInputStream(files[i]);
+ String name = base.toURI().relativize(files[i].toURI()).getPath();
+ LOG.info(" Zip adding: " + name);
+ zos.putNextEntry(new ZipEntry(name));
+ int length;
+ while ((length = fis.read(buffer)) > 0) {
+ zos.write(buffer, 0, length);
+ }
+ zos.flush();
+ } finally {
+ if (fis != null) {
+ fis.close();
+ }
+ zos.closeEntry();
+ }
+ }
+ }
+
private void addWorkerComponent(Service service,
RunJobParameters parameters, TaskType taskType) throws IOException {
Component workerComponent = new Component();
@@ -498,6 +613,8 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
handleServiceEnvs(serviceSpec, parameters);
+ handleLocalizations(parameters);
+
if (parameters.getNumWorkers() > 0) {
addWorkerComponents(serviceSpec, parameters);
}
@@ -553,6 +670,142 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
return serviceSpec;
}
+ /**
+ * Localize dependencies for all containers.
+ * If remoteUri is a local directory,
+ * we'll zip it, upload to HDFS staging dir HDFS.
+ * If remoteUri is directory, we'll download it, zip it and upload
+ * to HDFS.
+ * If localFilePath is ".", we'll use remoteUri's file/dir name
+ * */
+ private void handleLocalizations(RunJobParameters parameters)
+ throws IOException {
+ // Handle localizations
+ Path stagingDir =
+ clientContext.getRemoteDirectoryManager().getJobStagingArea(
+ parameters.getName(), true);
+ List locs = parameters.getLocalizations();
+ String remoteUri;
+ String containerLocalPath;
+ RemoteDirectoryManager rdm = clientContext.getRemoteDirectoryManager();
+
+ // Check to fail fast
+ for (Localization loc : locs) {
+ remoteUri = loc.getRemoteUri();
+ Path resourceToLocalize = new Path(remoteUri);
+ // Check if remoteUri exists
+ if (rdm.isRemote(remoteUri)) {
+ // check if exists
+ if (!rdm.existsRemoteFile(resourceToLocalize)) {
+ throw new FileNotFoundException(
+ "File " + remoteUri + " doesn't exists.");
+ }
+ } else {
+ // Check if exists
+ File localFile = new File(remoteUri);
+ if (!localFile.exists()) {
+ throw new FileNotFoundException(
+ "File " + remoteUri + " doesn't exists.");
+ }
+ }
+ // check remote file size
+ validFileSize(remoteUri);
+ }
+ // Start download remote if needed and upload to HDFS
+ for (Localization loc : locs) {
+ remoteUri = loc.getRemoteUri();
+ containerLocalPath = loc.getLocalPath();
+ String srcFileStr = remoteUri;
+ ConfigFile.TypeEnum destFileType = ConfigFile.TypeEnum.STATIC;
+ Path resourceToLocalize = new Path(remoteUri);
+ boolean needUploadToHDFS = true;
+
+ /**
+ * Special handling for remoteUri directory.
+ * */
+ boolean needDeleteTempFile = false;
+ if (rdm.isDir(remoteUri)) {
+ destFileType = ConfigFile.TypeEnum.ARCHIVE;
+ srcFileStr = mayDownloadAndZipIt(
+ remoteUri, getLastNameFromPath(srcFileStr), true);
+ } else if (rdm.isRemote(remoteUri)) {
+ if (!needHdfs(remoteUri)) {
+ // Non HDFS remote uri. Non directory, no need to zip
+ srcFileStr = mayDownloadAndZipIt(
+ remoteUri, getLastNameFromPath(srcFileStr), false);
+ needDeleteTempFile = true;
+ } else {
+ // HDFS file, no need to upload
+ needUploadToHDFS = false;
+ }
+ }
+
+ // Upload file to HDFS
+ if (needUploadToHDFS) {
+ resourceToLocalize = uploadToRemoteFile(stagingDir, srcFileStr);
+ }
+ if (needDeleteTempFile) {
+ deleteFiles(srcFileStr);
+ }
+ // Remove .zip from zipped dir name
+ if (destFileType == ConfigFile.TypeEnum.ARCHIVE
+ && srcFileStr.endsWith(".zip")) {
+ // Delete local zip file
+ deleteFiles(srcFileStr);
+ int suffixIndex = srcFileStr.lastIndexOf('_');
+ srcFileStr = srcFileStr.substring(0, suffixIndex);
+ }
+ // If provided, use the name of local uri
+ if (!containerLocalPath.equals(".")
+ && !containerLocalPath.equals("./")) {
+ // Change the YARN localized file name to what'll used in container
+ srcFileStr = getLastNameFromPath(containerLocalPath);
+ }
+ String localizedName = getLastNameFromPath(srcFileStr);
+ LOG.info("The file/dir to be localized is {}",
+ resourceToLocalize.toString());
+ LOG.info("Its localized file name will be {}", localizedName);
+ serviceSpec.getConfiguration().getFiles().add(new ConfigFile().srcFile(
+ resourceToLocalize.toUri().toString()).destFile(localizedName)
+ .type(destFileType));
+ // set mounts
+ // if mount path is absolute, just use it.
+ // if relative, no need to mount explicitly
+ if (containerLocalPath.startsWith("/")) {
+ String mountStr = getLastNameFromPath(srcFileStr) + ":"
+ + containerLocalPath + ":" + loc.getMountPermission();
+ LOG.info("Add bind-mount string {}", mountStr);
+ appendToEnv(serviceSpec, "YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS",
+ mountStr, ",");
+ }
+ }
+ }
+
+ private void validFileSize(String uri) throws IOException {
+ RemoteDirectoryManager rdm = clientContext.getRemoteDirectoryManager();
+ long actualSizeByte;
+ String locationType = "Local";
+ if (rdm.isRemote(uri)) {
+ actualSizeByte = clientContext.getRemoteDirectoryManager()
+ .getRemoteFileSize(uri);
+ locationType = "Remote";
+ } else {
+ actualSizeByte = FileUtil.getDU(new File(uri));
+ }
+ long maxFileSizeMB = clientContext.getSubmarineConfig()
+ .getLong(SubmarineConfiguration.LOCALIZATION_MAX_ALLOWED_FILE_SIZE_MB,
+ SubmarineConfiguration.DEFAULT_MAX_ALLOWED_REMOTE_URI_SIZE_MB);
+ LOG.info("{} fie/dir: {}, size(Byte):{},"
+ + " Allowed max file/dir size: {}",
+ locationType, uri, actualSizeByte, maxFileSizeMB * 1024 * 1024);
+
+ if (actualSizeByte > maxFileSizeMB * 1024 * 1024) {
+ throw new IOException(uri + " size(Byte): "
+ + actualSizeByte + " exceeds configured max size:"
+ + maxFileSizeMB * 1024 * 1024);
+ }
+ }
+
private String generateServiceSpecFile(Service service) throws IOException {
File serviceSpecFile = File.createTempFile(service.getName(), ".json");
String buffer = jsonSerDeser.toJson(service);
@@ -622,8 +875,6 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
return appid;
}
-
-
@VisibleForTesting
public Service getServiceSpec() {
return serviceSpec;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/markdown/QuickStart.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/markdown/QuickStart.md
index da4fb95a04f..8e7a9566b81 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/markdown/QuickStart.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/site/markdown/QuickStart.md
@@ -70,7 +70,33 @@ usage: job run
directly used to launch the worker
-worker_resources Resource of each worker, for example
memory-mb=2048,vcores=2,yarn.io/gpu=2
+ -localization Specify localization to remote/local
+ file/directory available to all container(Docker).
+ Argument format is "RemoteUri:LocalFilePath[:rw]"
+ (ro permission is not supported yet).
+ The RemoteUri can be a file or directory in local
+ or HDFS or s3 or abfs or http .etc.
+ The LocalFilePath can be absolute or relative.
+ If relative, it'll be under container's implied
+ working directory.
+ This option can be set mutiple times.
+ Examples are
+ -localization "hdfs:///user/yarn/mydir2:/opt/data"
+ -localization "s3a:///a/b/myfile1:./"
+ -localization "https:///a/b/myfile2:./myfile"
+ -localization "/user/yarn/mydir3:/opt/mydir3"
+ -localization "./mydir1:."
```
+### Submarine Configuration
+
+For submarine internal configuration, please create a `submarine.xml` which should be placed under `$HADOOP_CONF_DIR`.
+
+|Configuration Name | Description |
+|:---- |:---- |
+| `submarine.runtime.class` | Optional. Full qualified class name for your runtime factory. |
+| `submarine.localization.max-allowed-file-size-mb` | Optional. This sets a size limit to the file/directory to be localized in "-localization" CLI option. 2GB by default. |
+
+
### Launch Standalone Tensorflow Application:
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java
index 439103042a0..f3d140975fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/yarnservice/TestYarnServiceRunJobCli.java
@@ -19,15 +19,20 @@
package org.apache.hadoop.yarn.submarine.client.cli.yarnservice;
import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AppAdminClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.records.Component;
+import org.apache.hadoop.yarn.service.api.records.ConfigFile;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.submarine.client.cli.RunJobCli;
import org.apache.hadoop.yarn.submarine.common.MockClientContext;
import org.apache.hadoop.yarn.submarine.common.api.TaskType;
+import org.apache.hadoop.yarn.submarine.common.conf.SubmarineConfiguration;
import org.apache.hadoop.yarn.submarine.common.conf.SubmarineLogs;
+import org.apache.hadoop.yarn.submarine.common.fs.RemoteDirectoryManager;
import org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter;
import org.apache.hadoop.yarn.submarine.runtimes.common.StorageKeyConstants;
import org.apache.hadoop.yarn.submarine.runtimes.common.SubmarineStorage;
@@ -38,15 +43,22 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.util.List;
import java.util.Map;
import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.EXIT_SUCCESS;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestYarnServiceRunJobCli {
@@ -137,13 +149,13 @@ public class TestYarnServiceRunJobCli {
Assert.assertFalse(SubmarineLogs.isVerbose());
runJobCli.run(
- new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+ new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
"--input_path", "s3://input", "--checkpoint_path", "s3://output",
"--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
"python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
"--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image",
"ps.image", "--worker_docker_image", "worker.image",
- "--ps_launch_cmd", "python run-ps.py", "--verbose" });
+ "--ps_launch_cmd", "python run-ps.py", "--verbose"});
Service serviceSpec = getServiceSpecFromJobSubmitter(
runJobCli.getJobSubmitter());
Assert.assertEquals(3, serviceSpec.getComponents().size());
@@ -162,14 +174,14 @@ public class TestYarnServiceRunJobCli {
Assert.assertFalse(SubmarineLogs.isVerbose());
runJobCli.run(
- new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+ new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
"--input_path", "s3://input", "--checkpoint_path", "s3://output",
"--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
"python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
"--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image",
"ps.image", "--worker_docker_image", "worker.image",
"--tensorboard", "--ps_launch_cmd", "python run-ps.py",
- "--verbose" });
+ "--verbose"});
Service serviceSpec = getServiceSpecFromJobSubmitter(
runJobCli.getJobSubmitter());
Assert.assertEquals(4, serviceSpec.getComponents().size());
@@ -192,10 +204,10 @@ public class TestYarnServiceRunJobCli {
Assert.assertFalse(SubmarineLogs.isVerbose());
runJobCli.run(
- new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+ new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
"--input_path", "s3://input", "--checkpoint_path", "s3://output",
"--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
- "--worker_resources", "memory=2G,vcores=2", "--verbose" });
+ "--worker_resources", "memory=2G,vcores=2", "--verbose"});
Service serviceSpec = getServiceSpecFromJobSubmitter(
runJobCli.getJobSubmitter());
@@ -212,9 +224,9 @@ public class TestYarnServiceRunJobCli {
Assert.assertFalse(SubmarineLogs.isVerbose());
runJobCli.run(
- new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+ new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
"--input_path", "s3://input", "--checkpoint_path", "s3://output",
- "--num_workers", "0", "--tensorboard", "--verbose" });
+ "--num_workers", "0", "--tensorboard", "--verbose"});
Service serviceSpec = getServiceSpecFromJobSubmitter(
runJobCli.getJobSubmitter());
@@ -233,11 +245,11 @@ public class TestYarnServiceRunJobCli {
Assert.assertFalse(SubmarineLogs.isVerbose());
runJobCli.run(
- new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+ new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
"--input_path", "s3://input", "--checkpoint_path", "s3://output",
"--num_workers", "0", "--tensorboard", "--verbose",
"--tensorboard_resources", "memory=2G,vcores=2",
- "--tensorboard_docker_image", "tb_docker_image:001" });
+ "--tensorboard_docker_image", "tb_docker_image:001"});
Service serviceSpec = getServiceSpecFromJobSubmitter(
runJobCli.getJobSubmitter());
@@ -256,10 +268,10 @@ public class TestYarnServiceRunJobCli {
Assert.assertFalse(SubmarineLogs.isVerbose());
runJobCli.run(
- new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+ new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
"--num_workers", "0", "--tensorboard", "--verbose",
"--tensorboard_resources", "memory=2G,vcores=2",
- "--tensorboard_docker_image", "tb_docker_image:001" });
+ "--tensorboard_docker_image", "tb_docker_image:001"});
Service serviceSpec = getServiceSpecFromJobSubmitter(
runJobCli.getJobSubmitter());
@@ -307,7 +319,7 @@ public class TestYarnServiceRunJobCli {
Assert.assertEquals(
runJobCli.getRunJobParameters().getTensorboardDockerImage(),
tensorboardComp.getArtifact().getId());
- } else{
+ } else {
Assert.assertNull(tensorboardComp.getArtifact());
}
@@ -352,11 +364,11 @@ public class TestYarnServiceRunJobCli {
Assert.assertFalse(SubmarineLogs.isVerbose());
runJobCli.run(
- new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+ new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
"--input_path", "s3://input", "--checkpoint_path", "s3://output",
"--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
"--worker_resources", "memory=2G,vcores=2", "--tensorboard",
- "--verbose" });
+ "--verbose"});
Service serviceSpec = getServiceSpecFromJobSubmitter(
runJobCli.getJobSubmitter());
@@ -376,10 +388,10 @@ public class TestYarnServiceRunJobCli {
Assert.assertFalse(SubmarineLogs.isVerbose());
runJobCli.run(
- new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+ new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
"--input_path", "s3://input", "--num_workers", "1",
"--worker_launch_cmd", "python run-job.py", "--worker_resources",
- "memory=2G,vcores=2", "--tensorboard", "--verbose" });
+ "memory=2G,vcores=2", "--tensorboard", "--verbose"});
Service serviceSpec = getServiceSpecFromJobSubmitter(
runJobCli.getJobSubmitter());
@@ -398,11 +410,11 @@ public class TestYarnServiceRunJobCli {
Assert.assertFalse(SubmarineLogs.isVerbose());
runJobCli.run(
- new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+ new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
"--input_path", "s3://input", "--checkpoint_path", "s3://output",
"--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
"--worker_resources", "memory=2G,vcores=2", "--tensorboard", "true",
- "--verbose" });
+ "--verbose"});
SubmarineStorage storage =
mockClientContext.getRuntimeFactory().getSubmarineStorage();
Map jobInfo = storage.getJobInfoByName("my-job");
@@ -419,7 +431,7 @@ public class TestYarnServiceRunJobCli {
Assert.assertFalse(SubmarineLogs.isVerbose());
runJobCli.run(
- new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+ new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
"--input_path", "s3://input", "--checkpoint_path", "s3://output",
"--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
"python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
@@ -427,7 +439,7 @@ public class TestYarnServiceRunJobCli {
"ps.image", "--worker_docker_image", "worker.image",
"--ps_launch_cmd", "python run-ps.py", "--verbose", "--quicklink",
"AAA=http://master-0:8321", "--quicklink",
- "BBB=http://worker-0:1234" });
+ "BBB=http://worker-0:1234"});
Service serviceSpec = getServiceSpecFromJobSubmitter(
runJobCli.getJobSubmitter());
Assert.assertEquals(3, serviceSpec.getComponents().size());
@@ -447,7 +459,7 @@ public class TestYarnServiceRunJobCli {
Assert.assertFalse(SubmarineLogs.isVerbose());
runJobCli.run(
- new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+ new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
"--input_path", "s3://input", "--checkpoint_path", "s3://output",
"--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
"python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
@@ -455,7 +467,7 @@ public class TestYarnServiceRunJobCli {
"ps.image", "--worker_docker_image", "worker.image",
"--ps_launch_cmd", "python run-ps.py", "--verbose", "--quicklink",
"AAA=http://master-0:8321", "--quicklink",
- "BBB=http://worker-0:1234", "--tensorboard" });
+ "BBB=http://worker-0:1234", "--tensorboard"});
Service serviceSpec = getServiceSpecFromJobSubmitter(
runJobCli.getJobSubmitter());
Assert.assertEquals(4, serviceSpec.getComponents().size());
@@ -468,4 +480,741 @@ public class TestYarnServiceRunJobCli {
YarnServiceJobSubmitter.TENSORBOARD_QUICKLINK_LABEL,
"http://tensorboard-0.my-job.username.null:6006"));
}
+
+ /**
+ * Basic test.
+ * In one hand, create local temp file/dir for hdfs URI in
+ * local staging dir.
+ * In the other hand, use MockRemoteDirectoryManager mock
+ * implementation when check FileStatus or exists of HDFS file/dir
+ * --localization hdfs:///user/yarn/script1.py:.
+ * --localization /temp/script2.py:./
+ * --localization /temp/script2.py:/opt/script.py
+ */
+ @Test
+ public void testRunJobWithBasicLocalization() throws Exception {
+ String remoteUrl = "hdfs:///user/yarn/script1.py";
+ String containerLocal1 = ".";
+ String localUrl = "/temp/script2.py";
+ String containerLocal2 = "./";
+ String containerLocal3 = "/opt/script.py";
+ String fakeLocalDir = System.getProperty("java.io.tmpdir");
+ // create local file, we need to put it under local temp dir
+ File localFile1 = new File(fakeLocalDir,
+ new Path(localUrl).getName());
+ localFile1.createNewFile();
+
+
+ MockClientContext mockClientContext =
+ YarnServiceCliTestUtils.getMockClientContext();
+ RunJobCli runJobCli = new RunJobCli(mockClientContext);
+ Assert.assertFalse(SubmarineLogs.isVerbose());
+
+ RemoteDirectoryManager spyRdm =
+ spy(mockClientContext.getRemoteDirectoryManager());
+ mockClientContext.setRemoteDirectoryMgr(spyRdm);
+
+ // create remote file in local staging dir to simulate HDFS
+ Path stagingDir = mockClientContext.getRemoteDirectoryManager()
+ .getJobStagingArea("my-job", true);
+ File remoteFile1 = new File(stagingDir.toUri().getPath()
+ + "/" + new Path(remoteUrl).getName());
+ remoteFile1.createNewFile();
+
+ Assert.assertTrue(localFile1.exists());
+ Assert.assertTrue(remoteFile1.exists());
+
+ runJobCli.run(
+ new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+ "--input_path", "s3://input", "--checkpoint_path", "s3://output",
+ "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
+ "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
+ "--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image",
+ "ps.image", "--worker_docker_image", "worker.image",
+ "--ps_launch_cmd", "python run-ps.py", "--verbose",
+ "--localization",
+ remoteUrl + ":" + containerLocal1,
+ "--localization",
+ localFile1.getAbsolutePath() + ":" + containerLocal2,
+ "--localization",
+ localFile1.getAbsolutePath() + ":" + containerLocal3});
+ Service serviceSpec = getServiceSpecFromJobSubmitter(
+ runJobCli.getJobSubmitter());
+ Assert.assertEquals(3, serviceSpec.getComponents().size());
+
+ // No remote dir and hdfs file exists. Ensure download 0 times
+ verify(spyRdm, times(0)).copyRemoteToLocal(
+ anyString(), anyString());
+ // Ensure local original files are not deleted
+ Assert.assertTrue(localFile1.exists());
+
+ List files = serviceSpec.getConfiguration().getFiles();
+ Assert.assertEquals(3, files.size());
+ ConfigFile file = files.get(0);
+ Assert.assertEquals(ConfigFile.TypeEnum.STATIC, file.getType());
+ String expectedSrcLocalization = remoteUrl;
+ Assert.assertEquals(expectedSrcLocalization,
+ file.getSrcFile());
+ String expectedDstFileName = new Path(remoteUrl).getName();
+ Assert.assertEquals(expectedDstFileName, file.getDestFile());
+
+ file = files.get(1);
+ Assert.assertEquals(ConfigFile.TypeEnum.STATIC, file.getType());
+ expectedSrcLocalization = stagingDir.toUri().getPath()
+ + "/" + new Path(localUrl).getName();
+ Assert.assertEquals(expectedSrcLocalization,
+ new Path(file.getSrcFile()).toUri().getPath());
+ expectedDstFileName = new Path(localUrl).getName();
+ Assert.assertEquals(expectedSrcLocalization,
+ new Path(file.getSrcFile()).toUri().getPath());
+
+ file = files.get(2);
+ Assert.assertEquals(ConfigFile.TypeEnum.STATIC, file.getType());
+ expectedSrcLocalization = stagingDir.toUri().getPath()
+ + "/" + new Path(localUrl).getName();
+ Assert.assertEquals(expectedSrcLocalization,
+ new Path(file.getSrcFile()).toUri().getPath());
+ expectedDstFileName = new Path(localUrl).getName();
+ Assert.assertEquals(expectedSrcLocalization,
+ new Path(file.getSrcFile()).toUri().getPath());
+
+ // Ensure env value is correct
+ String env = serviceSpec.getConfiguration().getEnv()
+ .get("YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS");
+ String expectedMounts = new Path(containerLocal3).getName()
+ + ":" + containerLocal3 + ":rw";
+ Assert.assertTrue(env.contains(expectedMounts));
+
+ remoteFile1.delete();
+ localFile1.delete();
+ }
+
+ /**
+ * Non HDFS remote URI test.
+ * --localization https://a/b/1.patch:.
+ * --localization s3a://a/dir:/opt/mys3dir
+ */
+ @Test
+ public void testRunJobWithNonHDFSRemoteLocalization() throws Exception {
+ String remoteUri1 = "https://a/b/1.patch";
+ String containerLocal1 = ".";
+ String remoteUri2 = "s3a://a/s3dir";
+ String containerLocal2 = "/opt/mys3dir";
+
+ MockClientContext mockClientContext =
+ YarnServiceCliTestUtils.getMockClientContext();
+ RunJobCli runJobCli = new RunJobCli(mockClientContext);
+ Assert.assertFalse(SubmarineLogs.isVerbose());
+
+ RemoteDirectoryManager spyRdm =
+ spy(mockClientContext.getRemoteDirectoryManager());
+ mockClientContext.setRemoteDirectoryMgr(spyRdm);
+
+ // create remote file in local staging dir to simulate HDFS
+ Path stagingDir = mockClientContext.getRemoteDirectoryManager()
+ .getJobStagingArea("my-job", true);
+ File remoteFile1 = new File(stagingDir.toUri().getPath()
+ + "/" + new Path(remoteUri1).getName());
+ remoteFile1.createNewFile();
+
+ File remoteDir1 = new File(stagingDir.toUri().getPath()
+ + "/" + new Path(remoteUri2).getName());
+ remoteDir1.mkdir();
+ File remoteDir1File1 = new File(remoteDir1, "afile");
+ remoteDir1File1.createNewFile();
+
+ Assert.assertTrue(remoteFile1.exists());
+ Assert.assertTrue(remoteDir1.exists());
+ Assert.assertTrue(remoteDir1File1.exists());
+
+ String suffix1 = "_" + remoteDir1.lastModified()
+ + "-" + mockClientContext.getRemoteDirectoryManager()
+ .getRemoteFileSize(remoteUri2);
+ runJobCli.run(
+ new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+ "--input_path", "s3://input", "--checkpoint_path", "s3://output",
+ "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
+ "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
+ "--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image",
+ "ps.image", "--worker_docker_image", "worker.image",
+ "--ps_launch_cmd", "python run-ps.py", "--verbose",
+ "--localization",
+ remoteUri1 + ":" + containerLocal1,
+ "--localization",
+ remoteUri2 + ":" + containerLocal2});
+ Service serviceSpec = getServiceSpecFromJobSubmitter(
+ runJobCli.getJobSubmitter());
+ Assert.assertEquals(3, serviceSpec.getComponents().size());
+
+ // Ensure download remote dir 2 times
+ verify(spyRdm, times(2)).copyRemoteToLocal(
+ anyString(), anyString());
+
+ // Ensure downloaded temp files are deleted
+ Assert.assertFalse(new File(System.getProperty("java.io.tmpdir")
+ + "/" + new Path(remoteUri1).getName()).exists());
+ Assert.assertFalse(new File(System.getProperty("java.io.tmpdir")
+ + "/" + new Path(remoteUri2).getName()).exists());
+
+ // Ensure zip file are deleted
+ Assert.assertFalse(new File(System.getProperty("java.io.tmpdir")
+ + "/" + new Path(remoteUri2).getName()
+ + "_" + suffix1 + ".zip").exists());
+
+ List files = serviceSpec.getConfiguration().getFiles();
+ Assert.assertEquals(2, files.size());
+ ConfigFile file = files.get(0);
+ Assert.assertEquals(ConfigFile.TypeEnum.STATIC, file.getType());
+ String expectedSrcLocalization = stagingDir.toUri().getPath()
+ + "/" + new Path(remoteUri1).getName();
+ Assert.assertEquals(expectedSrcLocalization,
+ new Path(file.getSrcFile()).toUri().getPath());
+ String expectedDstFileName = new Path(remoteUri1).getName();
+ Assert.assertEquals(expectedDstFileName, file.getDestFile());
+
+ file = files.get(1);
+ Assert.assertEquals(ConfigFile.TypeEnum.ARCHIVE, file.getType());
+ expectedSrcLocalization = stagingDir.toUri().getPath()
+ + "/" + new Path(remoteUri2).getName() + suffix1 + ".zip";
+ Assert.assertEquals(expectedSrcLocalization,
+ new Path(file.getSrcFile()).toUri().getPath());
+
+ expectedDstFileName = new Path(containerLocal2).getName();
+ Assert.assertEquals(expectedSrcLocalization,
+ new Path(file.getSrcFile()).toUri().getPath());
+
+ // Ensure env value is correct
+ String env = serviceSpec.getConfiguration().getEnv()
+ .get("YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS");
+ String expectedMounts = new Path(remoteUri2).getName()
+ + ":" + containerLocal2 + ":rw";
+ Assert.assertTrue(env.contains(expectedMounts));
+
+ remoteDir1File1.delete();
+ remoteFile1.delete();
+ remoteDir1.delete();
+ }
+
+ /**
+ * Test HDFS dir localization.
+ * --localization hdfs:///user/yarn/mydir:./mydir1
+ * --localization hdfs:///user/yarn/mydir2:/opt/dir2:rw
+ * --localization hdfs:///user/yarn/mydir:.
+ * --localization hdfs:///user/yarn/mydir2:./
+ */
+ @Test
+ public void testRunJobWithHdfsDirLocalization() throws Exception {
+ String remoteUrl = "hdfs:///user/yarn/mydir";
+ String containerPath = "./mydir1";
+ String remoteUrl2 = "hdfs:///user/yarn/mydir2";
+ String containPath2 = "/opt/dir2";
+ String containerPath3 = ".";
+ String containerPath4 = "./";
+ MockClientContext mockClientContext =
+ YarnServiceCliTestUtils.getMockClientContext();
+ RunJobCli runJobCli = new RunJobCli(mockClientContext);
+ Assert.assertFalse(SubmarineLogs.isVerbose());
+
+ RemoteDirectoryManager spyRdm =
+ spy(mockClientContext.getRemoteDirectoryManager());
+ mockClientContext.setRemoteDirectoryMgr(spyRdm);
+ // create remote file in local staging dir to simulate HDFS
+ Path stagingDir = mockClientContext.getRemoteDirectoryManager()
+ .getJobStagingArea("my-job", true);
+ File remoteDir1 = new File(stagingDir.toUri().getPath().toString()
+ + "/" + new Path(remoteUrl).getName());
+ remoteDir1.mkdir();
+ File remoteFile1 = new File(remoteDir1.getAbsolutePath() + "/1.py");
+ File remoteFile2 = new File(remoteDir1.getAbsolutePath() + "/2.py");
+ remoteFile1.createNewFile();
+ remoteFile2.createNewFile();
+
+ File remoteDir2 = new File(stagingDir.toUri().getPath().toString()
+ + "/" + new Path(remoteUrl2).getName());
+ remoteDir2.mkdir();
+ File remoteFile3 = new File(remoteDir1.getAbsolutePath() + "/3.py");
+ File remoteFile4 = new File(remoteDir1.getAbsolutePath() + "/4.py");
+ remoteFile3.createNewFile();
+ remoteFile4.createNewFile();
+
+ Assert.assertTrue(remoteDir1.exists());
+ Assert.assertTrue(remoteDir2.exists());
+
+ String suffix1 = "_" + remoteDir1.lastModified()
+ + "-" + mockClientContext.getRemoteDirectoryManager()
+ .getRemoteFileSize(remoteUrl);
+ String suffix2 = "_" + remoteDir2.lastModified()
+ + "-" + mockClientContext.getRemoteDirectoryManager()
+ .getRemoteFileSize(remoteUrl2);
+ runJobCli.run(
+ new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+ "--input_path", "s3://input", "--checkpoint_path", "s3://output",
+ "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
+ "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
+ "--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image",
+ "ps.image", "--worker_docker_image", "worker.image",
+ "--ps_launch_cmd", "python run-ps.py", "--verbose",
+ "--localization",
+ remoteUrl + ":" + containerPath,
+ "--localization",
+ remoteUrl2 + ":" + containPath2 + ":rw",
+ "--localization",
+ remoteUrl + ":" + containerPath3,
+ "--localization",
+ remoteUrl2 + ":" + containerPath4});
+ Service serviceSpec = getServiceSpecFromJobSubmitter(
+ runJobCli.getJobSubmitter());
+ Assert.assertEquals(3, serviceSpec.getComponents().size());
+
+ // Ensure download remote dir 4 times
+ verify(spyRdm, times(4)).copyRemoteToLocal(
+ anyString(), anyString());
+
+ // Ensure downloaded temp files are deleted
+ Assert.assertFalse(new File(System.getProperty("java.io.tmpdir")
+ + "/" + new Path(remoteUrl).getName()).exists());
+ Assert.assertFalse(new File(System.getProperty("java.io.tmpdir")
+ + "/" + new Path(remoteUrl2).getName()).exists());
+ // Ensure zip file are deleted
+ Assert.assertFalse(new File(System.getProperty("java.io.tmpdir")
+ + "/" + new Path(remoteUrl).getName()
+ + suffix1 + ".zip").exists());
+ Assert.assertFalse(new File(System.getProperty("java.io.tmpdir")
+ + "/" + new Path(remoteUrl2).getName()
+ + suffix2 + ".zip").exists());
+
+ // Ensure files will be localized
+ List files = serviceSpec.getConfiguration().getFiles();
+ Assert.assertEquals(4, files.size());
+ ConfigFile file = files.get(0);
+ // The hdfs dir should be download and compress and let YARN to uncompress
+ Assert.assertEquals(ConfigFile.TypeEnum.ARCHIVE, file.getType());
+ String expectedSrcLocalization = stagingDir.toUri().getPath()
+ + "/" + new Path(remoteUrl).getName() + suffix1 + ".zip";
+ Assert.assertEquals(expectedSrcLocalization,
+ new Path(file.getSrcFile()).toUri().getPath());
+
+ // Relative path in container, but not "." or "./". Use its own name
+ String expectedDstFileName = new Path(containerPath).getName();
+ Assert.assertEquals(expectedDstFileName, file.getDestFile());
+
+ file = files.get(1);
+ Assert.assertEquals(ConfigFile.TypeEnum.ARCHIVE, file.getType());
+ expectedSrcLocalization = stagingDir.toUri().getPath()
+ + "/" + new Path(remoteUrl2).getName() + suffix2 + ".zip";
+ Assert.assertEquals(expectedSrcLocalization,
+ new Path(file.getSrcFile()).toUri().getPath());
+
+ expectedDstFileName = new Path(containPath2).getName();
+ Assert.assertEquals(expectedDstFileName, file.getDestFile());
+
+ file = files.get(2);
+ Assert.assertEquals(ConfigFile.TypeEnum.ARCHIVE, file.getType());
+ expectedSrcLocalization = stagingDir.toUri().getPath()
+ + "/" + new Path(remoteUrl).getName() + suffix1 + ".zip";
+ Assert.assertEquals(expectedSrcLocalization,
+ new Path(file.getSrcFile()).toUri().getPath());
+ // Relative path in container ".", use remote path name
+ expectedDstFileName = new Path(remoteUrl).getName();
+ Assert.assertEquals(expectedDstFileName, file.getDestFile());
+
+ file = files.get(3);
+ Assert.assertEquals(ConfigFile.TypeEnum.ARCHIVE, file.getType());
+ expectedSrcLocalization = stagingDir.toUri().getPath()
+ + "/" + new Path(remoteUrl2).getName() + suffix2 + ".zip";
+ Assert.assertEquals(expectedSrcLocalization,
+ new Path(file.getSrcFile()).toUri().getPath());
+ // Relative path in container "./", use remote path name
+ expectedDstFileName = new Path(remoteUrl2).getName();
+ Assert.assertEquals(expectedDstFileName, file.getDestFile());
+
+ // Ensure mounts env value is correct. Add one mount string
+ String env = serviceSpec.getConfiguration().getEnv()
+ .get("YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS");
+
+ String expectedMounts =
+ new Path(containPath2).getName() + ":" + containPath2 + ":rw";
+ Assert.assertTrue(env.contains(expectedMounts));
+
+ remoteFile1.delete();
+ remoteFile2.delete();
+ remoteFile3.delete();
+ remoteFile4.delete();
+ remoteDir1.delete();
+ remoteDir2.delete();
+ }
+
+ /**
+ * Test if file/dir to be localized whose size exceeds limit.
+ * Max 10MB in configuration, mock remote will
+ * always return file size 100MB.
+ * This configuration will fail the job which has remoteUri
+ * But don't impact local dir/file
+ *
+ * --localization https://a/b/1.patch:.
+ * --localization s3a://a/dir:/opt/mys3dir
+ * --localization /temp/script2.py:./
+ */
+ @Test
+ public void testRunJobRemoteUriExceedLocalizationSize() throws Exception {
+ String remoteUri1 = "https://a/b/1.patch";
+ String containerLocal1 = ".";
+ String remoteUri2 = "s3a://a/s3dir";
+ String containerLocal2 = "/opt/mys3dir";
+ String localUri1 = "/temp/script2";
+ String containerLocal3 = "./";
+
+ MockClientContext mockClientContext =
+ YarnServiceCliTestUtils.getMockClientContext();
+ SubmarineConfiguration submarineConf = new SubmarineConfiguration();
+ RemoteDirectoryManager spyRdm =
+ spy(mockClientContext.getRemoteDirectoryManager());
+ mockClientContext.setRemoteDirectoryMgr(spyRdm);
+ /**
+ * Max 10MB, mock remote will always return file size 100MB.
+ * */
+ submarineConf.set(
+ SubmarineConfiguration.LOCALIZATION_MAX_ALLOWED_FILE_SIZE_MB,
+ "10");
+ mockClientContext.setSubmarineConfig(submarineConf);
+
+ RunJobCli runJobCli = new RunJobCli(mockClientContext);
+ Assert.assertFalse(SubmarineLogs.isVerbose());
+
+ // create remote file in local staging dir to simulate
+ Path stagingDir = mockClientContext.getRemoteDirectoryManager()
+ .getJobStagingArea("my-job", true);
+ File remoteFile1 = new File(stagingDir.toUri().getPath()
+ + "/" + new Path(remoteUri1).getName());
+ remoteFile1.createNewFile();
+ File remoteDir1 = new File(stagingDir.toUri().getPath()
+ + "/" + new Path(remoteUri2).getName());
+ remoteDir1.mkdir();
+
+ File remoteDir1File1 = new File(remoteDir1, "afile");
+ remoteDir1File1.createNewFile();
+
+ String fakeLocalDir = System.getProperty("java.io.tmpdir");
+ // create local file, we need to put it under local temp dir
+ File localFile1 = new File(fakeLocalDir,
+ new Path(localUri1).getName());
+ localFile1.createNewFile();
+
+ Assert.assertTrue(remoteFile1.exists());
+ Assert.assertTrue(remoteDir1.exists());
+ Assert.assertTrue(remoteDir1File1.exists());
+
+ String suffix1 = "_" + remoteDir1.lastModified()
+ + "-" + remoteDir1.length();
+ try {
+ runJobCli = new RunJobCli(mockClientContext);
+ runJobCli.run(
+ new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+ "--input_path", "s3://input", "--checkpoint_path", "s3://output",
+ "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
+ "python run-job.py", "--worker_resources",
+ "memory=2048M,vcores=2",
+ "--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image",
+ "ps.image", "--worker_docker_image", "worker.image",
+ "--ps_launch_cmd", "python run-ps.py", "--verbose",
+ "--localization",
+ remoteUri1 + ":" + containerLocal1});
+ } catch (IOException e) {
+ // Shouldn't have exception because it's within file size limit
+ Assert.assertFalse(true);
+ }
+ // we should download because fail fast
+ verify(spyRdm, times(1)).copyRemoteToLocal(
+ anyString(), anyString());
+ try {
+ // reset
+ reset(spyRdm);
+ runJobCli = new RunJobCli(mockClientContext);
+ runJobCli.run(
+ new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+ "--input_path", "s3://input", "--checkpoint_path", "s3://output",
+ "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
+ "python run-job.py", "--worker_resources",
+ "memory=2048M,vcores=2",
+ "--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image",
+ "ps.image", "--worker_docker_image", "worker.image",
+ "--ps_launch_cmd", "python run-ps.py", "--verbose",
+ "--localization",
+ remoteUri1 + ":" + containerLocal1,
+ "--localization",
+ remoteUri2 + ":" + containerLocal2,
+ "--localization",
+ localFile1.getAbsolutePath() + ":" + containerLocal3});
+ } catch (IOException e) {
+ Assert.assertTrue(e.getMessage()
+ .contains("104857600 exceeds configured max size:10485760"));
+ // we shouldn't do any download because fail fast
+ verify(spyRdm, times(0)).copyRemoteToLocal(
+ anyString(), anyString());
+ }
+
+ try {
+ runJobCli = new RunJobCli(mockClientContext);
+ runJobCli.run(
+ new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+ "--input_path", "s3://input", "--checkpoint_path", "s3://output",
+ "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
+ "python run-job.py", "--worker_resources",
+ "memory=2048M,vcores=2",
+ "--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image",
+ "ps.image", "--worker_docker_image", "worker.image",
+ "--ps_launch_cmd", "python run-ps.py", "--verbose",
+ "--localization",
+ localFile1.getAbsolutePath() + ":" + containerLocal3});
+ } catch (IOException e) {
+ Assert.assertTrue(e.getMessage()
+ .contains("104857600 exceeds configured max size:10485760"));
+ // we shouldn't do any download because fail fast
+ verify(spyRdm, times(0)).copyRemoteToLocal(
+ anyString(), anyString());
+ }
+
+ localFile1.delete();
+ remoteDir1File1.delete();
+ remoteFile1.delete();
+ remoteDir1.delete();
+ }
+
+ /**
+ * Test remote Uri doesn't exist.
+ * */
+ @Test
+ public void testRunJobWithNonExistRemoteUri() throws Exception {
+ String remoteUri1 = "hdfs:///a/b/1.patch";
+ String containerLocal1 = ".";
+ String localUri1 = "/a/b/c";
+ String containerLocal2 = "./";
+ MockClientContext mockClientContext =
+ YarnServiceCliTestUtils.getMockClientContext();
+
+ RunJobCli runJobCli = new RunJobCli(mockClientContext);
+ Assert.assertFalse(SubmarineLogs.isVerbose());
+
+ try {
+ runJobCli.run(
+ new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+ "--input_path", "s3://input", "--checkpoint_path", "s3://output",
+ "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
+ "python run-job.py", "--worker_resources",
+ "memory=2048M,vcores=2",
+ "--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image",
+ "ps.image", "--worker_docker_image", "worker.image",
+ "--ps_launch_cmd", "python run-ps.py", "--verbose",
+ "--localization",
+ remoteUri1 + ":" + containerLocal1});
+ } catch (IOException e) {
+ Assert.assertTrue(e.getMessage()
+ .contains("doesn't exists"));
+ }
+
+ try {
+ runJobCli = new RunJobCli(mockClientContext);
+ runJobCli.run(
+ new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+ "--input_path", "s3://input", "--checkpoint_path", "s3://output",
+ "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
+ "python run-job.py", "--worker_resources",
+ "memory=2048M,vcores=2",
+ "--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image",
+ "ps.image", "--worker_docker_image", "worker.image",
+ "--ps_launch_cmd", "python run-ps.py", "--verbose",
+ "--localization",
+ localUri1 + ":" + containerLocal2});
+ } catch (IOException e) {
+ Assert.assertTrue(e.getMessage()
+ .contains("doesn't exists"));
+ }
+ }
+
+ /**
+ * Test local dir
+ * --localization /user/yarn/mydir:./mydir1
+ * --localization /user/yarn/mydir2:/opt/dir2:rw
+ * --localization /user/yarn/mydir2:.
+ */
+ @Test
+ public void testRunJobWithLocalDirLocalization() throws Exception {
+ String fakeLocalDir = System.getProperty("java.io.tmpdir");
+ String localUrl = "/user/yarn/mydir";
+ String containerPath = "./mydir1";
+ String localUrl2 = "/user/yarn/mydir2";
+ String containPath2 = "/opt/dir2";
+ String containerPath3 = ".";
+
+ MockClientContext mockClientContext =
+ YarnServiceCliTestUtils.getMockClientContext();
+ RunJobCli runJobCli = new RunJobCli(mockClientContext);
+ Assert.assertFalse(SubmarineLogs.isVerbose());
+
+ RemoteDirectoryManager spyRdm =
+ spy(mockClientContext.getRemoteDirectoryManager());
+ mockClientContext.setRemoteDirectoryMgr(spyRdm);
+ // create local file
+ File localDir1 = new File(fakeLocalDir,
+ localUrl);
+ localDir1.mkdirs();
+ File temp1 = new File(localDir1.getAbsolutePath() + "/1.py");
+ File temp2 = new File(localDir1.getAbsolutePath() + "/2.py");
+ temp1.createNewFile();
+ temp2.createNewFile();
+
+ File localDir2 = new File(fakeLocalDir,
+ localUrl2);
+ localDir2.mkdirs();
+ File temp3 = new File(localDir1.getAbsolutePath() + "/3.py");
+ File temp4 = new File(localDir1.getAbsolutePath() + "/4.py");
+ temp3.createNewFile();
+ temp4.createNewFile();
+
+ Assert.assertTrue(localDir1.exists());
+ Assert.assertTrue(localDir2.exists());
+
+ String suffix1 = "_" + localDir1.lastModified()
+ + "-" + localDir1.length();
+ String suffix2 = "_" + localDir2.lastModified()
+ + "-" + localDir2.length();
+
+ runJobCli.run(
+ new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
+ "--input_path", "s3://input", "--checkpoint_path", "s3://output",
+ "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
+ "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
+ "--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image",
+ "ps.image", "--worker_docker_image", "worker.image",
+ "--ps_launch_cmd", "python run-ps.py", "--verbose",
+ "--localization",
+ fakeLocalDir + localUrl + ":" + containerPath,
+ "--localization",
+ fakeLocalDir + localUrl2 + ":" + containPath2 + ":rw",
+ "--localization",
+ fakeLocalDir + localUrl2 + ":" + containerPath3});
+
+ Service serviceSpec = getServiceSpecFromJobSubmitter(
+ runJobCli.getJobSubmitter());
+ Assert.assertEquals(3, serviceSpec.getComponents().size());
+
+ // we shouldn't do any download
+ verify(spyRdm, times(0)).copyRemoteToLocal(
+ anyString(), anyString());
+
+ // Ensure local original files are not deleted
+ Assert.assertTrue(localDir1.exists());
+ Assert.assertTrue(localDir2.exists());
+
+ // Ensure zip file are deleted
+ Assert.assertFalse(new File(System.getProperty("java.io.tmpdir")
+ + "/" + new Path(localUrl).getName()
+ + suffix1 + ".zip").exists());
+ Assert.assertFalse(new File(System.getProperty("java.io.tmpdir")
+ + "/" + new Path(localUrl2).getName()
+ + suffix2 + ".zip").exists());
+
+ // Ensure dirs will be zipped and localized
+ List files = serviceSpec.getConfiguration().getFiles();
+ Assert.assertEquals(3, files.size());
+ ConfigFile file = files.get(0);
+ Path stagingDir = mockClientContext.getRemoteDirectoryManager()
+ .getJobStagingArea("my-job", true);
+ Assert.assertEquals(ConfigFile.TypeEnum.ARCHIVE, file.getType());
+ String expectedSrcLocalization = stagingDir.toUri().getPath()
+ + "/" + new Path(localUrl).getName() + suffix1 + ".zip";
+ Assert.assertEquals(expectedSrcLocalization,
+ new Path(file.getSrcFile()).toUri().getPath());
+ String expectedDstFileName = new Path(containerPath).getName();
+ Assert.assertEquals(expectedDstFileName, file.getDestFile());
+
+ file = files.get(1);
+ Assert.assertEquals(ConfigFile.TypeEnum.ARCHIVE, file.getType());
+ expectedSrcLocalization = stagingDir.toUri().getPath()
+ + "/" + new Path(localUrl2).getName() + suffix2 + ".zip";
+ Assert.assertEquals(expectedSrcLocalization,
+ new Path(file.getSrcFile()).toUri().getPath());
+ expectedDstFileName = new Path(containPath2).getName();
+ Assert.assertEquals(expectedDstFileName, file.getDestFile());
+
+ file = files.get(2);
+ Assert.assertEquals(ConfigFile.TypeEnum.ARCHIVE, file.getType());
+ expectedSrcLocalization = stagingDir.toUri().getPath()
+ + "/" + new Path(localUrl2).getName() + suffix2 + ".zip";
+ Assert.assertEquals(expectedSrcLocalization,
+ new Path(file.getSrcFile()).toUri().getPath());
+ expectedDstFileName = new Path(localUrl2).getName();
+ Assert.assertEquals(expectedDstFileName, file.getDestFile());
+
+ // Ensure mounts env value is correct
+ String env = serviceSpec.getConfiguration().getEnv()
+ .get("YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS");
+ String expectedMounts = new Path(containPath2).getName()
+ + ":" + containPath2 + ":rw";
+
+ Assert.assertTrue(env.contains(expectedMounts));
+
+ temp1.delete();
+ temp2.delete();
+ temp3.delete();
+ temp4.delete();
+ localDir2.delete();
+ localDir1.delete();
+ }
+
+ /**
+ * Test zip function.
+ * A dir "/user/yarn/mydir" has two files and one subdir
+ * */
+ @Test
+ public void testYarnServiceSubmitterZipFunction()
+ throws Exception {
+ MockClientContext mockClientContext =
+ YarnServiceCliTestUtils.getMockClientContext();
+ RunJobCli runJobCli = new RunJobCli(mockClientContext);
+ YarnServiceJobSubmitter submitter =
+ (YarnServiceJobSubmitter)mockClientContext
+ .getRuntimeFactory().getJobSubmitterInstance();
+ String fakeLocalDir = System.getProperty("java.io.tmpdir");
+ String localUrl = "/user/yarn/mydir";
+ String localSubDirName = "subdir1";
+ // create local file
+ File localDir1 = new File(fakeLocalDir,
+ localUrl);
+ localDir1.mkdirs();
+ File temp1 = new File(localDir1.getAbsolutePath() + "/1.py");
+ File temp2 = new File(localDir1.getAbsolutePath() + "/2.py");
+ temp1.createNewFile();
+ temp2.createNewFile();
+
+
+ File localSubDir = new File(localDir1.getAbsolutePath(), localSubDirName);
+ localSubDir.mkdir();
+ File temp3 = new File(localSubDir.getAbsolutePath(), "3.py");
+ temp3.createNewFile();
+
+
+ String zipFilePath = submitter.zipDir(localDir1.getAbsolutePath(),
+ fakeLocalDir + "/user/yarn/mydir.zip");
+ File zipFile = new File(zipFilePath);
+ File unzipTargetDir = new File(fakeLocalDir, "unzipDir");
+ FileUtil.unZip(zipFile, unzipTargetDir);
+ Assert.assertTrue(
+ new File(fakeLocalDir + "/unzipDir/1.py").exists());
+ Assert.assertTrue(
+ new File(fakeLocalDir + "/unzipDir/2.py").exists());
+ Assert.assertTrue(
+ new File(fakeLocalDir + "/unzipDir/subdir1").exists());
+ Assert.assertTrue(
+ new File(fakeLocalDir + "/unzipDir/subdir1/3.py").exists());
+
+ zipFile.delete();
+ unzipTargetDir.delete();
+ temp1.delete();
+ temp2.delete();
+ temp3.delete();
+ localSubDir.delete();
+ localDir1.delete();
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/MockClientContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/MockClientContext.java
index b59c01e6e21..23c45d20e34 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/MockClientContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/MockClientContext.java
@@ -31,7 +31,8 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class MockClientContext extends ClientContext {
- private MockRemoteDirectoryManager remoteDirectoryMgr =
+
+ private RemoteDirectoryManager remoteDirectoryMgr =
new MockRemoteDirectoryManager();
@Override
@@ -39,6 +40,11 @@ public class MockClientContext extends ClientContext {
return remoteDirectoryMgr;
}
+ public void setRemoteDirectoryMgr(
+ RemoteDirectoryManager remoteDirectoryMgr) {
+ this.remoteDirectoryMgr = remoteDirectoryMgr;
+ }
+
@Override
public synchronized YarnClient getOrCreateYarnClient() {
YarnClient client = mock(YarnClient.class);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/fs/MockRemoteDirectoryManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/fs/MockRemoteDirectoryManager.java
index b637036b671..43342932b22 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/fs/MockRemoteDirectoryManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/common/fs/MockRemoteDirectoryManager.java
@@ -19,7 +19,9 @@
package org.apache.hadoop.yarn.submarine.common.fs;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import java.io.File;
@@ -29,6 +31,7 @@ public class MockRemoteDirectoryManager implements RemoteDirectoryManager {
private File jobsParentDir = null;
private File modelParentDir = null;
+ private File jobDir = null;
@Override
public Path getJobStagingArea(String jobName, boolean create)
throws IOException {
@@ -41,10 +44,11 @@ public class MockRemoteDirectoryManager implements RemoteDirectoryManager {
}
}
- File jobDir = new File(jobsParentDir.getAbsolutePath(), jobName);
+ this.jobDir = new File(jobsParentDir.getAbsolutePath(), jobName);
if (create && !jobDir.exists()) {
if (!jobDir.mkdirs()) {
- throw new IOException("Failed to mkdirs for " + jobDir.getAbsolutePath());
+ throw new IOException("Failed to mkdirs for "
+ + jobDir.getAbsolutePath());
}
}
return new Path(jobDir.getAbsolutePath());
@@ -57,7 +61,8 @@ public class MockRemoteDirectoryManager implements RemoteDirectoryManager {
}
@Override
- public Path getModelDir(String modelName, boolean create) throws IOException {
+ public Path getModelDir(String modelName, boolean create)
+ throws IOException {
if (modelParentDir == null && create) {
modelParentDir = new File(
"target/_models_" + System.currentTimeMillis());
@@ -70,19 +75,94 @@ public class MockRemoteDirectoryManager implements RemoteDirectoryManager {
File modelDir = new File(modelParentDir.getAbsolutePath(), modelName);
if (create) {
if (!modelDir.exists() && !modelDir.mkdirs()) {
- throw new IOException("Failed to mkdirs for " + modelDir.getAbsolutePath());
+ throw new IOException("Failed to mkdirs for "
+ + modelDir.getAbsolutePath());
}
}
return new Path(modelDir.getAbsolutePath());
}
@Override
- public FileSystem getFileSystem() throws IOException {
+ public FileSystem getDefaultFileSystem() throws IOException {
return FileSystem.getLocal(new Configuration());
}
+ @Override
+ public FileSystem getFileSystemByUri(String uri) throws IOException {
+ return getDefaultFileSystem();
+ }
+
@Override
public Path getUserRootFolder() throws IOException {
return new Path("s3://generated_root_dir");
}
+
+ @Override
+ public boolean isDir(String uri) throws IOException {
+ return getDefaultFileSystem().getFileStatus(
+ new Path(convertToStagingPath(uri))).isDirectory();
+
+ }
+
+ @Override
+ public boolean isRemote(String uri) throws IOException {
+ String scheme = new Path(uri).toUri().getScheme();
+ if (null == scheme) {
+ return false;
+ }
+ return !scheme.startsWith("file://");
+ }
+
+ private String convertToStagingPath(String uri) throws IOException {
+ String ret = uri;
+ if (isRemote(uri)) {
+ String dirName = new Path(uri).getName();
+ ret = this.jobDir.getAbsolutePath()
+ + "/" + dirName;
+ }
+ return ret;
+ }
+
+ /**
+ * We use staging dir as mock HDFS dir.
+ * */
+ @Override
+ public boolean copyRemoteToLocal(String remoteUri, String localUri)
+ throws IOException {
+ // mock the copy from HDFS into a local copy
+ Path remoteToLocalDir = new Path(convertToStagingPath(remoteUri));
+ File old = new File(convertToStagingPath(localUri));
+ if (old.isDirectory() && old.exists()) {
+ if (!FileUtil.fullyDelete(old)) {
+ throw new IOException("Cannot delete temp dir:"
+ + old.getAbsolutePath());
+ }
+ }
+ return FileUtil.copy(getDefaultFileSystem(), remoteToLocalDir,
+ new File(localUri), false,
+ getDefaultFileSystem().getConf());
+ }
+
+ @Override
+ public boolean existsRemoteFile(Path uri) throws IOException {
+ String fakeLocalFilePath = this.jobDir.getAbsolutePath()
+ + "/" + uri.getName();
+ return new File(fakeLocalFilePath).exists();
+ }
+
+ @Override
+ public FileStatus getRemoteFileStatus(Path p) throws IOException {
+ return getDefaultFileSystem().getFileStatus(new Path(
+ convertToStagingPath(p.toUri().toString())));
+ }
+
+ @Override
+ public long getRemoteFileSize(String uri) throws IOException {
+ // 5 byte for this file to test
+ if (uri.equals("https://a/b/1.patch")) {
+ return 5;
+ }
+ return 100 * 1024 * 1024;
+ }
+
}