YARN-8757. [Submarine] Add Tensorboard component when --tensorboard is specified. Contributed by Wangda Tan.
This commit is contained in:
parent
56e0d635e0
commit
1824d5d1c4
|
@ -35,6 +35,10 @@ public class CliConstants {
|
|||
public static final String DOCKER_IMAGE = "docker_image";
|
||||
public static final String QUEUE = "queue";
|
||||
public static final String TENSORBOARD = "tensorboard";
|
||||
public static final String TENSORBOARD_RESOURCES = "tensorboard_resources";
|
||||
public static final String TENSORBOARD_DEFAULT_RESOURCES =
|
||||
"memory=4G,vcores=1";
|
||||
|
||||
public static final String WORKER_LAUNCH_CMD = "worker_launch_cmd";
|
||||
public static final String SERVING_LAUNCH_CMD = "serving_launch_cmd";
|
||||
public static final String PS_LAUNCH_CMD = "ps_launch_cmd";
|
||||
|
@ -45,4 +49,6 @@ public class CliConstants {
|
|||
public static final String WAIT_JOB_FINISH = "wait_job_finish";
|
||||
public static final String PS_DOCKER_IMAGE = "ps_docker_image";
|
||||
public static final String WORKER_DOCKER_IMAGE = "worker_docker_image";
|
||||
public static final String TENSORBOARD_DOCKER_IMAGE =
|
||||
"tensorboard_docker_image";
|
||||
}
|
||||
|
|
|
@ -39,17 +39,9 @@ public class CliUtils {
|
|||
public static String replacePatternsInLaunchCommand(String specifiedCli,
|
||||
RunJobParameters jobRunParameters,
|
||||
RemoteDirectoryManager directoryManager) throws IOException {
|
||||
String jobDir = jobRunParameters.getCheckpointPath();
|
||||
if (null == jobDir) {
|
||||
jobDir = directoryManager.getJobCheckpointDir(jobRunParameters.getName(),
|
||||
true).toString();
|
||||
}
|
||||
|
||||
String input = jobRunParameters.getInputPath();
|
||||
String jobDir = jobRunParameters.getCheckpointPath();
|
||||
String savedModelDir = jobRunParameters.getSavedModelPath();
|
||||
if (null == savedModelDir) {
|
||||
savedModelDir = jobDir;
|
||||
}
|
||||
|
||||
Map<String, String> replacePattern = new HashMap<>();
|
||||
if (jobDir != null) {
|
||||
|
|
|
@ -89,8 +89,16 @@ public class RunJobCli extends AbstractCli {
|
|||
options.addOption(CliConstants.DOCKER_IMAGE, true, "Docker image name/tag");
|
||||
options.addOption(CliConstants.QUEUE, true,
|
||||
"Name of queue to run the job, by default it uses default queue");
|
||||
options.addOption(CliConstants.TENSORBOARD, true,
|
||||
"Should we run TensorBoard" + " for this job? By default it's true");
|
||||
options.addOption(CliConstants.TENSORBOARD, false,
|
||||
"Should we run TensorBoard"
|
||||
+ " for this job? By default it's disabled");
|
||||
options.addOption(CliConstants.TENSORBOARD_RESOURCES, true,
|
||||
"Specify resources of Tensorboard, by default it is "
|
||||
+ CliConstants.TENSORBOARD_DEFAULT_RESOURCES);
|
||||
options.addOption(CliConstants.TENSORBOARD_DOCKER_IMAGE, true,
|
||||
"Specify Tensorboard docker image. when this is not "
|
||||
+ "specified, Tensorboard " + "uses --" + CliConstants.DOCKER_IMAGE
|
||||
+ " as default.");
|
||||
options.addOption(CliConstants.WORKER_LAUNCH_CMD, true,
|
||||
"Commandline of worker, arguments will be "
|
||||
+ "directly used to launch the worker");
|
||||
|
@ -144,10 +152,39 @@ public class RunJobCli extends AbstractCli {
|
|||
throw e;
|
||||
}
|
||||
|
||||
// Set default job dir / saved model dir, etc.
|
||||
setDefaultDirs();
|
||||
|
||||
// replace patterns
|
||||
replacePatternsInParameters();
|
||||
}
|
||||
|
||||
private void setDefaultDirs() throws IOException {
|
||||
// Create directories if needed
|
||||
String jobDir = parameters.getCheckpointPath();
|
||||
if (null == jobDir) {
|
||||
if (parameters.getNumWorkers() > 0) {
|
||||
jobDir = clientContext.getRemoteDirectoryManager().getJobCheckpointDir(
|
||||
parameters.getName(), true).toString();
|
||||
} else {
|
||||
// when #workers == 0, it means we only launch TB. In that case,
|
||||
// point job dir to root dir so all job's metrics will be shown.
|
||||
jobDir = clientContext.getRemoteDirectoryManager().getUserRootFolder()
|
||||
.toString();
|
||||
}
|
||||
parameters.setCheckpointPath(jobDir);
|
||||
}
|
||||
|
||||
if (parameters.getNumWorkers() > 0) {
|
||||
// Only do this when #worker > 0
|
||||
String savedModelDir = parameters.getSavedModelPath();
|
||||
if (null == savedModelDir) {
|
||||
savedModelDir = jobDir;
|
||||
parameters.setSavedModelPath(savedModelDir);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void storeJobInformation(String jobName, ApplicationId applicationId,
|
||||
String[] args) throws IOException {
|
||||
Map<String, String> jobInfo = new HashMap<>();
|
||||
|
@ -198,7 +235,7 @@ public class RunJobCli extends AbstractCli {
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
RunJobParameters getRunJobParameters() {
|
||||
public RunJobParameters getRunJobParameters() {
|
||||
return parameters;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,6 +37,8 @@ public class RunJobParameters extends RunParameters {
|
|||
private Resource workerResource;
|
||||
private Resource psResource;
|
||||
private boolean tensorboardEnabled;
|
||||
private Resource tensorboardResource;
|
||||
private String tensorboardDockerImage;
|
||||
private String workerLaunchCmd;
|
||||
private String psLaunchCmd;
|
||||
|
||||
|
@ -69,19 +71,23 @@ public class RunJobParameters extends RunParameters {
|
|||
// When distributed training is required
|
||||
if (nWorkers >= 2 && nPS > 0) {
|
||||
distributed = true;
|
||||
} else if (nWorkers == 1 && nPS > 0) {
|
||||
} else if (nWorkers <= 1 && nPS > 0) {
|
||||
throw new ParseException("Only specified one worker but non-zero PS, "
|
||||
+ "please double check.");
|
||||
}
|
||||
|
||||
String workerResourceStr = parsedCommandLine.getOptionValue(
|
||||
CliConstants.WORKER_RES);
|
||||
if (workerResourceStr == null) {
|
||||
throw new ParseException("--" + CliConstants.WORKER_RES + " is absent.");
|
||||
workerResource = null;
|
||||
if (nWorkers > 0) {
|
||||
String workerResourceStr = parsedCommandLine.getOptionValue(
|
||||
CliConstants.WORKER_RES);
|
||||
if (workerResourceStr == null) {
|
||||
throw new ParseException(
|
||||
"--" + CliConstants.WORKER_RES + " is absent.");
|
||||
}
|
||||
workerResource = CliUtils.createResourceFromString(
|
||||
workerResourceStr,
|
||||
clientContext.getOrCreateYarnClient().getResourceTypeInfo());
|
||||
}
|
||||
Resource workerResource = CliUtils.createResourceFromString(
|
||||
workerResourceStr,
|
||||
clientContext.getOrCreateYarnClient().getResourceTypeInfo());
|
||||
|
||||
Resource psResource = null;
|
||||
if (nPS > 0) {
|
||||
|
@ -94,9 +100,19 @@ public class RunJobParameters extends RunParameters {
|
|||
}
|
||||
|
||||
boolean tensorboard = false;
|
||||
if (parsedCommandLine.getOptionValue(CliConstants.TENSORBOARD) != null) {
|
||||
tensorboard = Boolean.parseBoolean(
|
||||
parsedCommandLine.getOptionValue(CliConstants.TENSORBOARD));
|
||||
if (parsedCommandLine.hasOption(CliConstants.TENSORBOARD)) {
|
||||
tensorboard = true;
|
||||
String tensorboardResourceStr = parsedCommandLine.getOptionValue(
|
||||
CliConstants.TENSORBOARD_RESOURCES);
|
||||
if (tensorboardResourceStr == null || tensorboardResourceStr.isEmpty()) {
|
||||
tensorboardResourceStr = CliConstants.TENSORBOARD_DEFAULT_RESOURCES;
|
||||
}
|
||||
tensorboardResource = CliUtils.createResourceFromString(
|
||||
tensorboardResourceStr,
|
||||
clientContext.getOrCreateYarnClient().getResourceTypeInfo());
|
||||
tensorboardDockerImage = parsedCommandLine.getOptionValue(
|
||||
CliConstants.TENSORBOARD_DOCKER_IMAGE);
|
||||
this.setTensorboardResource(tensorboardResource);
|
||||
}
|
||||
|
||||
if (parsedCommandLine.hasOption(CliConstants.WAIT_JOB_FINISH)) {
|
||||
|
@ -115,7 +131,7 @@ public class RunJobParameters extends RunParameters {
|
|||
|
||||
this.setInputPath(input).setCheckpointPath(jobDir).setNumPS(nPS).setNumWorkers(nWorkers)
|
||||
.setPSLaunchCmd(psLaunchCommand).setWorkerLaunchCmd(workerLaunchCmd)
|
||||
.setPsResource(psResource).setWorkerResource(workerResource)
|
||||
.setPsResource(psResource)
|
||||
.setTensorboardEnabled(tensorboard);
|
||||
|
||||
super.updateParametersByParsedCommandline(parsedCommandLine,
|
||||
|
@ -219,4 +235,16 @@ public class RunJobParameters extends RunParameters {
|
|||
public boolean isDistributed() {
|
||||
return distributed;
|
||||
}
|
||||
|
||||
public Resource getTensorboardResource() {
|
||||
return tensorboardResource;
|
||||
}
|
||||
|
||||
public void setTensorboardResource(Resource tensorboardResource) {
|
||||
this.tensorboardResource = tensorboardResource;
|
||||
}
|
||||
|
||||
public String getTensorboardDockerImage() {
|
||||
return tensorboardDockerImage;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
|
||||
package org.apache.hadoop.yarn.submarine.common.fs;
|
||||
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.yarn.submarine.client.cli.CliConstants;
|
||||
|
@ -42,7 +43,10 @@ public class DefaultRemoteDirectoryManager implements RemoteDirectoryManager {
|
|||
if (create) {
|
||||
createFolderIfNotExist(staging);
|
||||
}
|
||||
return staging;
|
||||
|
||||
// Get a file status to make sure it is a absolute path.
|
||||
FileStatus fStatus = fs.getFileStatus(staging);
|
||||
return fStatus.getPath();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -70,8 +74,21 @@ public class DefaultRemoteDirectoryManager implements RemoteDirectoryManager {
|
|||
return fs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Path getUserRootFolder() throws IOException {
|
||||
Path rootPath = new Path("submarine", "jobs");
|
||||
createFolderIfNotExist(rootPath);
|
||||
// Get a file status to make sure it is a absolute path.
|
||||
FileStatus fStatus = fs.getFileStatus(rootPath);
|
||||
return fStatus.getPath();
|
||||
}
|
||||
|
||||
private Path getJobRootFolder(String jobName) throws IOException {
|
||||
return new Path(new Path("submarine", "jobs"), jobName);
|
||||
Path jobRootPath = getUserRootFolder();
|
||||
createFolderIfNotExist(jobRootPath);
|
||||
// Get a file status to make sure it is a absolute path.
|
||||
FileStatus fStatus = fs.getFileStatus(jobRootPath);
|
||||
return fStatus.getPath();
|
||||
}
|
||||
|
||||
private void createFolderIfNotExist(Path path) throws IOException {
|
||||
|
|
|
@ -27,4 +27,6 @@ public interface RemoteDirectoryManager {
|
|||
Path getModelDir(String modelName, boolean create) throws IOException;
|
||||
|
||||
FileSystem getFileSystem() throws IOException;
|
||||
}
|
||||
|
||||
Path getUserRootFolder() throws IOException;
|
||||
}
|
|
@ -32,11 +32,9 @@ import java.util.Map;
|
|||
* A super naive FS-based storage.
|
||||
*/
|
||||
public class FSBasedSubmarineStorageImpl extends SubmarineStorage {
|
||||
ClientContext clientContext;
|
||||
RemoteDirectoryManager rdm;
|
||||
|
||||
public FSBasedSubmarineStorageImpl(ClientContext clientContext) {
|
||||
this.clientContext = clientContext;
|
||||
rdm = clientContext.getRemoteDirectoryManager();
|
||||
}
|
||||
|
||||
|
@ -89,7 +87,7 @@ public class FSBasedSubmarineStorageImpl extends SubmarineStorage {
|
|||
private Map<String, String> deserializeMap(FSDataInputStream fis)
|
||||
throws IOException {
|
||||
ObjectInput oi = new ObjectInputStream(fis);
|
||||
Map<String, String> newMap = null;
|
||||
Map<String, String> newMap;
|
||||
try {
|
||||
newMap = (Map<String, String>) oi.readObject();
|
||||
} catch (ClassNotFoundException e) {
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
package org.apache.hadoop.yarn.submarine.runtimes.yarnservice;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -59,6 +60,10 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
|
|||
Service serviceSpec;
|
||||
private Set<Path> uploadedFiles = new HashSet<>();
|
||||
|
||||
// Used by testing
|
||||
private Map<String, String> componentToLocalLaunchScriptPath =
|
||||
new HashMap<>();
|
||||
|
||||
public YarnServiceJobSubmitter(ClientContext clientContext) {
|
||||
this.clientContext = clientContext;
|
||||
}
|
||||
|
@ -186,6 +191,14 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
|
|||
envs.put(Envs.TASK_TYPE_ENV, taskType.name());
|
||||
}
|
||||
|
||||
private String getUserName() {
|
||||
return System.getProperty("user.name");
|
||||
}
|
||||
|
||||
private String getDNSDomain() {
|
||||
return clientContext.getYarnConfig().get("hadoop.registry.dns.domain-name");
|
||||
}
|
||||
|
||||
/*
|
||||
* Generate a command launch script on local disk, returns patch to the script
|
||||
*/
|
||||
|
@ -194,50 +207,48 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
|
|||
File file = File.createTempFile(taskType.name() + "-launch-script", ".sh");
|
||||
FileWriter fw = new FileWriter(file);
|
||||
|
||||
fw.append("#!/bin/bash\n");
|
||||
try {
|
||||
fw.append("#!/bin/bash\n");
|
||||
|
||||
addHdfsClassPathIfNeeded(parameters, fw, comp);
|
||||
addHdfsClassPathIfNeeded(parameters, fw, comp);
|
||||
|
||||
// For primary_worker
|
||||
if (taskType == TaskType.PRIMARY_WORKER) {
|
||||
// Do we need tensorboard?
|
||||
if (parameters.isTensorboardEnabled()) {
|
||||
int tensorboardPort = 6006;
|
||||
// Run tensorboard at the background
|
||||
fw.append(
|
||||
"tensorboard --port " + tensorboardPort + " --logdir " + parameters
|
||||
.getCheckpointPath() + " &\n");
|
||||
if (taskType.equals(TaskType.TENSORBOARD)) {
|
||||
String tbCommand =
|
||||
"export LC_ALL=C && tensorboard --logdir=" + parameters
|
||||
.getCheckpointPath();
|
||||
fw.append(tbCommand + "\n");
|
||||
LOG.info("Tensorboard command=" + tbCommand);
|
||||
} else{
|
||||
// When distributed training is required
|
||||
if (parameters.isDistributed()) {
|
||||
// Generated TF_CONFIG
|
||||
String tfConfigEnv = YarnServiceUtils.getTFConfigEnv(
|
||||
taskType.getComponentName(), parameters.getNumWorkers(),
|
||||
parameters.getNumPS(), parameters.getName(), getUserName(),
|
||||
getDNSDomain());
|
||||
fw.append("export TF_CONFIG=\"" + tfConfigEnv + "\"\n");
|
||||
}
|
||||
|
||||
// Print launch command
|
||||
if (taskType.equals(TaskType.WORKER) || taskType.equals(
|
||||
TaskType.PRIMARY_WORKER)) {
|
||||
fw.append(parameters.getWorkerLaunchCmd() + '\n');
|
||||
|
||||
if (SubmarineLogs.isVerbose()) {
|
||||
LOG.info(
|
||||
"Worker command =[" + parameters.getWorkerLaunchCmd() + "]");
|
||||
}
|
||||
} else if (taskType.equals(TaskType.PS)) {
|
||||
fw.append(parameters.getPSLaunchCmd() + '\n');
|
||||
|
||||
if (SubmarineLogs.isVerbose()) {
|
||||
LOG.info("PS command =[" + parameters.getPSLaunchCmd() + "]");
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
fw.close();
|
||||
}
|
||||
|
||||
// When distributed training is required
|
||||
if (parameters.isDistributed()) {
|
||||
// Generated TF_CONFIG
|
||||
String tfConfigEnv = YarnServiceUtils.getTFConfigEnv(
|
||||
taskType.getComponentName(), parameters.getNumWorkers(),
|
||||
parameters.getNumPS(), parameters.getName(),
|
||||
System.getProperty("user.name"),
|
||||
clientContext.getYarnConfig().get("hadoop.registry.dns.domain-name"));
|
||||
fw.append("export TF_CONFIG=\"" + tfConfigEnv + "\"\n");
|
||||
}
|
||||
|
||||
// Print launch command
|
||||
if (taskType.equals(TaskType.WORKER) || taskType.equals(
|
||||
TaskType.PRIMARY_WORKER)) {
|
||||
fw.append(parameters.getWorkerLaunchCmd() + '\n');
|
||||
|
||||
if (SubmarineLogs.isVerbose()) {
|
||||
LOG.info("Worker command =[" + parameters.getWorkerLaunchCmd() + "]");
|
||||
}
|
||||
} else if (taskType.equals(TaskType.PS)) {
|
||||
fw.append(parameters.getPSLaunchCmd() + '\n');
|
||||
|
||||
if (SubmarineLogs.isVerbose()) {
|
||||
LOG.info("PS command =[" + parameters.getPSLaunchCmd() + "]");
|
||||
}
|
||||
}
|
||||
|
||||
fw.close();
|
||||
return file.getAbsolutePath();
|
||||
}
|
||||
|
||||
|
@ -320,6 +331,8 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
|
|||
destScriptFileName, component);
|
||||
|
||||
component.setLaunchCommand("./" + destScriptFileName);
|
||||
componentToLocalLaunchScriptPath.put(taskType.getComponentName(),
|
||||
localScriptFile);
|
||||
}
|
||||
|
||||
private void addWorkerComponent(Service service,
|
||||
|
@ -410,6 +423,7 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
|
|||
|
||||
private Service createServiceByParameters(RunJobParameters parameters)
|
||||
throws IOException {
|
||||
componentToLocalLaunchScriptPath.clear();
|
||||
Service service = new Service();
|
||||
service.setName(parameters.getName());
|
||||
service.setVersion(String.valueOf(System.currentTimeMillis()));
|
||||
|
@ -417,7 +431,9 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
|
|||
|
||||
handleServiceEnvs(service, parameters);
|
||||
|
||||
addWorkerComponents(service, parameters);
|
||||
if (parameters.getNumWorkers() > 0) {
|
||||
addWorkerComponents(service, parameters);
|
||||
}
|
||||
|
||||
if (parameters.getNumPS() > 0) {
|
||||
Component psComponent = new Component();
|
||||
|
@ -436,6 +452,31 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
|
|||
handleLaunchCommand(parameters, TaskType.PS, psComponent);
|
||||
service.addComponent(psComponent);
|
||||
}
|
||||
|
||||
if (parameters.isTensorboardEnabled()) {
|
||||
Component tbComponent = new Component();
|
||||
tbComponent.setName(TaskType.TENSORBOARD.getComponentName());
|
||||
addCommonEnvironments(tbComponent, TaskType.TENSORBOARD);
|
||||
tbComponent.setNumberOfContainers(1L);
|
||||
tbComponent.setRestartPolicy(Component.RestartPolicyEnum.NEVER);
|
||||
tbComponent.setResource(getServiceResourceFromYarnResource(
|
||||
parameters.getTensorboardResource()));
|
||||
if (parameters.getTensorboardDockerImage() != null) {
|
||||
tbComponent.setArtifact(
|
||||
getDockerArtifact(parameters.getTensorboardDockerImage()));
|
||||
}
|
||||
|
||||
handleLaunchCommand(parameters, TaskType.TENSORBOARD, tbComponent);
|
||||
|
||||
// Add tensorboard to quicklink
|
||||
String tensorboardLink = "http://" + YarnServiceUtils.getDNSName(
|
||||
parameters.getName(), TaskType.TENSORBOARD.getComponentName(), 0,
|
||||
getUserName(), getDNSDomain(), 6006);
|
||||
LOG.info("Link to tensorboard:" + tensorboardLink);
|
||||
service.addComponent(tbComponent);
|
||||
service.setQuicklinks(ImmutableMap.of("Tensorboard", tensorboardLink));
|
||||
}
|
||||
|
||||
return service;
|
||||
}
|
||||
|
||||
|
@ -458,4 +499,9 @@ public class YarnServiceJobSubmitter implements JobSubmitter {
|
|||
public Service getServiceSpec() {
|
||||
return serviceSpec;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public Map<String, String> getComponentToLocalLaunchScriptPath() {
|
||||
return componentToLocalLaunchScriptPath;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -40,10 +40,23 @@ public class YarnServiceUtils {
|
|||
YarnServiceUtils.stubServiceClient = stubServiceClient;
|
||||
}
|
||||
|
||||
public static String getDNSName(String serviceName, String componentName,
|
||||
int index, String userName, String domain, int port) {
|
||||
return componentName + "-" + index + getDNSNameCommonSuffix(serviceName,
|
||||
userName, domain, port);
|
||||
}
|
||||
|
||||
private static String getDNSNameCommonSuffix(String serviceName,
|
||||
String userName, String domain, int port) {
|
||||
String commonEndpointSuffix =
|
||||
"." + serviceName + "." + userName + "." + domain + ":" + port;
|
||||
return commonEndpointSuffix;
|
||||
}
|
||||
|
||||
public static String getTFConfigEnv(String curCommponentName, int nWorkers,
|
||||
int nPs, String serviceName, String userName, String domain) {
|
||||
String commonEndpointSuffix =
|
||||
"." + serviceName + "." + userName + "." + domain + ":8000";
|
||||
String commonEndpointSuffix = getDNSNameCommonSuffix(serviceName, userName,
|
||||
domain, 8000);
|
||||
|
||||
String json = "{\\\"cluster\\\":{";
|
||||
|
||||
|
@ -58,7 +71,14 @@ public class YarnServiceUtils {
|
|||
+ " \\\"index\\\":" + '$' + Envs.TASK_INDEX_ENV + "},";
|
||||
String environment = "\\\"environment\\\":\\\"cloud\\\"}";
|
||||
|
||||
return json + master + worker + ps + task + environment;
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(json);
|
||||
sb.append(master);
|
||||
sb.append(worker);
|
||||
sb.append(ps);
|
||||
sb.append(task);
|
||||
sb.append(environment);
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
private static String getComponentArrayJson(String componentName, int count,
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.yarn.submarine.client.cli.yarnservice;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.service.api.records.Component;
|
||||
import org.apache.hadoop.yarn.service.api.records.Service;
|
||||
|
@ -32,11 +33,15 @@ import org.apache.hadoop.yarn.submarine.runtimes.common.StorageKeyConstants;
|
|||
import org.apache.hadoop.yarn.submarine.runtimes.common.SubmarineStorage;
|
||||
import org.apache.hadoop.yarn.submarine.runtimes.yarnservice.YarnServiceJobSubmitter;
|
||||
import org.apache.hadoop.yarn.submarine.runtimes.yarnservice.YarnServiceUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.mockito.Matchers.any;
|
||||
|
@ -65,25 +70,8 @@ public class TestYarnServiceRunJobCli {
|
|||
return ((YarnServiceJobSubmitter) jobSubmitter).getServiceSpec();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBasicRunJobForDistributedTraining() throws Exception {
|
||||
MockClientContext mockClientContext =
|
||||
YarnServiceCliTestUtils.getMockClientContext();
|
||||
RunJobCli runJobCli = new RunJobCli(mockClientContext);
|
||||
Assert.assertFalse(SubmarineLogs.isVerbose());
|
||||
|
||||
runJobCli.run(
|
||||
new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
|
||||
"--input_path", "s3://input", "--checkpoint_path",
|
||||
"s3://output", "--num_workers", "3", "--num_ps", "2",
|
||||
"--worker_launch_cmd", "python run-job.py", "--worker_resources",
|
||||
"memory=2048M,vcores=2", "--ps_resources", "memory=4096M,vcores=4",
|
||||
"--tensorboard", "true", "--ps_docker_image", "ps.image",
|
||||
"--worker_docker_image", "worker.image",
|
||||
"--ps_launch_cmd", "python run-ps.py", "--verbose" });
|
||||
Service serviceSpec = getServiceSpecFromJobSubmitter(
|
||||
runJobCli.getJobSubmitter());
|
||||
Assert.assertEquals(3, serviceSpec.getComponents().size());
|
||||
private void commonVerifyDistributedTrainingSpec(Service serviceSpec)
|
||||
throws Exception {
|
||||
Assert.assertTrue(
|
||||
serviceSpec.getComponent(TaskType.WORKER.getComponentName()) != null);
|
||||
Assert.assertTrue(
|
||||
|
@ -98,7 +86,7 @@ public class TestYarnServiceRunJobCli {
|
|||
primaryWorkerComp.getResource().getCpus().intValue());
|
||||
|
||||
Component workerComp = serviceSpec.getComponent(
|
||||
TaskType.WORKER.getComponentName());
|
||||
TaskType.WORKER.getComponentName());
|
||||
Assert.assertEquals(2048, workerComp.getResource().calcMemoryMB());
|
||||
Assert.assertEquals(2, workerComp.getResource().getCpus().intValue());
|
||||
|
||||
|
@ -110,8 +98,55 @@ public class TestYarnServiceRunJobCli {
|
|||
Assert.assertEquals("ps.image", psComp.getArtifact().getId());
|
||||
|
||||
Assert.assertTrue(SubmarineLogs.isVerbose());
|
||||
}
|
||||
|
||||
// TODO, ADD TEST TO USE SERVICE CLIENT TO VALIDATE THE JSON SPEC
|
||||
@Test
|
||||
public void testBasicRunJobForDistributedTraining() throws Exception {
|
||||
MockClientContext mockClientContext =
|
||||
YarnServiceCliTestUtils.getMockClientContext();
|
||||
RunJobCli runJobCli = new RunJobCli(mockClientContext);
|
||||
Assert.assertFalse(SubmarineLogs.isVerbose());
|
||||
|
||||
runJobCli.run(
|
||||
new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
|
||||
"--input_path", "s3://input", "--checkpoint_path", "s3://output",
|
||||
"--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
|
||||
"python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
|
||||
"--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image",
|
||||
"ps.image", "--worker_docker_image", "worker.image",
|
||||
"--ps_launch_cmd", "python run-ps.py", "--verbose" });
|
||||
Service serviceSpec = getServiceSpecFromJobSubmitter(
|
||||
runJobCli.getJobSubmitter());
|
||||
Assert.assertEquals(3, serviceSpec.getComponents().size());
|
||||
|
||||
commonVerifyDistributedTrainingSpec(serviceSpec);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBasicRunJobForDistributedTrainingWithTensorboard()
|
||||
throws Exception {
|
||||
MockClientContext mockClientContext =
|
||||
YarnServiceCliTestUtils.getMockClientContext();
|
||||
RunJobCli runJobCli = new RunJobCli(mockClientContext);
|
||||
Assert.assertFalse(SubmarineLogs.isVerbose());
|
||||
|
||||
runJobCli.run(
|
||||
new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
|
||||
"--input_path", "s3://input", "--checkpoint_path", "s3://output",
|
||||
"--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
|
||||
"python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
|
||||
"--ps_resources", "memory=4096M,vcores=4", "--ps_docker_image",
|
||||
"ps.image", "--worker_docker_image", "worker.image",
|
||||
"--tensorboard", "--ps_launch_cmd", "python run-ps.py",
|
||||
"--verbose" });
|
||||
Service serviceSpec = getServiceSpecFromJobSubmitter(
|
||||
runJobCli.getJobSubmitter());
|
||||
Assert.assertEquals(4, serviceSpec.getComponents().size());
|
||||
|
||||
commonVerifyDistributedTrainingSpec(serviceSpec);
|
||||
|
||||
verifyTensorboardComponent(runJobCli, serviceSpec,
|
||||
Resources.createResource(4096, 1));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -123,13 +158,84 @@ public class TestYarnServiceRunJobCli {
|
|||
|
||||
runJobCli.run(
|
||||
new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
|
||||
"--input_path", "s3://input", "--checkpoint_path",
|
||||
"s3://output", "--num_workers", "1", "--worker_launch_cmd",
|
||||
"python run-job.py", "--worker_resources", "memory=2G,vcores=2",
|
||||
"--tensorboard", "true", "--verbose" });
|
||||
"--input_path", "s3://input", "--checkpoint_path", "s3://output",
|
||||
"--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
|
||||
"--worker_resources", "memory=2G,vcores=2", "--verbose" });
|
||||
|
||||
Service serviceSpec = getServiceSpecFromJobSubmitter(
|
||||
runJobCli.getJobSubmitter());
|
||||
Assert.assertEquals(1, serviceSpec.getComponents().size());
|
||||
|
||||
commonTestSingleNodeTraining(serviceSpec);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTensorboardOnlyService() throws Exception {
|
||||
MockClientContext mockClientContext =
|
||||
YarnServiceCliTestUtils.getMockClientContext();
|
||||
RunJobCli runJobCli = new RunJobCli(mockClientContext);
|
||||
Assert.assertFalse(SubmarineLogs.isVerbose());
|
||||
|
||||
runJobCli.run(
|
||||
new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
|
||||
"--input_path", "s3://input", "--checkpoint_path", "s3://output",
|
||||
"--num_workers", "0", "--tensorboard", "--verbose" });
|
||||
|
||||
Service serviceSpec = getServiceSpecFromJobSubmitter(
|
||||
runJobCli.getJobSubmitter());
|
||||
Assert.assertEquals(1, serviceSpec.getComponents().size());
|
||||
|
||||
verifyTensorboardComponent(runJobCli, serviceSpec,
|
||||
Resources.createResource(4096, 1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTensorboardOnlyServiceWithCustomizedDockerImageAndResourceCkptPath()
|
||||
throws Exception {
|
||||
MockClientContext mockClientContext =
|
||||
YarnServiceCliTestUtils.getMockClientContext();
|
||||
RunJobCli runJobCli = new RunJobCli(mockClientContext);
|
||||
Assert.assertFalse(SubmarineLogs.isVerbose());
|
||||
|
||||
runJobCli.run(
|
||||
new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
|
||||
"--input_path", "s3://input", "--checkpoint_path", "s3://output",
|
||||
"--num_workers", "0", "--tensorboard", "--verbose",
|
||||
"--tensorboard_resources", "memory=2G,vcores=2",
|
||||
"--tensorboard_docker_image", "tb_docker_image:001" });
|
||||
|
||||
Service serviceSpec = getServiceSpecFromJobSubmitter(
|
||||
runJobCli.getJobSubmitter());
|
||||
Assert.assertEquals(1, serviceSpec.getComponents().size());
|
||||
|
||||
verifyTensorboardComponent(runJobCli, serviceSpec,
|
||||
Resources.createResource(2048, 2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTensorboardOnlyServiceWithCustomizedDockerImageAndResource()
|
||||
throws Exception {
|
||||
MockClientContext mockClientContext =
|
||||
YarnServiceCliTestUtils.getMockClientContext();
|
||||
RunJobCli runJobCli = new RunJobCli(mockClientContext);
|
||||
Assert.assertFalse(SubmarineLogs.isVerbose());
|
||||
|
||||
runJobCli.run(
|
||||
new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
|
||||
"--num_workers", "0", "--tensorboard", "--verbose",
|
||||
"--tensorboard_resources", "memory=2G,vcores=2",
|
||||
"--tensorboard_docker_image", "tb_docker_image:001" });
|
||||
|
||||
Service serviceSpec = getServiceSpecFromJobSubmitter(
|
||||
runJobCli.getJobSubmitter());
|
||||
Assert.assertEquals(1, serviceSpec.getComponents().size());
|
||||
|
||||
verifyTensorboardComponent(runJobCli, serviceSpec,
|
||||
Resources.createResource(2048, 2));
|
||||
}
|
||||
|
||||
private void commonTestSingleNodeTraining(Service serviceSpec)
|
||||
throws Exception {
|
||||
Assert.assertTrue(
|
||||
serviceSpec.getComponent(TaskType.PRIMARY_WORKER.getComponentName())
|
||||
!= null);
|
||||
|
@ -140,8 +246,110 @@ public class TestYarnServiceRunJobCli {
|
|||
primaryWorkerComp.getResource().getCpus().intValue());
|
||||
|
||||
Assert.assertTrue(SubmarineLogs.isVerbose());
|
||||
}
|
||||
|
||||
// TODO, ADD TEST TO USE SERVICE CLIENT TO VALIDATE THE JSON SPEC
|
||||
private void verifyTensorboardComponent(RunJobCli runJobCli,
|
||||
Service serviceSpec, Resource resource) throws Exception {
|
||||
Assert.assertTrue(
|
||||
serviceSpec.getComponent(TaskType.TENSORBOARD.getComponentName())
|
||||
!= null);
|
||||
Component tensorboardComp = serviceSpec.getComponent(
|
||||
TaskType.TENSORBOARD.getComponentName());
|
||||
Assert.assertEquals(1, tensorboardComp.getNumberOfContainers().intValue());
|
||||
Assert.assertEquals(resource.getMemorySize(),
|
||||
tensorboardComp.getResource().calcMemoryMB());
|
||||
Assert.assertEquals(resource.getVirtualCores(),
|
||||
tensorboardComp.getResource().getCpus().intValue());
|
||||
|
||||
Assert.assertEquals("./run-TENSORBOARD.sh",
|
||||
tensorboardComp.getLaunchCommand());
|
||||
|
||||
// Check docker image
|
||||
if (runJobCli.getRunJobParameters().getTensorboardDockerImage() != null) {
|
||||
Assert.assertEquals(
|
||||
runJobCli.getRunJobParameters().getTensorboardDockerImage(),
|
||||
tensorboardComp.getArtifact().getId());
|
||||
} else{
|
||||
Assert.assertNull(tensorboardComp.getArtifact());
|
||||
}
|
||||
|
||||
YarnServiceJobSubmitter yarnServiceJobSubmitter =
|
||||
(YarnServiceJobSubmitter) runJobCli.getJobSubmitter();
|
||||
|
||||
String expectedLaunchScript =
|
||||
"#!/bin/bash\n" + "echo \"CLASSPATH:$CLASSPATH\"\n"
|
||||
+ "echo \"HADOOP_CONF_DIR:$HADOOP_CONF_DIR\"\n"
|
||||
+ "echo \"HADOOP_TOKEN_FILE_LOCATION:$HADOOP_TOKEN_FILE_LOCATION\"\n"
|
||||
+ "echo \"JAVA_HOME:$JAVA_HOME\"\n"
|
||||
+ "echo \"LD_LIBRARY_PATH:$LD_LIBRARY_PATH\"\n"
|
||||
+ "echo \"HADOOP_HDFS_HOME:$HADOOP_HDFS_HOME\"\n"
|
||||
+ "export LC_ALL=C && tensorboard --logdir=" + runJobCli
|
||||
.getRunJobParameters().getCheckpointPath() + "\n";
|
||||
|
||||
verifyLaunchScriptForComponet(yarnServiceJobSubmitter, serviceSpec,
|
||||
TaskType.TENSORBOARD, expectedLaunchScript);
|
||||
}
|
||||
|
||||
private void verifyLaunchScriptForComponet(
|
||||
YarnServiceJobSubmitter yarnServiceJobSubmitter, Service serviceSpec,
|
||||
TaskType taskType, String expectedLaunchScriptContent) throws Exception {
|
||||
Map<String, String> componentToLocalLaunchScriptMap =
|
||||
yarnServiceJobSubmitter.getComponentToLocalLaunchScriptPath();
|
||||
|
||||
String path = componentToLocalLaunchScriptMap.get(
|
||||
taskType.getComponentName());
|
||||
|
||||
byte[] encoded = Files.readAllBytes(Paths.get(path));
|
||||
String scriptContent = new String(encoded, Charset.defaultCharset());
|
||||
|
||||
Assert.assertEquals(expectedLaunchScriptContent, scriptContent);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBasicRunJobForSingleNodeTrainingWithTensorboard()
|
||||
throws Exception {
|
||||
MockClientContext mockClientContext =
|
||||
YarnServiceCliTestUtils.getMockClientContext();
|
||||
RunJobCli runJobCli = new RunJobCli(mockClientContext);
|
||||
Assert.assertFalse(SubmarineLogs.isVerbose());
|
||||
|
||||
runJobCli.run(
|
||||
new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
|
||||
"--input_path", "s3://input", "--checkpoint_path", "s3://output",
|
||||
"--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
|
||||
"--worker_resources", "memory=2G,vcores=2", "--tensorboard",
|
||||
"--verbose" });
|
||||
Service serviceSpec = getServiceSpecFromJobSubmitter(
|
||||
runJobCli.getJobSubmitter());
|
||||
|
||||
Assert.assertEquals(2, serviceSpec.getComponents().size());
|
||||
|
||||
commonTestSingleNodeTraining(serviceSpec);
|
||||
verifyTensorboardComponent(runJobCli, serviceSpec,
|
||||
Resources.createResource(4096, 1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBasicRunJobForSingleNodeTrainingWithGeneratedCheckpoint()
|
||||
throws Exception {
|
||||
MockClientContext mockClientContext =
|
||||
YarnServiceCliTestUtils.getMockClientContext();
|
||||
RunJobCli runJobCli = new RunJobCli(mockClientContext);
|
||||
Assert.assertFalse(SubmarineLogs.isVerbose());
|
||||
|
||||
runJobCli.run(
|
||||
new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
|
||||
"--input_path", "s3://input", "--num_workers", "1",
|
||||
"--worker_launch_cmd", "python run-job.py", "--worker_resources",
|
||||
"memory=2G,vcores=2", "--tensorboard", "--verbose" });
|
||||
Service serviceSpec = getServiceSpecFromJobSubmitter(
|
||||
runJobCli.getJobSubmitter());
|
||||
|
||||
Assert.assertEquals(2, serviceSpec.getComponents().size());
|
||||
|
||||
commonTestSingleNodeTraining(serviceSpec);
|
||||
verifyTensorboardComponent(runJobCli, serviceSpec,
|
||||
Resources.createResource(4096, 1));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -153,10 +361,10 @@ public class TestYarnServiceRunJobCli {
|
|||
|
||||
runJobCli.run(
|
||||
new String[] { "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
|
||||
"--input_path", "s3://input", "--checkpoint_path",
|
||||
"s3://output", "--num_workers", "1", "--worker_launch_cmd",
|
||||
"python run-job.py", "--worker_resources", "memory=2G,vcores=2",
|
||||
"--tensorboard", "true", "--verbose" });
|
||||
"--input_path", "s3://input", "--checkpoint_path", "s3://output",
|
||||
"--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
|
||||
"--worker_resources", "memory=2G,vcores=2", "--tensorboard", "true",
|
||||
"--verbose" });
|
||||
SubmarineStorage storage =
|
||||
mockClientContext.getRuntimeFactory().getSubmarineStorage();
|
||||
Map<String, String> jobInfo = storage.getJobInfoByName("my-job");
|
||||
|
|
|
@ -53,7 +53,7 @@ public class MockRemoteDirectoryManager implements RemoteDirectoryManager {
|
|||
@Override
|
||||
public Path getJobCheckpointDir(String jobName, boolean create)
|
||||
throws IOException {
|
||||
return null;
|
||||
return new Path("s3://generated_checkpoint_dir");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -80,4 +80,9 @@ public class MockRemoteDirectoryManager implements RemoteDirectoryManager {
|
|||
public FileSystem getFileSystem() throws IOException {
|
||||
return FileSystem.getLocal(new Configuration());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Path getUserRootFolder() throws IOException {
|
||||
return new Path("s3://generated_root_dir");
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue