MAPREDUCE-6365. Refactor JobResourceUploader#uploadFilesInternal (Chris Trezzo via sjlee)

(cherry picked from commit 8f0d3d69d6)
This commit is contained in:
Sangjin Lee 2016-07-19 20:15:37 -07:00
parent 5641c28c1c
commit a7e533444c
2 changed files with 40 additions and 24 deletions

View File

@ -54,7 +54,7 @@ class JobResourceUploader {
* @param submitJobDir the submission directory of the job
* @throws IOException
*/
public void uploadFiles(Job job, Path submitJobDir) throws IOException {
public void uploadResources(Job job, Path submitJobDir) throws IOException {
Configuration conf = job.getConfiguration();
short replication =
(short) conf.getInt(Job.SUBMIT_REPLICATION,
@ -66,12 +66,6 @@ class JobResourceUploader {
+ "with ToolRunner to remedy this.");
}
// get all the command line arguments passed in by the user conf
String files = conf.get("tmpfiles");
String libjars = conf.get("tmpjars");
String archives = conf.get("tmparchives");
String jobJar = job.getJar();
//
// Figure out what fs the JobTracker is using. Copy the
// job to it, under a temporary name. This allows DFS to work,
@ -92,12 +86,27 @@ class JobResourceUploader {
FsPermission mapredSysPerms =
new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
FileSystem.mkdirs(jtFs, submitJobDir, mapredSysPerms);
Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
// add all the command line files/ jars and archive
// first copy them to jobtrackers filesystem
uploadFiles(conf, submitJobDir, mapredSysPerms, replication);
uploadLibJars(conf, submitJobDir, mapredSysPerms, replication);
uploadArchives(conf, submitJobDir, mapredSysPerms, replication);
uploadJobJar(job, submitJobDir, replication);
addLog4jToDistributedCache(job, submitJobDir);
// set the timestamps of the archives and files
// set the public/private visibility of the archives and files
ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf);
// get DelegationToken for cached file
ClientDistributedCacheManager.getDelegationTokens(conf,
job.getCredentials());
}
private void uploadFiles(Configuration conf, Path submitJobDir,
FsPermission mapredSysPerms, short submitReplication) throws IOException {
String files = conf.get("tmpfiles");
Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
if (files != null) {
FileSystem.mkdirs(jtFs, filesDir, mapredSysPerms);
String[] fileArr = files.split(",");
@ -109,7 +118,7 @@ class JobResourceUploader {
throw new IllegalArgumentException(e);
}
Path tmp = new Path(tmpURI);
Path newPath = copyRemoteFiles(filesDir, tmp, conf, replication);
Path newPath = copyRemoteFiles(filesDir, tmp, conf, submitReplication);
try {
URI pathURI = getPathURI(newPath, tmpURI.getFragment());
DistributedCache.addCacheFile(pathURI, conf);
@ -119,13 +128,19 @@ class JobResourceUploader {
}
}
}
}
private void uploadLibJars(Configuration conf, Path submitJobDir,
FsPermission mapredSysPerms, short submitReplication) throws IOException {
String libjars = conf.get("tmpjars");
Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
if (libjars != null) {
FileSystem.mkdirs(jtFs, libjarsDir, mapredSysPerms);
String[] libjarsArr = libjars.split(",");
for (String tmpjars : libjarsArr) {
Path tmp = new Path(tmpjars);
Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, replication);
Path newPath =
copyRemoteFiles(libjarsDir, tmp, conf, submitReplication);
// Add each file to the classpath
DistributedCache.addFileToClassPath(
@ -140,7 +155,12 @@ class JobResourceUploader {
DistributedCache.addCacheFile(libJarsDirWildcard.toUri(), conf);
}
}
}
private void uploadArchives(Configuration conf, Path submitJobDir,
FsPermission mapredSysPerms, short submitReplication) throws IOException {
String archives = conf.get("tmparchives");
Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
if (archives != null) {
FileSystem.mkdirs(jtFs, archivesDir, mapredSysPerms);
String[] archivesArr = archives.split(",");
@ -152,7 +172,8 @@ class JobResourceUploader {
throw new IllegalArgumentException(e);
}
Path tmp = new Path(tmpURI);
Path newPath = copyRemoteFiles(archivesDir, tmp, conf, replication);
Path newPath =
copyRemoteFiles(archivesDir, tmp, conf, submitReplication);
try {
URI pathURI = getPathURI(newPath, tmpURI.getFragment());
DistributedCache.addCacheArchive(pathURI, conf);
@ -162,7 +183,11 @@ class JobResourceUploader {
}
}
}
}
private void uploadJobJar(Job job, Path submitJobDir, short submitReplication)
throws IOException {
String jobJar = job.getJar();
if (jobJar != null) { // copy jar to JobTracker's fs
// use jar name if job is not named.
if ("".equals(job.getJobName())) {
@ -174,22 +199,13 @@ class JobResourceUploader {
// we don't need to copy it from local fs
if (jobJarURI.getScheme() == null || jobJarURI.getScheme().equals("file")) {
copyJar(jobJarPath, JobSubmissionFiles.getJobJar(submitJobDir),
replication);
submitReplication);
job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString());
}
} else {
LOG.warn("No job jar file set. User classes may not be found. "
+ "See Job or Job#setJar(String).");
}
addLog4jToDistributedCache(job, submitJobDir);
// set the timestamps of the archives and files
// set the public/private visibility of the archives and files
ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf);
// get DelegationToken for cached file
ClientDistributedCacheManager.getDelegationTokens(conf,
job.getCredentials());
}
// copies a file to the jobtracker filesystem and returns the path where it

View File

@ -99,7 +99,7 @@ class JobSubmitter {
Job.DEFAULT_USE_WILDCARD_FOR_LIBJARS);
JobResourceUploader rUploader = new JobResourceUploader(jtFs, useWildcards);
rUploader.uploadFiles(job, jobSubmitDir);
rUploader.uploadResources(job, jobSubmitDir);
// Get the working directory. If not set, sets it to filesystem working dir
// This code has been added so that working directory reset before running