MAPREDUCE-5951. Add support for the YARN Shared Cache.
(cherry picked from commit e46d5bb962
)
This commit is contained in:
parent
97b7d39a5f
commit
24b03eb79f
|
@ -48,6 +48,7 @@ import org.apache.hadoop.mapred.JobACLsManager;
|
|||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapred.TaskCompletionEvent;
|
||||
import org.apache.hadoop.mapreduce.Counters;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapreduce.JobACL;
|
||||
import org.apache.hadoop.mapreduce.JobContext;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
|
@ -1414,6 +1415,19 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|||
new char[] {'"', '=', '.'});
|
||||
}
|
||||
|
||||
/*
|
||||
* The goal is to make sure only the NM that hosts MRAppMaster will upload
|
||||
* resources to shared cache. Clean up the shared cache policies for all
|
||||
* resources so that later when TaskAttemptImpl creates
|
||||
* ContainerLaunchContext, LocalResource.setShouldBeUploadedToSharedCache will
|
||||
* be set up to false. In that way, the NMs that host the task containers
|
||||
* won't try to upload the resources to shared cache.
|
||||
*/
|
||||
private static void cleanupSharedCacheUploadPolicies(Configuration conf) {
|
||||
Job.setArchiveSharedCacheUploadPolicies(conf, Collections.emptyMap());
|
||||
Job.setFileSharedCacheUploadPolicies(conf, Collections.emptyMap());
|
||||
}
|
||||
|
||||
public static class InitTransition
|
||||
implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
|
||||
|
||||
|
@ -1492,6 +1506,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|||
job.allowedReduceFailuresPercent =
|
||||
job.conf.getInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0);
|
||||
|
||||
cleanupSharedCacheUploadPolicies(job.conf);
|
||||
|
||||
// create the Tasks but don't start them yet
|
||||
createMapTasks(job, inputLength, taskSplitMetaInfo);
|
||||
createReduceTasks(job);
|
||||
|
|
|
@ -21,6 +21,8 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl;
|
|||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
|
@ -708,17 +710,38 @@ public abstract class TaskAttemptImpl implements
|
|||
|
||||
/**
|
||||
* Create a {@link LocalResource} record with all the given parameters.
|
||||
* The NM that hosts AM container will upload resources to shared cache.
|
||||
* Thus there is no need to ask task container's NM to upload the
|
||||
* resources to shared cache. Set the shared cache upload policy to
|
||||
* false.
|
||||
*/
|
||||
private static LocalResource createLocalResource(FileSystem fc, Path file,
|
||||
LocalResourceType type, LocalResourceVisibility visibility)
|
||||
throws IOException {
|
||||
String fileSymlink, LocalResourceType type,
|
||||
LocalResourceVisibility visibility) throws IOException {
|
||||
FileStatus fstat = fc.getFileStatus(file);
|
||||
URL resourceURL = URL.fromPath(fc.resolvePath(fstat.getPath()));
|
||||
// We need to be careful when converting from path to URL to add a fragment
|
||||
// so that the symlink name when localized will be correct.
|
||||
Path qualifiedPath = fc.resolvePath(fstat.getPath());
|
||||
URI uriWithFragment = null;
|
||||
boolean useFragment = fileSymlink != null && !fileSymlink.equals("");
|
||||
try {
|
||||
if (useFragment) {
|
||||
uriWithFragment = new URI(qualifiedPath.toUri() + "#" + fileSymlink);
|
||||
} else {
|
||||
uriWithFragment = qualifiedPath.toUri();
|
||||
}
|
||||
} catch (URISyntaxException e) {
|
||||
throw new IOException(
|
||||
"Error parsing local resource path."
|
||||
+ " Path was not able to be converted to a URI: " + qualifiedPath,
|
||||
e);
|
||||
}
|
||||
URL resourceURL = URL.fromURI(uriWithFragment);
|
||||
long resourceSize = fstat.getLen();
|
||||
long resourceModificationTime = fstat.getModificationTime();
|
||||
|
||||
return LocalResource.newInstance(resourceURL, type, visibility,
|
||||
resourceSize, resourceModificationTime);
|
||||
resourceSize, resourceModificationTime, false);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -829,8 +852,18 @@ public abstract class TaskAttemptImpl implements
|
|||
final FileSystem jobJarFs = FileSystem.get(jobJarPath.toUri(), conf);
|
||||
Path remoteJobJar = jobJarPath.makeQualified(jobJarFs.getUri(),
|
||||
jobJarFs.getWorkingDirectory());
|
||||
LocalResource rc = createLocalResource(jobJarFs, remoteJobJar,
|
||||
LocalResourceType.PATTERN, LocalResourceVisibility.APPLICATION);
|
||||
LocalResourceVisibility jobJarViz =
|
||||
conf.getBoolean(MRJobConfig.JOBJAR_VISIBILITY,
|
||||
MRJobConfig.JOBJAR_VISIBILITY_DEFAULT)
|
||||
? LocalResourceVisibility.PUBLIC
|
||||
: LocalResourceVisibility.APPLICATION;
|
||||
// We hard code the job.jar localized symlink in the container directory.
|
||||
// This is because the mapreduce app expects the job.jar to be named
|
||||
// accordingly. Additionally we set the shared cache upload policy to
|
||||
// false. Resources are uploaded by the AM if necessary.
|
||||
LocalResource rc =
|
||||
createLocalResource(jobJarFs, remoteJobJar, MRJobConfig.JOB_JAR,
|
||||
LocalResourceType.PATTERN, jobJarViz);
|
||||
String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN,
|
||||
JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
|
||||
rc.setPattern(pattern);
|
||||
|
@ -855,9 +888,12 @@ public abstract class TaskAttemptImpl implements
|
|||
Path remoteJobConfPath =
|
||||
new Path(remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);
|
||||
FileSystem remoteFS = FileSystem.get(conf);
|
||||
// There is no point to ask task container's NM to upload the resource
|
||||
// to shared cache (job conf is not shared). Therefore, createLocalResource
|
||||
// will set the shared cache upload policy to false
|
||||
localResources.put(MRJobConfig.JOB_CONF_FILE,
|
||||
createLocalResource(remoteFS, remoteJobConfPath, LocalResourceType.FILE,
|
||||
LocalResourceVisibility.APPLICATION));
|
||||
createLocalResource(remoteFS, remoteJobConfPath, null,
|
||||
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
|
||||
LOG.info("The job-conf file on the remote FS is "
|
||||
+ remoteJobConfPath.toUri().toASCIIString());
|
||||
}
|
||||
|
|
|
@ -0,0 +1,169 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.util;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapred.InvalidJobConfException;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.api.records.URL;
|
||||
|
||||
/**
|
||||
* Helper class for MR applications that parses distributed cache artifacts and
|
||||
* creates a map of LocalResources.
|
||||
*/
|
||||
@SuppressWarnings("deprecation")
|
||||
@Private
|
||||
@Unstable
|
||||
class LocalResourceBuilder {
|
||||
public static final Log LOG = LogFactory.getLog(LocalResourceBuilder.class);
|
||||
|
||||
private Configuration conf;
|
||||
private LocalResourceType type;
|
||||
private URI[] uris;
|
||||
private long[] timestamps;
|
||||
private long[] sizes;
|
||||
private boolean[] visibilities;
|
||||
private Map<String, Boolean> sharedCacheUploadPolicies;
|
||||
|
||||
LocalResourceBuilder() {
|
||||
}
|
||||
|
||||
void setConf(Configuration c) {
|
||||
this.conf = c;
|
||||
}
|
||||
|
||||
void setType(LocalResourceType t) {
|
||||
this.type = t;
|
||||
}
|
||||
|
||||
void setUris(URI[] u) {
|
||||
this.uris = u;
|
||||
}
|
||||
|
||||
void setTimestamps(long[] t) {
|
||||
this.timestamps = t;
|
||||
}
|
||||
|
||||
void setSizes(long[] s) {
|
||||
this.sizes = s;
|
||||
}
|
||||
|
||||
void setVisibilities(boolean[] v) {
|
||||
this.visibilities = v;
|
||||
}
|
||||
|
||||
void setSharedCacheUploadPolicies(Map<String, Boolean> policies) {
|
||||
this.sharedCacheUploadPolicies = policies;
|
||||
}
|
||||
|
||||
void createLocalResources(Map<String, LocalResource> localResources)
|
||||
throws IOException {
|
||||
|
||||
if (uris != null) {
|
||||
// Sanity check
|
||||
if ((uris.length != timestamps.length) || (uris.length != sizes.length) ||
|
||||
(uris.length != visibilities.length)) {
|
||||
throw new IllegalArgumentException("Invalid specification for " +
|
||||
"distributed-cache artifacts of type " + type + " :" +
|
||||
" #uris=" + uris.length +
|
||||
" #timestamps=" + timestamps.length +
|
||||
" #visibilities=" + visibilities.length
|
||||
);
|
||||
}
|
||||
|
||||
for (int i = 0; i < uris.length; ++i) {
|
||||
URI u = uris[i];
|
||||
Path p = new Path(u);
|
||||
FileSystem remoteFS = p.getFileSystem(conf);
|
||||
String linkName = null;
|
||||
|
||||
if (p.getName().equals(DistributedCache.WILDCARD)) {
|
||||
p = p.getParent();
|
||||
linkName = p.getName() + Path.SEPARATOR + DistributedCache.WILDCARD;
|
||||
}
|
||||
|
||||
p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
|
||||
remoteFS.getWorkingDirectory()));
|
||||
|
||||
// If there's no wildcard, try using the fragment for the link
|
||||
if (linkName == null) {
|
||||
linkName = u.getFragment();
|
||||
|
||||
// Because we don't know what's in the fragment, we have to handle
|
||||
// it with care.
|
||||
if (linkName != null) {
|
||||
Path linkPath = new Path(linkName);
|
||||
|
||||
if (linkPath.isAbsolute()) {
|
||||
throw new IllegalArgumentException("Resource name must be "
|
||||
+ "relative");
|
||||
}
|
||||
|
||||
linkName = linkPath.toUri().getPath();
|
||||
}
|
||||
} else if (u.getFragment() != null) {
|
||||
throw new IllegalArgumentException("Invalid path URI: " + p +
|
||||
" - cannot contain both a URI fragment and a wildcard");
|
||||
}
|
||||
|
||||
// If there's no wildcard or fragment, just link to the file name
|
||||
if (linkName == null) {
|
||||
linkName = p.getName();
|
||||
}
|
||||
|
||||
LocalResource orig = localResources.get(linkName);
|
||||
if(orig != null && !orig.getResource().equals(URL.fromURI(p.toUri()))) {
|
||||
throw new InvalidJobConfException(
|
||||
getResourceDescription(orig.getType()) + orig.getResource()
|
||||
+
|
||||
" conflicts with " + getResourceDescription(type) + u);
|
||||
}
|
||||
Boolean sharedCachePolicy = sharedCacheUploadPolicies.get(u.toString());
|
||||
sharedCachePolicy =
|
||||
sharedCachePolicy == null ? Boolean.FALSE : sharedCachePolicy;
|
||||
localResources.put(linkName, LocalResource.newInstance(URL.fromURI(p
|
||||
.toUri()), type, visibilities[i] ? LocalResourceVisibility.PUBLIC
|
||||
: LocalResourceVisibility.PRIVATE,
|
||||
sizes[i], timestamps[i], sharedCachePolicy));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static String getResourceDescription(LocalResourceType type) {
|
||||
if (type == LocalResourceType.ARCHIVE
|
||||
|| type == LocalResourceType.PATTERN) {
|
||||
return "cache archive (" + MRJobConfig.CACHE_ARCHIVES + ") ";
|
||||
}
|
||||
return "cache file (" + MRJobConfig.CACHE_FILES + ") ";
|
||||
}
|
||||
}
|
|
@ -42,7 +42,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapred.InvalidJobConfException;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapred.Task;
|
||||
import org.apache.hadoop.mapred.TaskLog;
|
||||
|
@ -67,12 +67,9 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
|
|||
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.api.records.URL;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.util.Apps;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
|
||||
/**
|
||||
* Helper class for MR applications
|
||||
|
@ -251,10 +248,16 @@ public class MRApps extends Apps {
|
|||
if (!userClassesTakesPrecedence) {
|
||||
MRApps.setMRFrameworkClasspath(environment, conf);
|
||||
}
|
||||
/*
|
||||
* We use "*" for the name of the JOB_JAR instead of MRJobConfig.JOB_JAR for
|
||||
* the case where the job jar is not necessarily named "job.jar". This can
|
||||
* happen, for example, when the job is leveraging a resource from the YARN
|
||||
* shared cache.
|
||||
*/
|
||||
MRApps.addToEnvironment(
|
||||
environment,
|
||||
classpathEnvVar,
|
||||
MRJobConfig.JOB_JAR + Path.SEPARATOR + MRJobConfig.JOB_JAR, conf);
|
||||
MRJobConfig.JOB_JAR + Path.SEPARATOR + "*", conf);
|
||||
MRApps.addToEnvironment(
|
||||
environment,
|
||||
classpathEnvVar,
|
||||
|
@ -471,27 +474,32 @@ public class MRApps extends Apps {
|
|||
return startCommitFile;
|
||||
}
|
||||
|
||||
public static void setupDistributedCache(
|
||||
Configuration conf,
|
||||
Map<String, LocalResource> localResources)
|
||||
throws IOException {
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
public static void setupDistributedCache(Configuration conf,
|
||||
Map<String, LocalResource> localResources) throws IOException {
|
||||
|
||||
LocalResourceBuilder lrb = new LocalResourceBuilder();
|
||||
lrb.setConf(conf);
|
||||
|
||||
// Cache archives
|
||||
parseDistributedCacheArtifacts(conf, localResources,
|
||||
LocalResourceType.ARCHIVE,
|
||||
DistributedCache.getCacheArchives(conf),
|
||||
DistributedCache.getArchiveTimestamps(conf),
|
||||
getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES),
|
||||
DistributedCache.getArchiveVisibilities(conf));
|
||||
lrb.setType(LocalResourceType.ARCHIVE);
|
||||
lrb.setUris(DistributedCache.getCacheArchives(conf));
|
||||
lrb.setTimestamps(DistributedCache.getArchiveTimestamps(conf));
|
||||
lrb.setSizes(getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES));
|
||||
lrb.setVisibilities(DistributedCache.getArchiveVisibilities(conf));
|
||||
lrb.setSharedCacheUploadPolicies(
|
||||
Job.getArchiveSharedCacheUploadPolicies(conf));
|
||||
lrb.createLocalResources(localResources);
|
||||
|
||||
// Cache files
|
||||
parseDistributedCacheArtifacts(conf,
|
||||
localResources,
|
||||
LocalResourceType.FILE,
|
||||
DistributedCache.getCacheFiles(conf),
|
||||
DistributedCache.getFileTimestamps(conf),
|
||||
getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
|
||||
DistributedCache.getFileVisibilities(conf));
|
||||
lrb.setType(LocalResourceType.FILE);
|
||||
lrb.setUris(DistributedCache.getCacheFiles(conf));
|
||||
lrb.setTimestamps(DistributedCache.getFileTimestamps(conf));
|
||||
lrb.setSizes(getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES));
|
||||
lrb.setVisibilities(DistributedCache.getFileVisibilities(conf));
|
||||
lrb.setSharedCacheUploadPolicies(
|
||||
Job.getFileSharedCacheUploadPolicies(conf));
|
||||
lrb.createLocalResources(localResources);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -550,89 +558,6 @@ public class MRApps extends Apps {
|
|||
}
|
||||
}
|
||||
|
||||
private static String getResourceDescription(LocalResourceType type) {
|
||||
if(type == LocalResourceType.ARCHIVE || type == LocalResourceType.PATTERN) {
|
||||
return "cache archive (" + MRJobConfig.CACHE_ARCHIVES + ") ";
|
||||
}
|
||||
return "cache file (" + MRJobConfig.CACHE_FILES + ") ";
|
||||
}
|
||||
|
||||
// TODO - Move this to MR!
|
||||
// Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[],
|
||||
// long[], boolean[], Path[], FileType)
|
||||
private static void parseDistributedCacheArtifacts(
|
||||
Configuration conf,
|
||||
Map<String, LocalResource> localResources,
|
||||
LocalResourceType type,
|
||||
URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[])
|
||||
throws IOException {
|
||||
|
||||
if (uris != null) {
|
||||
// Sanity check
|
||||
if ((uris.length != timestamps.length) || (uris.length != sizes.length) ||
|
||||
(uris.length != visibilities.length)) {
|
||||
throw new IllegalArgumentException("Invalid specification for " +
|
||||
"distributed-cache artifacts of type " + type + " :" +
|
||||
" #uris=" + uris.length +
|
||||
" #timestamps=" + timestamps.length +
|
||||
" #visibilities=" + visibilities.length
|
||||
);
|
||||
}
|
||||
|
||||
for (int i = 0; i < uris.length; ++i) {
|
||||
URI u = uris[i];
|
||||
Path p = new Path(u);
|
||||
FileSystem remoteFS = p.getFileSystem(conf);
|
||||
String linkName = null;
|
||||
|
||||
if (p.getName().equals(DistributedCache.WILDCARD)) {
|
||||
p = p.getParent();
|
||||
linkName = p.getName() + Path.SEPARATOR + DistributedCache.WILDCARD;
|
||||
}
|
||||
|
||||
p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
|
||||
remoteFS.getWorkingDirectory()));
|
||||
|
||||
// If there's no wildcard, try using the fragment for the link
|
||||
if (linkName == null) {
|
||||
linkName = u.getFragment();
|
||||
|
||||
// Because we don't know what's in the fragment, we have to handle
|
||||
// it with care.
|
||||
if (linkName != null) {
|
||||
Path linkPath = new Path(linkName);
|
||||
|
||||
if (linkPath.isAbsolute()) {
|
||||
throw new IllegalArgumentException("Resource name must be "
|
||||
+ "relative");
|
||||
}
|
||||
|
||||
linkName = linkPath.toUri().getPath();
|
||||
}
|
||||
} else if (u.getFragment() != null) {
|
||||
throw new IllegalArgumentException("Invalid path URI: " + p +
|
||||
" - cannot contain both a URI fragment and a wildcard");
|
||||
}
|
||||
|
||||
// If there's no wildcard or fragment, just link to the file name
|
||||
if (linkName == null) {
|
||||
linkName = p.getName();
|
||||
}
|
||||
|
||||
LocalResource orig = localResources.get(linkName);
|
||||
if(orig != null && !orig.getResource().equals(URL.fromURI(p.toUri()))) {
|
||||
throw new InvalidJobConfException(
|
||||
getResourceDescription(orig.getType()) + orig.getResource() +
|
||||
" conflicts with " + getResourceDescription(type) + u);
|
||||
}
|
||||
localResources.put(linkName, LocalResource
|
||||
.newInstance(URL.fromURI(p.toUri()), type, visibilities[i]
|
||||
? LocalResourceVisibility.PUBLIC : LocalResourceVisibility.PRIVATE,
|
||||
sizes[i], timestamps[i]));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO - Move this to MR!
|
||||
private static long[] getFileSizes(Configuration conf, String key) {
|
||||
String[] strs = conf.getStrings(key);
|
||||
|
|
|
@ -30,6 +30,8 @@ import java.io.File;
|
|||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
|
@ -39,6 +41,7 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.fs.PositionedReadable;
|
||||
import org.apache.hadoop.fs.Seekable;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapreduce.MRConfig;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
|
||||
|
@ -164,6 +167,9 @@ public class TestLocalDistributedCacheManager {
|
|||
});
|
||||
|
||||
DistributedCache.addCacheFile(file, conf);
|
||||
Map<String, Boolean> policies = new HashMap<String, Boolean>();
|
||||
policies.put(file.toString(), true);
|
||||
Job.setFileSharedCacheUploadPolicies(conf, policies);
|
||||
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101");
|
||||
conf.set(MRJobConfig.CACHE_FILES_SIZES, "201");
|
||||
conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "false");
|
||||
|
@ -272,6 +278,9 @@ public class TestLocalDistributedCacheManager {
|
|||
|
||||
DistributedCache.addCacheFile(file, conf);
|
||||
DistributedCache.addCacheFile(file, conf);
|
||||
Map<String, Boolean> policies = new HashMap<String, Boolean>();
|
||||
policies.put(file.toString(), true);
|
||||
Job.setFileSharedCacheUploadPolicies(conf, policies);
|
||||
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101,101");
|
||||
conf.set(MRJobConfig.CACHE_FILES_SIZES, "201,201");
|
||||
conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "false,false");
|
||||
|
|
|
@ -260,7 +260,7 @@ public class TestMRApps {
|
|||
}
|
||||
String env_str = env.get("CLASSPATH");
|
||||
String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
|
||||
Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/job.jar",
|
||||
Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/*",
|
||||
"job.jar/classes/", "job.jar/lib/*",
|
||||
ApplicationConstants.Environment.PWD.$$() + "/*"));
|
||||
assertTrue("MAPREDUCE_JOB_USER_CLASSPATH_FIRST set, but not taking effect!",
|
||||
|
@ -280,7 +280,7 @@ public class TestMRApps {
|
|||
}
|
||||
String env_str = env.get("CLASSPATH");
|
||||
String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
|
||||
Arrays.asList("job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*",
|
||||
Arrays.asList("job.jar/*", "job.jar/classes/", "job.jar/lib/*",
|
||||
ApplicationConstants.Environment.PWD.$$() + "/*"));
|
||||
assertTrue("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, and job.jar is not in"
|
||||
+ " the classpath!", env_str.contains(expectedClasspath));
|
||||
|
@ -302,7 +302,7 @@ public class TestMRApps {
|
|||
assertFalse("MAPREDUCE_JOB_CLASSLOADER true, but PWD is in the classpath!",
|
||||
cp.contains("PWD"));
|
||||
String expectedAppClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
|
||||
Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/job.jar",
|
||||
Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/*",
|
||||
"job.jar/classes/", "job.jar/lib/*",
|
||||
ApplicationConstants.Environment.PWD.$$() + "/*"));
|
||||
assertEquals("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is not in the app"
|
||||
|
@ -331,7 +331,7 @@ public class TestMRApps {
|
|||
conf.set(MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH, FRAMEWORK_CLASSPATH);
|
||||
MRApps.setClasspath(env, conf);
|
||||
final String stdClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
|
||||
Arrays.asList("job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*",
|
||||
Arrays.asList("job.jar/*", "job.jar/classes/", "job.jar/lib/*",
|
||||
ApplicationConstants.Environment.PWD.$$() + "/*"));
|
||||
String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR,
|
||||
Arrays.asList(ApplicationConstants.Environment.PWD.$$(),
|
||||
|
|
|
@ -51,6 +51,12 @@
|
|||
<artifactId>hadoop-hdfs</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-hdfs</artifactId>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.skyscreamer</groupId>
|
||||
<artifactId>jsonassert</artifactId>
|
||||
|
|
|
@ -21,12 +21,17 @@ package org.apache.hadoop.mapreduce;
|
|||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.Configuration.IntegerRanges;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
@ -1303,6 +1308,227 @@ public class Job extends JobContextImpl implements JobContext, AutoCloseable {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a file to job config for shared cache processing. If shared cache is
|
||||
* enabled, it will return true, otherwise, return false. We don't check with
|
||||
* SCM here given application might not be able to provide the job id;
|
||||
* ClientSCMProtocol.use requires the application id. Job Submitter will read
|
||||
* the files from job config and take care of things.
|
||||
*
|
||||
* @param resource The resource that Job Submitter will process later using
|
||||
* shared cache.
|
||||
* @param conf Configuration to add the resource to
|
||||
* @return whether the resource has been added to the configuration
|
||||
*/
|
||||
@Unstable
|
||||
public static boolean addFileToSharedCache(URI resource, Configuration conf) {
|
||||
SharedCacheConfig scConfig = new SharedCacheConfig();
|
||||
scConfig.init(conf);
|
||||
if (scConfig.isSharedCacheFilesEnabled()) {
|
||||
String files = conf.get(MRJobConfig.FILES_FOR_SHARED_CACHE);
|
||||
conf.set(
|
||||
MRJobConfig.FILES_FOR_SHARED_CACHE,
|
||||
files == null ? resource.toString() : files + ","
|
||||
+ resource.toString());
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a file to job config for shared cache processing. If shared cache is
|
||||
* enabled, it will return true, otherwise, return false. We don't check with
|
||||
* SCM here given application might not be able to provide the job id;
|
||||
* ClientSCMProtocol.use requires the application id. Job Submitter will read
|
||||
* the files from job config and take care of things. Job Submitter will also
|
||||
* add the file to classpath. Intended to be used by user code.
|
||||
*
|
||||
* @param resource The resource that Job Submitter will process later using
|
||||
* shared cache.
|
||||
* @param conf Configuration to add the resource to
|
||||
* @return whether the resource has been added to the configuration
|
||||
*/
|
||||
@Unstable
|
||||
public static boolean addFileToSharedCacheAndClasspath(URI resource,
|
||||
Configuration conf) {
|
||||
SharedCacheConfig scConfig = new SharedCacheConfig();
|
||||
scConfig.init(conf);
|
||||
if (scConfig.isSharedCacheLibjarsEnabled()) {
|
||||
String files =
|
||||
conf.get(MRJobConfig.FILES_FOR_CLASSPATH_AND_SHARED_CACHE);
|
||||
conf.set(
|
||||
MRJobConfig.FILES_FOR_CLASSPATH_AND_SHARED_CACHE,
|
||||
files == null ? resource.toString() : files + ","
|
||||
+ resource.toString());
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add an archive to job config for shared cache processing. If shared cache
|
||||
* is enabled, it will return true, otherwise, return false. We don't check
|
||||
* with SCM here given application might not be able to provide the job id;
|
||||
* ClientSCMProtocol.use requires the application id. Job Submitter will read
|
||||
* the files from job config and take care of things. Intended to be used by
|
||||
* user code.
|
||||
*
|
||||
* @param resource The resource that Job Submitter will process later using
|
||||
* shared cache.
|
||||
* @param conf Configuration to add the resource to
|
||||
* @return whether the resource has been added to the configuration
|
||||
*/
|
||||
@Unstable
|
||||
public static boolean addArchiveToSharedCache(URI resource,
|
||||
Configuration conf) {
|
||||
SharedCacheConfig scConfig = new SharedCacheConfig();
|
||||
scConfig.init(conf);
|
||||
if (scConfig.isSharedCacheArchivesEnabled()) {
|
||||
String files = conf.get(MRJobConfig.ARCHIVES_FOR_SHARED_CACHE);
|
||||
conf.set(
|
||||
MRJobConfig.ARCHIVES_FOR_SHARED_CACHE,
|
||||
files == null ? resource.toString() : files + ","
|
||||
+ resource.toString());
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This is to set the shared cache upload policies for files. If the parameter
|
||||
* was previously set, this method will replace the old value with the new
|
||||
* provided map.
|
||||
*
|
||||
* @param conf Configuration which stores the shared cache upload policies
|
||||
* @param policies A map containing the shared cache upload policies for a set
|
||||
* of resources. The key is the url of the resource and the value is
|
||||
* the upload policy. True if it should be uploaded, false otherwise.
|
||||
*/
|
||||
@Unstable
|
||||
public static void setFileSharedCacheUploadPolicies(Configuration conf,
|
||||
Map<String, Boolean> policies) {
|
||||
setSharedCacheUploadPolicies(conf, policies, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* This is to set the shared cache upload policies for archives. If the
|
||||
* parameter was previously set, this method will replace the old value with
|
||||
* the new provided map.
|
||||
*
|
||||
* @param conf Configuration which stores the shared cache upload policies
|
||||
* @param policies A map containing the shared cache upload policies for a set
|
||||
* of resources. The key is the url of the resource and the value is
|
||||
* the upload policy. True if it should be uploaded, false otherwise.
|
||||
*/
|
||||
@Unstable
|
||||
public static void setArchiveSharedCacheUploadPolicies(Configuration conf,
|
||||
Map<String, Boolean> policies) {
|
||||
setSharedCacheUploadPolicies(conf, policies, false);
|
||||
}
|
||||
|
||||
// We use a double colon because a colon is a reserved character in a URI and
|
||||
// there should not be two colons next to each other.
|
||||
private static final String DELIM = "::";
|
||||
|
||||
/**
|
||||
* Set the shared cache upload policies config parameter. This is done by
|
||||
* serializing the provided map of shared cache upload policies into a config
|
||||
* parameter. If the parameter was previously set, this method will replace
|
||||
* the old value with the new provided map.
|
||||
*
|
||||
* @param conf Configuration which stores the shared cache upload policies
|
||||
* @param policies A map containing the shared cache upload policies for a set
|
||||
* of resources. The key is the url of the resource and the value is
|
||||
* the upload policy. True if it should be uploaded, false otherwise.
|
||||
* @param areFiles True if these policies are for files, false if they are for
|
||||
* archives.
|
||||
*/
|
||||
private static void setSharedCacheUploadPolicies(Configuration conf,
|
||||
Map<String, Boolean> policies, boolean areFiles) {
|
||||
if (policies != null) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
Iterator<Map.Entry<String, Boolean>> it = policies.entrySet().iterator();
|
||||
Map.Entry<String, Boolean> e;
|
||||
if (it.hasNext()) {
|
||||
e = it.next();
|
||||
sb.append(e.getKey() + DELIM + e.getValue());
|
||||
} else {
|
||||
// policies is an empty map, just skip setting the parameter
|
||||
return;
|
||||
}
|
||||
while (it.hasNext()) {
|
||||
e = it.next();
|
||||
sb.append("," + e.getKey() + DELIM + e.getValue());
|
||||
}
|
||||
String confParam =
|
||||
areFiles ? MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES
|
||||
: MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES;
|
||||
conf.set(confParam, sb.toString());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deserialize a map of shared cache upload policies from a config parameter.
|
||||
*
|
||||
* @param conf Configuration which stores the shared cache upload policies
|
||||
* @param areFiles True if these policies are for files, false if they are for
|
||||
* archives.
|
||||
* @return A map containing the shared cache upload policies for a set of
|
||||
* resources. The key is the url of the resource and the value is the
|
||||
* upload policy. True if it should be uploaded, false otherwise.
|
||||
*/
|
||||
private static Map<String, Boolean> getSharedCacheUploadPolicies(
|
||||
Configuration conf, boolean areFiles) {
|
||||
String confParam =
|
||||
areFiles ? MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES
|
||||
: MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES;
|
||||
Collection<String> policies = conf.getStringCollection(confParam);
|
||||
String[] policy;
|
||||
Map<String, Boolean> policyMap = new LinkedHashMap<String, Boolean>();
|
||||
for (String s : policies) {
|
||||
policy = s.split(DELIM);
|
||||
if (policy.length != 2) {
|
||||
LOG.error(confParam
|
||||
+ " is mis-formatted, returning empty shared cache upload policies."
|
||||
+ " Error on [" + s + "]");
|
||||
return new LinkedHashMap<String, Boolean>();
|
||||
}
|
||||
policyMap.put(policy[0], Boolean.parseBoolean(policy[1]));
|
||||
}
|
||||
return policyMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* This is to get the shared cache upload policies for files.
|
||||
*
|
||||
* @param conf Configuration which stores the shared cache upload policies
|
||||
* @return A map containing the shared cache upload policies for a set of
|
||||
* resources. The key is the url of the resource and the value is the
|
||||
* upload policy. True if it should be uploaded, false otherwise.
|
||||
*/
|
||||
@Unstable
|
||||
public static Map<String, Boolean> getFileSharedCacheUploadPolicies(
|
||||
Configuration conf) {
|
||||
return getSharedCacheUploadPolicies(conf, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* This is to get the shared cache upload policies for archives.
|
||||
*
|
||||
* @param conf Configuration which stores the shared cache upload policies
|
||||
* @return A map containing the shared cache upload policies for a set of
|
||||
* resources. The key is the url of the resource and the value is the
|
||||
* upload policy. True if it should be uploaded, false otherwise.
|
||||
*/
|
||||
@Unstable
|
||||
public static Map<String, Boolean> getArchiveSharedCacheUploadPolicies(
|
||||
Configuration conf) {
|
||||
return getSharedCacheUploadPolicies(conf, false);
|
||||
}
|
||||
|
||||
private synchronized void connect()
|
||||
throws IOException, InterruptedException, ClassNotFoundException {
|
||||
if (cluster == null) {
|
||||
|
|
|
@ -24,12 +24,13 @@ import java.net.URISyntaxException;
|
|||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
@ -40,30 +41,100 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|||
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
|
||||
import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
|
||||
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.URL;
|
||||
import org.apache.hadoop.yarn.client.api.SharedCacheClient;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
/**
|
||||
* This class is responsible for uploading resources from the client to HDFS
|
||||
* that are associated with a MapReduce job.
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
class JobResourceUploader {
|
||||
protected static final Log LOG = LogFactory.getLog(JobResourceUploader.class);
|
||||
private final boolean useWildcard;
|
||||
private final FileSystem jtFs;
|
||||
private SharedCacheClient scClient = null;
|
||||
private SharedCacheConfig scConfig = new SharedCacheConfig();
|
||||
private ApplicationId appId = null;
|
||||
|
||||
/**
 * Create an uploader that writes job resources to the given file system.
 *
 * @param submitFs the file system used for job submission
 * @param useWildcard stored as the uploader's wildcard setting
 */
JobResourceUploader(FileSystem submitFs, boolean useWildcard) {
  this.useWildcard = useWildcard;
  this.jtFs = submitFs;
}
|
||||
|
||||
private void initSharedCache(JobID jobid, Configuration conf) {
|
||||
this.scConfig.init(conf);
|
||||
if (this.scConfig.isSharedCacheEnabled()) {
|
||||
this.scClient = createSharedCacheClient(conf);
|
||||
appId = jobIDToAppId(jobid);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* We added this method so that we could do the conversion between JobId and
|
||||
* ApplicationId for the shared cache client. This logic is very similar to
|
||||
* the org.apache.hadoop.mapreduce.TypeConverter#toYarn method. We don't use
|
||||
* that because mapreduce-client-core can not depend on
|
||||
* mapreduce-client-common.
|
||||
*/
|
||||
private ApplicationId jobIDToAppId(JobID jobId) {
|
||||
return ApplicationId.newInstance(Long.parseLong(jobId.getJtIdentifier()),
|
||||
jobId.getId());
|
||||
}
|
||||
|
||||
private void stopSharedCache() {
|
||||
if (scClient != null) {
|
||||
scClient.stop();
|
||||
scClient = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create, initialize and start a new shared cache client.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
protected SharedCacheClient createSharedCacheClient(Configuration conf) {
|
||||
SharedCacheClient scc = SharedCacheClient.createSharedCacheClient();
|
||||
scc.init(conf);
|
||||
scc.start();
|
||||
return scc;
|
||||
}
|
||||
|
||||
/**
|
||||
* Upload and configure files, libjars, jobjars, and archives pertaining to
|
||||
* the passed job.
|
||||
*
|
||||
* <p>
|
||||
* This client will use the shared cache for libjars, files, archives and
|
||||
* jobjars if it is enabled. When shared cache is enabled, it will try to use
|
||||
* the shared cache and fall back to the default behavior when the scm isn't
|
||||
* available.
|
||||
* <p>
|
||||
* 1. For the resources that have been successfully shared, we will continue
|
||||
* to use them in a shared fashion.
|
||||
* <p>
|
||||
* 2. For the resources that weren't in the cache and need to be uploaded by
|
||||
* NM, we won't ask NM to upload them.
|
||||
*
|
||||
* @param job the job containing the files to be uploaded
|
||||
* @param submitJobDir the submission directory of the job
|
||||
* @throws IOException
|
||||
*/
|
||||
public void uploadResources(Job job, Path submitJobDir) throws IOException {
|
||||
try {
|
||||
initSharedCache(job.getJobID(), job.getConfiguration());
|
||||
uploadResourcesInternal(job, submitJobDir);
|
||||
} finally {
|
||||
stopSharedCache();
|
||||
}
|
||||
}
|
||||
|
||||
private void uploadResourcesInternal(Job job, Path submitJobDir)
|
||||
throws IOException {
|
||||
Configuration conf = job.getConfiguration();
|
||||
short replication =
|
||||
(short) conf.getInt(Job.SUBMIT_REPLICATION,
|
||||
|
@ -90,6 +161,7 @@ class JobResourceUploader {
|
|||
+ " already exists!! This is unexpected.Please check what's there in"
|
||||
+ " that directory");
|
||||
}
|
||||
// Create the submission directory for the MapReduce job.
|
||||
submitJobDir = jtFs.makeQualified(submitJobDir);
|
||||
submitJobDir = new Path(submitJobDir.toUri().getPath());
|
||||
FsPermission mapredSysPerms =
|
||||
|
@ -101,20 +173,45 @@ class JobResourceUploader {
|
|||
disableErasureCodingForPath(jtFs, submitJobDir);
|
||||
}
|
||||
|
||||
// Get the resources that have been added via command line arguments in the
|
||||
// GenericOptionsParser (i.e. files, libjars, archives).
|
||||
Collection<String> files = conf.getStringCollection("tmpfiles");
|
||||
Collection<String> libjars = conf.getStringCollection("tmpjars");
|
||||
Collection<String> archives = conf.getStringCollection("tmparchives");
|
||||
String jobJar = job.getJar();
|
||||
|
||||
// Merge resources that have been programmatically specified for the shared
|
||||
// cache via the Job API.
|
||||
files.addAll(conf.getStringCollection(MRJobConfig.FILES_FOR_SHARED_CACHE));
|
||||
libjars.addAll(conf.getStringCollection(
|
||||
MRJobConfig.FILES_FOR_CLASSPATH_AND_SHARED_CACHE));
|
||||
archives.addAll(conf
|
||||
.getStringCollection(MRJobConfig.ARCHIVES_FOR_SHARED_CACHE));
|
||||
|
||||
|
||||
Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>();
|
||||
checkLocalizationLimits(conf, files, libjars, archives, jobJar, statCache);
|
||||
|
||||
uploadFiles(conf, files, submitJobDir, mapredSysPerms, replication);
|
||||
uploadLibJars(conf, libjars, submitJobDir, mapredSysPerms, replication);
|
||||
uploadArchives(conf, archives, submitJobDir, mapredSysPerms, replication);
|
||||
uploadJobJar(job, jobJar, submitJobDir, replication);
|
||||
Map<String, Boolean> fileSCUploadPolicies =
|
||||
new LinkedHashMap<String, Boolean>();
|
||||
Map<String, Boolean> archiveSCUploadPolicies =
|
||||
new LinkedHashMap<String, Boolean>();
|
||||
|
||||
uploadFiles(job, files, submitJobDir, mapredSysPerms, replication,
|
||||
fileSCUploadPolicies, statCache);
|
||||
uploadLibJars(job, libjars, submitJobDir, mapredSysPerms, replication,
|
||||
fileSCUploadPolicies, statCache);
|
||||
uploadArchives(job, archives, submitJobDir, mapredSysPerms, replication,
|
||||
archiveSCUploadPolicies, statCache);
|
||||
uploadJobJar(job, jobJar, submitJobDir, replication, statCache);
|
||||
addLog4jToDistributedCache(job, submitJobDir);
|
||||
|
||||
// Note, we do not consider resources in the distributed cache for the
|
||||
// shared cache at this time. Only resources specified via the
|
||||
// GenericOptionsParser or the jobjar.
|
||||
Job.setFileSharedCacheUploadPolicies(conf, fileSCUploadPolicies);
|
||||
Job.setArchiveSharedCacheUploadPolicies(conf, archiveSCUploadPolicies);
|
||||
|
||||
// set the timestamps of the archives and files
|
||||
// set the public/private visibility of the archives and files
|
||||
ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf,
|
||||
|
@ -125,9 +222,11 @@ class JobResourceUploader {
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void uploadFiles(Configuration conf, Collection<String> files,
|
||||
Path submitJobDir, FsPermission mapredSysPerms, short submitReplication)
|
||||
void uploadFiles(Job job, Collection<String> files,
|
||||
Path submitJobDir, FsPermission mapredSysPerms, short submitReplication,
|
||||
Map<String, Boolean> fileSCUploadPolicies, Map<URI, FileStatus> statCache)
|
||||
throws IOException {
|
||||
Configuration conf = job.getConfiguration();
|
||||
Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
|
||||
if (!files.isEmpty()) {
|
||||
mkdirs(jtFs, filesDir, mapredSysPerms);
|
||||
|
@ -140,17 +239,33 @@ class JobResourceUploader {
|
|||
+ " Argument must be a valid URI: " + tmpFile, e);
|
||||
}
|
||||
Path tmp = new Path(tmpURI);
|
||||
Path newPath = copyRemoteFiles(filesDir, tmp, conf, submitReplication);
|
||||
try {
|
||||
URI pathURI = getPathURI(newPath, tmpURI.getFragment());
|
||||
DistributedCache.addCacheFile(pathURI, conf);
|
||||
} catch (URISyntaxException ue) {
|
||||
// should not throw a uri exception
|
||||
throw new IOException(
|
||||
"Failed to create a URI (URISyntaxException) for the remote path "
|
||||
+ newPath + ". This was based on the files parameter: "
|
||||
+ tmpFile,
|
||||
ue);
|
||||
URI newURI = null;
|
||||
boolean uploadToSharedCache = false;
|
||||
if (scConfig.isSharedCacheFilesEnabled()) {
|
||||
newURI = useSharedCache(tmpURI, tmp.getName(), statCache, conf, true);
|
||||
if (newURI == null) {
|
||||
uploadToSharedCache = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (newURI == null) {
|
||||
Path newPath =
|
||||
copyRemoteFiles(filesDir, tmp, conf, submitReplication);
|
||||
try {
|
||||
newURI = getPathURI(newPath, tmpURI.getFragment());
|
||||
} catch (URISyntaxException ue) {
|
||||
// should not throw a uri exception
|
||||
throw new IOException(
|
||||
"Failed to create a URI (URISyntaxException) for the"
|
||||
+ " remote path " + newPath
|
||||
+ ". This was based on the files parameter: " + tmpFile,
|
||||
ue);
|
||||
}
|
||||
}
|
||||
|
||||
job.addCacheFile(newURI);
|
||||
if (scConfig.isSharedCacheFilesEnabled()) {
|
||||
fileSCUploadPolicies.put(newURI.toString(), uploadToSharedCache);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -159,9 +274,11 @@ class JobResourceUploader {
|
|||
// Suppress warning for use of DistributedCache (it is everywhere).
|
||||
@SuppressWarnings("deprecation")
|
||||
@VisibleForTesting
|
||||
void uploadLibJars(Configuration conf, Collection<String> libjars,
|
||||
Path submitJobDir, FsPermission mapredSysPerms, short submitReplication)
|
||||
void uploadLibJars(Job job, Collection<String> libjars, Path submitJobDir,
|
||||
FsPermission mapredSysPerms, short submitReplication,
|
||||
Map<String, Boolean> fileSCUploadPolicies, Map<URI, FileStatus> statCache)
|
||||
throws IOException {
|
||||
Configuration conf = job.getConfiguration();
|
||||
Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
|
||||
if (!libjars.isEmpty()) {
|
||||
mkdirs(jtFs, libjarsDir, mapredSysPerms);
|
||||
|
@ -176,23 +293,53 @@ class JobResourceUploader {
|
|||
+ " Argument must be a valid URI: " + tmpjars, e);
|
||||
}
|
||||
Path tmp = new Path(tmpURI);
|
||||
Path newPath =
|
||||
copyRemoteFiles(libjarsDir, tmp, conf, submitReplication);
|
||||
try {
|
||||
URI pathURI = getPathURI(newPath, tmpURI.getFragment());
|
||||
if (!foundFragment) {
|
||||
foundFragment = pathURI.getFragment() != null;
|
||||
URI newURI = null;
|
||||
boolean uploadToSharedCache = false;
|
||||
boolean fromSharedCache = false;
|
||||
if (scConfig.isSharedCacheLibjarsEnabled()) {
|
||||
newURI = useSharedCache(tmpURI, tmp.getName(), statCache, conf, true);
|
||||
if (newURI == null) {
|
||||
uploadToSharedCache = true;
|
||||
} else {
|
||||
fromSharedCache = true;
|
||||
}
|
||||
DistributedCache.addFileToClassPath(new Path(pathURI.getPath()), conf,
|
||||
jtFs, false);
|
||||
libjarURIs.add(pathURI);
|
||||
} catch (URISyntaxException ue) {
|
||||
// should not throw a uri exception
|
||||
throw new IOException(
|
||||
"Failed to create a URI (URISyntaxException) for the remote path "
|
||||
+ newPath + ". This was based on the libjar parameter: "
|
||||
+ tmpjars,
|
||||
ue);
|
||||
}
|
||||
|
||||
if (newURI == null) {
|
||||
Path newPath =
|
||||
copyRemoteFiles(libjarsDir, tmp, conf, submitReplication);
|
||||
try {
|
||||
newURI = getPathURI(newPath, tmpURI.getFragment());
|
||||
} catch (URISyntaxException ue) {
|
||||
// should not throw a uri exception
|
||||
throw new IOException(
|
||||
"Failed to create a URI (URISyntaxException) for the"
|
||||
+ " remote path " + newPath
|
||||
+ ". This was based on the libjar parameter: " + tmpjars,
|
||||
ue);
|
||||
}
|
||||
}
|
||||
|
||||
if (!foundFragment) {
|
||||
// We do not count shared cache paths containing fragments as a
|
||||
// "foundFragment." This is because these resources are not in the
|
||||
// staging directory and will be added to the distributed cache
|
||||
// separately.
|
||||
foundFragment = (newURI.getFragment() != null) && !fromSharedCache;
|
||||
}
|
||||
DistributedCache.addFileToClassPath(new Path(newURI.getPath()), conf,
|
||||
jtFs, false);
|
||||
if (fromSharedCache) {
|
||||
// We simply add this URI to the distributed cache. It will not come
|
||||
// from the staging directory (it is in the shared cache), so we
|
||||
// must add it to the cache regardless of the wildcard feature.
|
||||
DistributedCache.addCacheFile(newURI, conf);
|
||||
} else {
|
||||
libjarURIs.add(newURI);
|
||||
}
|
||||
|
||||
if (scConfig.isSharedCacheLibjarsEnabled()) {
|
||||
fileSCUploadPolicies.put(newURI.toString(), uploadToSharedCache);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -210,9 +357,11 @@ class JobResourceUploader {
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void uploadArchives(Configuration conf, Collection<String> archives,
|
||||
Path submitJobDir, FsPermission mapredSysPerms, short submitReplication)
|
||||
throws IOException {
|
||||
void uploadArchives(Job job, Collection<String> archives,
|
||||
Path submitJobDir, FsPermission mapredSysPerms, short submitReplication,
|
||||
Map<String, Boolean> archiveSCUploadPolicies,
|
||||
Map<URI, FileStatus> statCache) throws IOException {
|
||||
Configuration conf = job.getConfiguration();
|
||||
Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
|
||||
if (!archives.isEmpty()) {
|
||||
mkdirs(jtFs, archivesDir, mapredSysPerms);
|
||||
|
@ -225,18 +374,34 @@ class JobResourceUploader {
|
|||
+ " Argument must be a valid URI: " + tmpArchives, e);
|
||||
}
|
||||
Path tmp = new Path(tmpURI);
|
||||
Path newPath =
|
||||
copyRemoteFiles(archivesDir, tmp, conf, submitReplication);
|
||||
try {
|
||||
URI pathURI = getPathURI(newPath, tmpURI.getFragment());
|
||||
DistributedCache.addCacheArchive(pathURI, conf);
|
||||
} catch (URISyntaxException ue) {
|
||||
// should not throw a URI exception
|
||||
throw new IOException(
|
||||
"Failed to create a URI (URISyntaxException) for the remote path"
|
||||
+ newPath + ". This was based on the archive parameter: "
|
||||
+ tmpArchives,
|
||||
ue);
|
||||
URI newURI = null;
|
||||
boolean uploadToSharedCache = false;
|
||||
if (scConfig.isSharedCacheArchivesEnabled()) {
|
||||
newURI = useSharedCache(tmpURI, tmp.getName(), statCache, conf, true);
|
||||
if (newURI == null) {
|
||||
uploadToSharedCache = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (newURI == null) {
|
||||
Path newPath =
|
||||
copyRemoteFiles(archivesDir, tmp, conf, submitReplication);
|
||||
try {
|
||||
newURI = getPathURI(newPath, tmpURI.getFragment());
|
||||
} catch (URISyntaxException ue) {
|
||||
// should not throw a uri exception
|
||||
throw new IOException(
|
||||
"Failed to create a URI (URISyntaxException) for the"
|
||||
+ " remote path " + newPath
|
||||
+ ". This was based on the archive parameter: "
|
||||
+ tmpArchives,
|
||||
ue);
|
||||
}
|
||||
}
|
||||
|
||||
job.addCacheArchive(newURI);
|
||||
if (scConfig.isSharedCacheArchivesEnabled()) {
|
||||
archiveSCUploadPolicies.put(newURI.toString(), uploadToSharedCache);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -244,7 +409,9 @@ class JobResourceUploader {
|
|||
|
||||
@VisibleForTesting
|
||||
void uploadJobJar(Job job, String jobJar, Path submitJobDir,
|
||||
short submitReplication) throws IOException {
|
||||
short submitReplication, Map<URI, FileStatus> statCache)
|
||||
throws IOException {
|
||||
Configuration conf = job.getConfiguration();
|
||||
if (jobJar != null) { // copy jar to JobTracker's fs
|
||||
// use jar name if job is not named.
|
||||
if ("".equals(job.getJobName())) {
|
||||
|
@ -252,12 +419,59 @@ class JobResourceUploader {
|
|||
}
|
||||
Path jobJarPath = new Path(jobJar);
|
||||
URI jobJarURI = jobJarPath.toUri();
|
||||
// If the job jar is already in a global fs,
|
||||
// we don't need to copy it from local fs
|
||||
if (jobJarURI.getScheme() == null || jobJarURI.getScheme().equals("file")) {
|
||||
copyJar(jobJarPath, JobSubmissionFiles.getJobJar(submitJobDir),
|
||||
submitReplication);
|
||||
job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString());
|
||||
Path newJarPath = null;
|
||||
boolean uploadToSharedCache = false;
|
||||
if (jobJarURI.getScheme() == null ||
|
||||
jobJarURI.getScheme().equals("file")) {
|
||||
// job jar is on the local file system
|
||||
if (scConfig.isSharedCacheJobjarEnabled()) {
|
||||
// We must have a qualified path for the shared cache client. We can
|
||||
// assume this is for the local filesystem
|
||||
jobJarPath = FileSystem.getLocal(conf).makeQualified(jobJarPath);
|
||||
// Don't add a resource name here because the resource name (i.e.
|
||||
// job.jar directory symlink) will always be hard coded to job.jar for
|
||||
// the job.jar
|
||||
URI newURI =
|
||||
useSharedCache(jobJarPath.toUri(), null, statCache, conf, false);
|
||||
if (newURI == null) {
|
||||
uploadToSharedCache = true;
|
||||
} else {
|
||||
newJarPath = stringToPath(newURI.toString());
|
||||
// The job jar is coming from the shared cache (i.e. a public
|
||||
// place), so we want the job.jar to have a public visibility.
|
||||
conf.setBoolean(MRJobConfig.JOBJAR_VISIBILITY, true);
|
||||
}
|
||||
}
|
||||
if (newJarPath == null) {
|
||||
newJarPath = JobSubmissionFiles.getJobJar(submitJobDir);
|
||||
copyJar(jobJarPath, newJarPath, submitReplication);
|
||||
}
|
||||
} else {
|
||||
// job jar is in a remote file system
|
||||
if (scConfig.isSharedCacheJobjarEnabled()) {
|
||||
// Don't add a resource name here because the resource name (i.e.
|
||||
// job.jar directory symlink) will always be hard coded to job.jar for
|
||||
// the job.jar
|
||||
URI newURI = useSharedCache(jobJarURI, null, statCache, conf, false);
|
||||
if (newURI == null) {
|
||||
uploadToSharedCache = true;
|
||||
newJarPath = jobJarPath;
|
||||
} else {
|
||||
newJarPath = stringToPath(newURI.toString());
|
||||
// The job jar is coming from the shared cache (i.e. a public
|
||||
// place), so we want the job.jar to have a public visibility.
|
||||
conf.setBoolean(MRJobConfig.JOBJAR_VISIBILITY, true);
|
||||
}
|
||||
} else {
|
||||
// we don't need to upload the jobjar to the staging directory because
|
||||
// it is already in an accessible place
|
||||
newJarPath = jobJarPath;
|
||||
}
|
||||
}
|
||||
job.setJar(newJarPath.toString());
|
||||
if (scConfig.isSharedCacheJobjarEnabled()) {
|
||||
conf.setBoolean(MRJobConfig.JOBJAR_SHARED_CACHE_UPLOAD_POLICY,
|
||||
uploadToSharedCache);
|
||||
}
|
||||
} else {
|
||||
LOG.warn("No job jar file set. User classes may not be found. "
|
||||
|
@ -267,7 +481,9 @@ class JobResourceUploader {
|
|||
|
||||
/**
|
||||
* Verify that the resources this job is going to localize are within the
|
||||
* localization limits.
|
||||
* localization limits. We count all resources towards these limits regardless
|
||||
* of where they are coming from (i.e. local, distributed cache, or shared
|
||||
* cache).
|
||||
*/
|
||||
@VisibleForTesting
|
||||
void checkLocalizationLimits(Configuration conf, Collection<String> files,
|
||||
|
@ -464,6 +680,80 @@ class JobResourceUploader {
|
|||
return newPath;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checksum a local resource file and call use for that resource with the scm.
|
||||
*/
|
||||
private URI useSharedCache(URI sourceFile, String resourceName,
|
||||
Map<URI, FileStatus> statCache, Configuration conf, boolean honorFragment)
|
||||
throws IOException {
|
||||
if (scClient == null) {
|
||||
return null;
|
||||
}
|
||||
Path filePath = new Path(sourceFile);
|
||||
if (getFileStatus(statCache, conf, filePath).isDirectory()) {
|
||||
LOG.warn("Shared cache does not support directories"
|
||||
+ " (see YARN-6097)." + " Will not upload " + filePath
|
||||
+ " to the shared cache.");
|
||||
return null;
|
||||
}
|
||||
|
||||
String rn = resourceName;
|
||||
if (honorFragment) {
|
||||
if (sourceFile.getFragment() != null) {
|
||||
rn = sourceFile.getFragment();
|
||||
}
|
||||
}
|
||||
|
||||
// If for whatever reason, we can't even calculate checksum for
|
||||
// a resource, something is really wrong with the file system;
|
||||
// even non-SCM approach won't work. Let us just throw the exception.
|
||||
String checksum = scClient.getFileChecksum(filePath);
|
||||
URL url = null;
|
||||
try {
|
||||
url = scClient.use(this.appId, checksum);
|
||||
} catch (YarnException e) {
|
||||
LOG.warn("Error trying to contact the shared cache manager,"
|
||||
+ " disabling the SCMClient for the rest of this job submission", e);
|
||||
/*
|
||||
* If we fail to contact the SCM, we do not use it for the rest of this
|
||||
* JobResourceUploader's life. This prevents us from having to timeout
|
||||
* each time we try to upload a file while the SCM is unavailable. Instead
|
||||
* we timeout/error the first time and quickly revert to the default
|
||||
* behavior without the shared cache. We do this by stopping the shared
|
||||
* cache client and setting it to null.
|
||||
*/
|
||||
stopSharedCache();
|
||||
}
|
||||
|
||||
if (url != null) {
|
||||
// Because we deal with URI's in mapreduce, we need to convert the URL to
|
||||
// a URI and add a fragment if necessary.
|
||||
URI uri = null;
|
||||
try {
|
||||
String name = new Path(url.getFile()).getName();
|
||||
if (rn != null && !name.equals(rn)) {
|
||||
// A name was specified that is different then the URL in the shared
|
||||
// cache. Therefore, we need to set the fragment portion of the URI to
|
||||
// preserve the user's desired name. We assume that there is no
|
||||
// existing fragment in the URL since the shared cache manager does
|
||||
// not use fragments.
|
||||
uri = new URI(url.getScheme(), url.getUserInfo(), url.getHost(),
|
||||
url.getPort(), url.getFile(), null, rn);
|
||||
} else {
|
||||
uri = new URI(url.getScheme(), url.getUserInfo(), url.getHost(),
|
||||
url.getPort(), url.getFile(), null, null);
|
||||
}
|
||||
return uri;
|
||||
} catch (URISyntaxException e) {
|
||||
LOG.warn("Error trying to convert URL received from shared cache to"
|
||||
+ " a URI: " + url.toString());
|
||||
return null;
|
||||
}
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void copyJar(Path originalJarPath, Path submitJarFile,
|
||||
short replication) throws IOException {
|
||||
|
|
|
@ -193,6 +193,77 @@ public interface MRJobConfig {
|
|||
|
||||
public static final String CACHE_ARCHIVES_VISIBILITIES = "mapreduce.job.cache.archives.visibilities";
|
||||
|
||||
/**
|
||||
* This parameter controls the visibility of the localized job jar on the node
|
||||
* manager. If set to true, the visibility will be set to
|
||||
* LocalResourceVisibility.PUBLIC. If set to false, the visibility will be set
|
||||
* to LocalResourceVisibility.APPLICATION. This is a generated parameter and
|
||||
* should not be set manually via config files.
|
||||
*/
|
||||
String JOBJAR_VISIBILITY = "mapreduce.job.jobjar.visibility";
|
||||
boolean JOBJAR_VISIBILITY_DEFAULT = false;
|
||||
|
||||
/**
|
||||
* Records whether the job jar should be uploaded to the shared cache.
* This is a generated parameter and should not be set manually via config
|
||||
* files.
|
||||
*/
|
||||
String JOBJAR_SHARED_CACHE_UPLOAD_POLICY =
|
||||
"mapreduce.job.jobjar.sharedcache.uploadpolicy";
|
||||
boolean JOBJAR_SHARED_CACHE_UPLOAD_POLICY_DEFAULT = false;
|
||||
|
||||
/**
|
||||
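* The serialized shared cache upload policies for files: a comma delimited
* list of entries, each a resource url and a boolean policy separated by "::".
* This is a generated parameter and should not be set manually via config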
|
||||
* files.
|
||||
*/
|
||||
String CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES =
|
||||
"mapreduce.job.cache.files.sharedcache.uploadpolicies";
|
||||
|
||||
/**
|
||||
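* The serialized shared cache upload policies for archives: a comma delimited
* list of entries, each a resource url and a boolean policy separated by "::".
* This is a generated parameter and should not be set manually via config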
|
||||
* files.
|
||||
*/
|
||||
String CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES =
|
||||
"mapreduce.job.cache.archives.sharedcache.uploadpolicies";
|
||||
|
||||
/**
|
||||
* A comma delimited list of file resources that are needed for this MapReduce
|
||||
* job. These resources, if the files resource type is enabled, should either
|
||||
* use the shared cache or be added to the shared cache. This parameter can be
|
||||
* modified programmatically using the MapReduce Job api.
|
||||
*/
|
||||
String FILES_FOR_SHARED_CACHE = "mapreduce.job.cache.sharedcache.files";
|
||||
|
||||
/**
|
||||
* A comma delimited list of libjar resources that are needed for this
|
||||
* MapReduce job. These resources, if the libjars resource type is enabled,
|
||||
* should either use the shared cache or be added to the shared cache. These
|
||||
* resources will also be added to the classpath of all tasks for this
|
||||
* MapReduce job. This parameter can be modified programmatically using the
|
||||
* MapReduce Job api.
|
||||
*/
|
||||
String FILES_FOR_CLASSPATH_AND_SHARED_CACHE =
|
||||
"mapreduce.job.cache.sharedcache.files.addtoclasspath";
|
||||
|
||||
/**
|
||||
* A comma delimited list of archive resources that are needed for this
|
||||
* MapReduce job. These resources, if the archives resource type is enabled,
|
||||
* should either use the shared cache or be added to the shared cache. This
|
||||
* parameter can be modified programmatically using the MapReduce Job api.
|
||||
*/
|
||||
String ARCHIVES_FOR_SHARED_CACHE =
|
||||
"mapreduce.job.cache.sharedcache.archives";
|
||||
|
||||
/**
|
||||
* A comma delimited list of resource categories that are enabled for the
|
||||
* shared cache. If a category is enabled, resources in that category will be
|
||||
* uploaded to the shared cache. The valid categories are: jobjar, libjars,
|
||||
* files, archives. If "disabled" is specified then all categories are
|
||||
* disabled. If "enabled" is specified then all categories are enabled.
|
||||
*/
|
||||
String SHARED_CACHE_MODE = "mapreduce.job.sharedcache.mode";
|
||||
|
||||
String SHARED_CACHE_MODE_DEFAULT = "disabled";
|
||||
|
||||
/**
|
||||
* @deprecated Symlinks are always on and cannot be disabled.
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,102 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.mapreduce;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
||||
/**
|
||||
* A class for parsing configuration parameters associated with the shared
|
||||
* cache.
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public class SharedCacheConfig {
|
||||
protected static final Log LOG = LogFactory.getLog(SharedCacheConfig.class);
|
||||
|
||||
private boolean sharedCacheFilesEnabled = false;
|
||||
private boolean sharedCacheLibjarsEnabled = false;
|
||||
private boolean sharedCacheArchivesEnabled = false;
|
||||
private boolean sharedCacheJobjarEnabled = false;
|
||||
|
||||
public void init(Configuration conf) {
|
||||
if (!MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(
|
||||
MRConfig.FRAMEWORK_NAME))) {
|
||||
// Shared cache is only valid if the job runs on yarn
|
||||
return;
|
||||
}
|
||||
|
||||
if(!conf.getBoolean(YarnConfiguration.SHARED_CACHE_ENABLED,
|
||||
YarnConfiguration.DEFAULT_SHARED_CACHE_ENABLED)) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
Collection<String> configs = StringUtils.getTrimmedStringCollection(
|
||||
conf.get(MRJobConfig.SHARED_CACHE_MODE,
|
||||
MRJobConfig.SHARED_CACHE_MODE_DEFAULT));
|
||||
if (configs.contains("files")) {
|
||||
this.sharedCacheFilesEnabled = true;
|
||||
}
|
||||
if (configs.contains("libjars")) {
|
||||
this.sharedCacheLibjarsEnabled = true;
|
||||
}
|
||||
if (configs.contains("archives")) {
|
||||
this.sharedCacheArchivesEnabled = true;
|
||||
}
|
||||
if (configs.contains("jobjar")) {
|
||||
this.sharedCacheJobjarEnabled = true;
|
||||
}
|
||||
if (configs.contains("enabled")) {
|
||||
this.sharedCacheFilesEnabled = true;
|
||||
this.sharedCacheLibjarsEnabled = true;
|
||||
this.sharedCacheArchivesEnabled = true;
|
||||
this.sharedCacheJobjarEnabled = true;
|
||||
}
|
||||
if (configs.contains("disabled")) {
|
||||
this.sharedCacheFilesEnabled = false;
|
||||
this.sharedCacheLibjarsEnabled = false;
|
||||
this.sharedCacheArchivesEnabled = false;
|
||||
this.sharedCacheJobjarEnabled = false;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isSharedCacheFilesEnabled() {
|
||||
return sharedCacheFilesEnabled;
|
||||
}
|
||||
public boolean isSharedCacheLibjarsEnabled() {
|
||||
return sharedCacheLibjarsEnabled;
|
||||
}
|
||||
public boolean isSharedCacheArchivesEnabled() {
|
||||
return sharedCacheArchivesEnabled;
|
||||
}
|
||||
public boolean isSharedCacheJobjarEnabled() {
|
||||
return sharedCacheJobjarEnabled;
|
||||
}
|
||||
public boolean isSharedCacheEnabled() {
|
||||
return (sharedCacheFilesEnabled || sharedCacheLibjarsEnabled ||
|
||||
sharedCacheArchivesEnabled || sharedCacheJobjarEnabled);
|
||||
}
|
||||
}
|
|
@ -648,6 +648,17 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>mapreduce.job.sharedcache.mode</name>
|
||||
<value>disabled</value>
|
||||
<description>
|
||||
A comma delimited list of resource categories to submit to the shared cache.
|
||||
The valid categories are: jobjar, libjars, files, archives.
|
||||
If "disabled" is specified then the job submission code will not use
|
||||
the shared cache. If "enabled" is specified then all categories are enabled.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>mapreduce.input.fileinputformat.split.minsize</name>
|
||||
<value>0</value>
|
||||
|
|
|
@ -0,0 +1,100 @@
|
|||
<!---
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License. See accompanying LICENSE file.
|
||||
-->
|
||||
|
||||
MR Support for YARN Shared Cache
|
||||
==================
|
||||
|
||||
<!-- MACRO{toc|fromDepth=0|toDepth=2} -->
|
||||
|
||||
Overview
|
||||
-------
|
||||
|
||||
MapReduce support for the YARN shared cache allows MapReduce jobs to take advantage
|
||||
of additional resource caching. This saves network bandwidth between the job
|
||||
submission client and the cluster, as well as within the YARN cluster itself. This will reduce job
|
||||
submission time and overall job runtime.
|
||||
|
||||
|
||||
Enabling/Disabling the shared cache
|
||||
-------
|
||||
|
||||
First, your YARN cluster must have the shared cache service running. Please see YARN documentation
|
||||
for information on how to set up the shared cache service.
|
||||
|
||||
A MapReduce user can specify what resources are eligible to be uploaded to the shared cache
|
||||
based on resource type. This is done using a configuration parameter in mapred-site.xml:
|
||||
|
||||
```
|
||||
<property>
|
||||
<name>mapreduce.job.sharedcache.mode</name>
|
||||
<value>disabled</value>
|
||||
<description>
|
||||
A comma delimited list of resource categories to submit to the
|
||||
shared cache. The valid categories are: jobjar, libjars, files,
|
||||
archives. If "disabled" is specified then the job submission code
|
||||
will not use the shared cache.
|
||||
</description>
|
||||
</property>
|
||||
```
|
||||
|
||||
If a resource type is listed, the job submission client will check the shared cache to see if the resource is already in the
|
||||
cache. If so, it will use the cached resource; if not, it will specify that the resource needs to be
|
||||
uploaded asynchronously.
|
||||
|
||||
Specifying resources for the cache
|
||||
-------
|
||||
|
||||
A MapReduce user has 3 ways to specify resources for a MapReduce job:
|
||||
|
||||
1. **The command line via the generic options parser (i.e. -files, -archives, -libjars):** If a
|
||||
resource is specified via the command line and the resource type is enabled for the
|
||||
shared cache, that resource will use the shared cache.
|
||||
2. **The distributed cache api:** If a resource is specified via the distributed cache the
|
||||
resource will not use the shared cache regardless of whether the resource type is enabled for
|
||||
the shared cache.
|
||||
3. **The shared cache api:** This is a new set of methods added to the
|
||||
org.apache.hadoop.mapreduce.Job api. It allows users to add a file to the shared cache,
|
||||
add a file to both the shared cache and the classpath, and add an archive to the shared cache.
|
||||
These resources will be placed in the distributed cache and, if their resource type is
|
||||
enabled, the client will use the shared cache as well.
|
||||
|
||||
Resource naming
|
||||
-------
|
||||
|
||||
It is important to ensure that each resource for a MapReduce job has a unique file name.
|
||||
This prevents symlink clobbering when YARN containers running MapReduce tasks are localized
|
||||
during container launch. A user can specify their own resource name by using the fragment
|
||||
portion of a URI. For example, for file resources specified on the command line, it could look
|
||||
like this:
|
||||
```
|
||||
-files /local/path/file1.txt#foo.txt,/local/path2/file1.txt#bar.txt
|
||||
```
|
||||
In the above example two files, named file1.txt, will be localized with two different names: foo.txt
|
||||
and bar.txt.
|
||||
|
||||
Resource Visibility
|
||||
-------
|
||||
|
||||
All resources in the shared cache have a PUBLIC visibility.
|
||||
|
||||
|
||||
MapReduce client behavior while the shared cache is unavailable
|
||||
-------
|
||||
|
||||
In the event that the shared cache manager is unavailable, the MapReduce client uses a fail-fast
|
||||
mechanism. If the MapReduce client fails to contact the shared cache manager, the client will
|
||||
no longer use the shared cache for the rest of that job submission. This
|
||||
prevents the MapReduce client from timing out each time it tries to check for a resource
|
||||
in the shared cache. The MapReduce client quickly reverts to the default behavior and submits a
|
||||
job as if the shared cache had never been enabled in the first place.
|
|
@ -220,7 +220,7 @@ public class TestJobResourceUploader {
|
|||
destinationPathPrefix + "tmpArchives1.tgz#tmpArchivesfragment1.tgz" };
|
||||
|
||||
private String jobjarSubmitDir = "/jobjar-submit-dir";
|
||||
private String expectedJobJar = jobjarSubmitDir + "/job.jar";
|
||||
private String basicExpectedJobJar = jobjarSubmitDir + "/job.jar";
|
||||
|
||||
@Test
|
||||
public void testPathsWithNoFragNoSchemeRelative() throws IOException {
|
||||
|
@ -236,7 +236,7 @@ public class TestJobResourceUploader {
|
|||
JobResourceUploader uploader = new StubedUploader(jConf);
|
||||
|
||||
runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesNoFrags,
|
||||
expectedArchivesNoFrags, expectedJobJar);
|
||||
expectedArchivesNoFrags, basicExpectedJobJar);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -254,7 +254,7 @@ public class TestJobResourceUploader {
|
|||
JobResourceUploader uploader = new StubedUploader(jConf);
|
||||
|
||||
runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesNoFrags,
|
||||
expectedArchivesNoFrags, expectedJobJar);
|
||||
expectedArchivesNoFrags, basicExpectedJobJar);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -272,7 +272,7 @@ public class TestJobResourceUploader {
|
|||
JobResourceUploader uploader = new StubedUploader(jConf);
|
||||
|
||||
runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags,
|
||||
expectedArchivesWithFrags, expectedJobJar);
|
||||
expectedArchivesWithFrags, basicExpectedJobJar);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -290,7 +290,7 @@ public class TestJobResourceUploader {
|
|||
JobResourceUploader uploader = new StubedUploader(jConf);
|
||||
|
||||
runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags,
|
||||
expectedArchivesWithFrags, expectedJobJar);
|
||||
expectedArchivesWithFrags, basicExpectedJobJar);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -308,7 +308,7 @@ public class TestJobResourceUploader {
|
|||
JobResourceUploader uploader = new StubedUploader(jConf);
|
||||
|
||||
runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags,
|
||||
expectedArchivesWithFrags, expectedJobJar);
|
||||
expectedArchivesWithFrags, basicExpectedJobJar);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -326,7 +326,7 @@ public class TestJobResourceUploader {
|
|||
JobResourceUploader uploader = new StubedUploader(jConf);
|
||||
|
||||
runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesNoFrags,
|
||||
expectedArchivesNoFrags, expectedJobJar);
|
||||
expectedArchivesNoFrags, basicExpectedJobJar);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -344,7 +344,7 @@ public class TestJobResourceUploader {
|
|||
JobResourceUploader uploader = new StubedUploader(jConf, true);
|
||||
|
||||
runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithWildcard,
|
||||
expectedArchivesNoFrags, expectedJobJar);
|
||||
expectedArchivesNoFrags, basicExpectedJobJar);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -362,7 +362,7 @@ public class TestJobResourceUploader {
|
|||
JobResourceUploader uploader = new StubedUploader(jConf, true);
|
||||
|
||||
runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags,
|
||||
expectedArchivesWithFrags, expectedJobJar);
|
||||
expectedArchivesWithFrags, basicExpectedJobJar);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -402,44 +402,39 @@ public class TestJobResourceUploader {
|
|||
private void runTmpResourcePathTest(JobResourceUploader uploader,
|
||||
ResourceConf rConf, JobConf jConf, String[] expectedFiles,
|
||||
String[] expectedArchives, String expectedJobJar) throws IOException {
|
||||
rConf.setupJobConf(jConf);
|
||||
// We use a pre and post job object here because we need the post job object
|
||||
// to get the new values set during uploadResources, but we need the pre job
|
||||
// to set the job jar because JobResourceUploader#uploadJobJar uses the Job
|
||||
// interface not the JobConf. The post job is automatically created in
|
||||
// validateResourcePaths.
|
||||
Job jobPre = Job.getInstance(jConf);
|
||||
uploadResources(uploader, jConf, jobPre);
|
||||
|
||||
validateResourcePaths(jConf, expectedFiles, expectedArchives,
|
||||
expectedJobJar, jobPre);
|
||||
Job job = rConf.setupJobConf(jConf);
|
||||
uploadResources(uploader, job);
|
||||
validateResourcePaths(job, expectedFiles, expectedArchives, expectedJobJar);
|
||||
}
|
||||
|
||||
private void uploadResources(JobResourceUploader uploader, JobConf jConf,
|
||||
Job job) throws IOException {
|
||||
Collection<String> files = jConf.getStringCollection("tmpfiles");
|
||||
Collection<String> libjars = jConf.getStringCollection("tmpjars");
|
||||
Collection<String> archives = jConf.getStringCollection("tmparchives");
|
||||
String jobJar = jConf.getJar();
|
||||
uploader.uploadFiles(jConf, files, new Path("/files-submit-dir"), null,
|
||||
(short) 3);
|
||||
uploader.uploadArchives(jConf, archives, new Path("/archives-submit-dir"),
|
||||
null, (short) 3);
|
||||
uploader.uploadLibJars(jConf, libjars, new Path("/libjars-submit-dir"),
|
||||
null, (short) 3);
|
||||
uploader.uploadJobJar(job, jobJar, new Path(jobjarSubmitDir), (short) 3);
|
||||
}
|
||||
|
||||
private void validateResourcePaths(JobConf jConf, String[] expectedFiles,
|
||||
String[] expectedArchives, String expectedJobJar, Job preJob)
|
||||
private void uploadResources(JobResourceUploader uploader, Job job)
|
||||
throws IOException {
|
||||
Job j = Job.getInstance(jConf);
|
||||
validateResourcePathsSub(j.getCacheFiles(), expectedFiles);
|
||||
validateResourcePathsSub(j.getCacheArchives(), expectedArchives);
|
||||
Configuration conf = job.getConfiguration();
|
||||
Collection<String> files = conf.getStringCollection("tmpfiles");
|
||||
Collection<String> libjars = conf.getStringCollection("tmpjars");
|
||||
Collection<String> archives = conf.getStringCollection("tmparchives");
|
||||
Map<URI, FileStatus> statCache = new HashMap<>();
|
||||
Map<String, Boolean> fileSCUploadPolicies = new HashMap<>();
|
||||
String jobJar = job.getJar();
|
||||
uploader.uploadFiles(job, files, new Path("/files-submit-dir"), null,
|
||||
(short) 3, fileSCUploadPolicies, statCache);
|
||||
uploader.uploadArchives(job, archives, new Path("/archives-submit-dir"),
|
||||
null, (short) 3, fileSCUploadPolicies, statCache);
|
||||
uploader.uploadLibJars(job, libjars, new Path("/libjars-submit-dir"), null,
|
||||
(short) 3, fileSCUploadPolicies, statCache);
|
||||
uploader.uploadJobJar(job, jobJar, new Path(jobjarSubmitDir), (short) 3,
|
||||
statCache);
|
||||
}
|
||||
|
||||
private void validateResourcePaths(Job job, String[] expectedFiles,
|
||||
String[] expectedArchives, String expectedJobJar)
|
||||
throws IOException {
|
||||
validateResourcePathsSub(job.getCacheFiles(), expectedFiles);
|
||||
validateResourcePathsSub(job.getCacheArchives(), expectedArchives);
|
||||
// We use a different job object here because the jobjar was set on a
|
||||
// different job object
|
||||
Assert.assertEquals("Job jar path is different than expected!",
|
||||
expectedJobJar, preJob.getJar());
|
||||
expectedJobJar, job.getJar());
|
||||
}
|
||||
|
||||
private void validateResourcePathsSub(URI[] actualURIs,
|
||||
|
@ -645,7 +640,7 @@ public class TestJobResourceUploader {
|
|||
}
|
||||
}
|
||||
|
||||
private void setupJobConf(JobConf conf) {
|
||||
private Job setupJobConf(JobConf conf) throws IOException {
|
||||
conf.set("tmpfiles",
|
||||
buildPathString("tmpFiles", this.numOfTmpFiles, ".txt"));
|
||||
conf.set("tmpjars",
|
||||
|
@ -675,6 +670,7 @@ public class TestJobResourceUploader {
|
|||
conf.setLong(MRJobConfig.MAX_RESOURCES_MB, this.maxResourcesMB);
|
||||
conf.setLong(MRJobConfig.MAX_SINGLE_RESOURCE_MB,
|
||||
this.maxSingleResourceMB);
|
||||
return new Job(conf);
|
||||
}
|
||||
|
||||
// We always want absolute paths with a scheme in the DistributedCache, so
|
||||
|
|
|
@ -0,0 +1,365 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.jar.JarOutputStream;
|
||||
import java.util.zip.ZipEntry;
|
||||
import java.util.zip.ZipOutputStream;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.URL;
|
||||
import org.apache.hadoop.yarn.client.api.SharedCacheClient;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
/**
|
||||
* Tests the JobResourceUploader class with the shared cache.
|
||||
*/
|
||||
public class TestJobResourceUploaderWithSharedCache {
|
||||
protected static final Log LOG = LogFactory
|
||||
.getLog(TestJobResourceUploaderWithSharedCache.class);
|
||||
private static MiniDFSCluster dfs;
|
||||
private static FileSystem localFs;
|
||||
private static FileSystem remoteFs;
|
||||
private static Configuration conf = new Configuration();
|
||||
private static Path testRootDir;
|
||||
private static Path remoteStagingDir =
|
||||
new Path(MRJobConfig.DEFAULT_MR_AM_STAGING_DIR);
|
||||
private String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
|
||||
|
||||
@Before
|
||||
public void cleanup() throws Exception {
|
||||
remoteFs.delete(remoteStagingDir, true);
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() throws IOException {
|
||||
// create configuration, dfs, file system
|
||||
localFs = FileSystem.getLocal(conf);
|
||||
testRootDir =
|
||||
new Path("target",
|
||||
TestJobResourceUploaderWithSharedCache.class.getName() + "-tmpDir")
|
||||
.makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
|
||||
dfs = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
|
||||
remoteFs = dfs.getFileSystem();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
try {
|
||||
if (localFs != null) {
|
||||
localFs.close();
|
||||
}
|
||||
if (remoteFs != null) {
|
||||
remoteFs.close();
|
||||
}
|
||||
if (dfs != null) {
|
||||
dfs.shutdown();
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
LOG.info("IO exception in closing file system");
|
||||
ioe.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
private class MyFileUploader extends JobResourceUploader {
|
||||
// The mocked SharedCacheClient that will be fed into the FileUploader
|
||||
private SharedCacheClient mockscClient = mock(SharedCacheClient.class);
|
||||
// A real client for checksum calculation
|
||||
private SharedCacheClient scClient = SharedCacheClient
|
||||
.createSharedCacheClient();
|
||||
|
||||
MyFileUploader(FileSystem submitFs, Configuration conf)
|
||||
throws IOException {
|
||||
super(submitFs, false);
|
||||
// Initialize the real client, but don't start it. We don't need or want
|
||||
// to create an actual proxy because we only use this for mocking out the
|
||||
// getFileChecksum method.
|
||||
scClient.init(conf);
|
||||
when(mockscClient.getFileChecksum(any(Path.class))).thenAnswer(
|
||||
new Answer<String>() {
|
||||
@Override
|
||||
public String answer(InvocationOnMock invocation) throws Throwable {
|
||||
Path file = (Path) invocation.getArguments()[0];
|
||||
// Use the real scClient to generate the checksum. We use an
|
||||
// answer/mock combination to avoid having to spy on a real
|
||||
// SharedCacheClient object.
|
||||
return scClient.getFileChecksum(file);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// This method is to prime the mock client with the correct checksum, so it
|
||||
// looks like a given resource is present in the shared cache.
|
||||
public void mockFileInSharedCache(Path localFile, URL remoteFile)
|
||||
throws YarnException, IOException {
|
||||
// when the resource is referenced, simply return the remote path to the
|
||||
// caller
|
||||
when(mockscClient.use(any(ApplicationId.class),
|
||||
eq(scClient.getFileChecksum(localFile)))).thenReturn(remoteFile);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SharedCacheClient createSharedCacheClient(Configuration c) {
|
||||
// Feed the mocked SharedCacheClient into the FileUploader logic
|
||||
return mockscClient;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSharedCacheDisabled() throws Exception {
|
||||
JobConf jobConf = createJobConf();
|
||||
Job job = new Job(jobConf);
|
||||
job.setJobID(new JobID("567789", 1));
|
||||
|
||||
// shared cache is disabled by default
|
||||
uploadFilesToRemoteFS(job, jobConf, 0, 0, 0, false);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSharedCacheEnabled() throws Exception {
|
||||
JobConf jobConf = createJobConf();
|
||||
jobConf.set(MRJobConfig.SHARED_CACHE_MODE, "enabled");
|
||||
Job job = new Job(jobConf);
|
||||
job.setJobID(new JobID("567789", 1));
|
||||
|
||||
// shared cache is enabled for every file type
|
||||
// the # of times SharedCacheClient.use is called should ==
|
||||
// total # of files/libjars/archive/jobjar
|
||||
uploadFilesToRemoteFS(job, jobConf, 8, 3, 2, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSharedCacheEnabledWithJobJarInSharedCache()
|
||||
throws Exception {
|
||||
JobConf jobConf = createJobConf();
|
||||
jobConf.set(MRJobConfig.SHARED_CACHE_MODE, "enabled");
|
||||
Job job = new Job(jobConf);
|
||||
job.setJobID(new JobID("567789", 1));
|
||||
|
||||
// shared cache is enabled for every file type
|
||||
// the # of times SharedCacheClient.use is called should ==
|
||||
// total # of files/libjars/archive/jobjar
|
||||
uploadFilesToRemoteFS(job, jobConf, 8, 3, 2, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSharedCacheArchivesAndLibjarsEnabled() throws Exception {
|
||||
JobConf jobConf = createJobConf();
|
||||
jobConf.set(MRJobConfig.SHARED_CACHE_MODE, "archives,libjars");
|
||||
Job job = new Job(jobConf);
|
||||
job.setJobID(new JobID("567789", 1));
|
||||
|
||||
// shared cache is enabled for archives and libjars type
|
||||
// the # of times SharedCacheClient.use is called should ==
|
||||
// total # of libjars and archives
|
||||
uploadFilesToRemoteFS(job, jobConf, 5, 1, 2, true);
|
||||
}
|
||||
|
||||
private JobConf createJobConf() {
|
||||
JobConf jobConf = new JobConf();
|
||||
jobConf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
|
||||
jobConf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true);
|
||||
|
||||
jobConf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, remoteFs.getUri()
|
||||
.toString());
|
||||
return jobConf;
|
||||
}
|
||||
|
||||
private Path copyToRemote(Path jar) throws IOException {
|
||||
Path remoteFile = new Path("/tmp", jar.getName());
|
||||
remoteFs.copyFromLocalFile(jar, remoteFile);
|
||||
return remoteFile;
|
||||
}
|
||||
|
||||
private void makeJarAvailableInSharedCache(Path jar,
|
||||
MyFileUploader fileUploader) throws YarnException, IOException {
|
||||
// copy file to remote file system
|
||||
Path remoteFile = copyToRemote(jar);
|
||||
// prime mocking so that it looks like this file is in the shared cache
|
||||
fileUploader.mockFileInSharedCache(jar, URL.fromPath(remoteFile));
|
||||
}
|
||||
|
||||
private void uploadFilesToRemoteFS(Job job, JobConf jobConf,
|
||||
int useCallCountExpected,
|
||||
int numOfFilesShouldBeUploadedToSharedCacheExpected,
|
||||
int numOfArchivesShouldBeUploadedToSharedCacheExpected,
|
||||
boolean jobJarInSharedCacheBeforeUpload) throws Exception {
|
||||
MyFileUploader fileUploader = new MyFileUploader(remoteFs, jobConf);
|
||||
SharedCacheConfig sharedCacheConfig = new SharedCacheConfig();
|
||||
sharedCacheConfig.init(jobConf);
|
||||
|
||||
Path firstFile = createTempFile("first-input-file", "x");
|
||||
Path secondFile = createTempFile("second-input-file", "xx");
|
||||
|
||||
// Add files to job conf via distributed cache API as well as command line
|
||||
boolean fileAdded = Job.addFileToSharedCache(firstFile.toUri(), jobConf);
|
||||
assertEquals(sharedCacheConfig.isSharedCacheFilesEnabled(), fileAdded);
|
||||
if (!fileAdded) {
|
||||
Path remoteFile = copyToRemote(firstFile);
|
||||
job.addCacheFile(remoteFile.toUri());
|
||||
}
|
||||
jobConf.set("tmpfiles", secondFile.toString());
|
||||
|
||||
// Create jars with a single file inside them.
|
||||
Path firstJar = makeJar(new Path(testRootDir, "distributed.first.jar"), 1);
|
||||
Path secondJar =
|
||||
makeJar(new Path(testRootDir, "distributed.second.jar"), 2);
|
||||
|
||||
// Verify duplicated contents can be handled properly.
|
||||
Path thirdJar = new Path(testRootDir, "distributed.third.jar");
|
||||
localFs.copyFromLocalFile(secondJar, thirdJar);
|
||||
|
||||
// make secondJar cache available
|
||||
makeJarAvailableInSharedCache(secondJar, fileUploader);
|
||||
|
||||
// Add libjars to job conf via distributed cache API as well as command
|
||||
// line
|
||||
boolean libjarAdded =
|
||||
Job.addFileToSharedCacheAndClasspath(firstJar.toUri(), jobConf);
|
||||
assertEquals(sharedCacheConfig.isSharedCacheLibjarsEnabled(), libjarAdded);
|
||||
if (!libjarAdded) {
|
||||
Path remoteJar = copyToRemote(firstJar);
|
||||
job.addFileToClassPath(remoteJar);
|
||||
}
|
||||
|
||||
jobConf.set("tmpjars", secondJar.toString() + "," + thirdJar.toString());
|
||||
|
||||
Path firstArchive = makeArchive("first-archive.zip", "first-file");
|
||||
Path secondArchive = makeArchive("second-archive.zip", "second-file");
|
||||
|
||||
// Add archives to job conf via distributed cache API as well as command
|
||||
// line
|
||||
boolean archiveAdded =
|
||||
Job.addArchiveToSharedCache(firstArchive.toUri(), jobConf);
|
||||
assertEquals(sharedCacheConfig.isSharedCacheArchivesEnabled(),
|
||||
archiveAdded);
|
||||
if (!archiveAdded) {
|
||||
Path remoteArchive = copyToRemote(firstArchive);
|
||||
job.addCacheArchive(remoteArchive.toUri());
|
||||
}
|
||||
|
||||
jobConf.set("tmparchives", secondArchive.toString());
|
||||
|
||||
// Add job jar to job conf
|
||||
Path jobJar = makeJar(new Path(testRootDir, "test-job.jar"), 4);
|
||||
if (jobJarInSharedCacheBeforeUpload) {
|
||||
makeJarAvailableInSharedCache(jobJar, fileUploader);
|
||||
}
|
||||
jobConf.setJar(jobJar.toString());
|
||||
|
||||
fileUploader.uploadResources(job, remoteStagingDir);
|
||||
|
||||
verify(fileUploader.mockscClient, times(useCallCountExpected)).use(
|
||||
any(ApplicationId.class), anyString());
|
||||
|
||||
int numOfFilesShouldBeUploadedToSharedCache = 0;
|
||||
Map<String, Boolean> filesSharedCacheUploadPolicies =
|
||||
Job.getFileSharedCacheUploadPolicies(jobConf);
|
||||
for (Boolean policy : filesSharedCacheUploadPolicies.values()) {
|
||||
if (policy) {
|
||||
numOfFilesShouldBeUploadedToSharedCache++;
|
||||
}
|
||||
}
|
||||
assertEquals(numOfFilesShouldBeUploadedToSharedCacheExpected,
|
||||
numOfFilesShouldBeUploadedToSharedCache);
|
||||
|
||||
int numOfArchivesShouldBeUploadedToSharedCache = 0;
|
||||
Map<String, Boolean> archivesSharedCacheUploadPolicies =
|
||||
Job.getArchiveSharedCacheUploadPolicies(jobConf);
|
||||
for (Boolean policy : archivesSharedCacheUploadPolicies.values()) {
|
||||
if (policy) {
|
||||
numOfArchivesShouldBeUploadedToSharedCache++;
|
||||
}
|
||||
}
|
||||
assertEquals(numOfArchivesShouldBeUploadedToSharedCacheExpected,
|
||||
numOfArchivesShouldBeUploadedToSharedCache);
|
||||
}
|
||||
|
||||
|
||||
private Path createTempFile(String filename, String contents)
|
||||
throws IOException {
|
||||
Path path = new Path(testRootDir, filename);
|
||||
FSDataOutputStream os = localFs.create(path);
|
||||
os.writeBytes(contents);
|
||||
os.close();
|
||||
localFs.setPermission(path, new FsPermission("700"));
|
||||
return path;
|
||||
}
|
||||
|
||||
private Path makeJar(Path p, int index) throws FileNotFoundException,
|
||||
IOException {
|
||||
FileOutputStream fos =
|
||||
new FileOutputStream(new File(p.toUri().getPath()));
|
||||
JarOutputStream jos = new JarOutputStream(fos);
|
||||
ZipEntry ze = new ZipEntry("distributed.jar.inside" + index);
|
||||
jos.putNextEntry(ze);
|
||||
jos.write(("inside the jar!" + index).getBytes());
|
||||
jos.closeEntry();
|
||||
jos.close();
|
||||
localFs.setPermission(p, new FsPermission("700"));
|
||||
return p;
|
||||
}
|
||||
|
||||
private Path makeArchive(String archiveFile, String filename)
|
||||
throws Exception {
|
||||
Path archive = new Path(testRootDir, archiveFile);
|
||||
Path file = new Path(testRootDir, filename);
|
||||
DataOutputStream out = localFs.create(archive);
|
||||
ZipOutputStream zos = new ZipOutputStream(out);
|
||||
ZipEntry ze = new ZipEntry(file.toString());
|
||||
zos.putNextEntry(ze);
|
||||
zos.write(input.getBytes("UTF-8"));
|
||||
zos.closeEntry();
|
||||
zos.close();
|
||||
return archive;
|
||||
}
|
||||
}
|
|
@ -19,6 +19,8 @@
|
|||
package org.apache.hadoop.mapred;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
@ -338,16 +340,41 @@ public class YARNRunner implements ClientProtocol {
|
|||
}
|
||||
}
|
||||
|
||||
private LocalResource createApplicationResource(FileContext fs, Path p, LocalResourceType type)
|
||||
throws IOException {
|
||||
private LocalResource createApplicationResource(FileContext fs, Path p,
|
||||
LocalResourceType type) throws IOException {
|
||||
return createApplicationResource(fs, p, null, type,
|
||||
LocalResourceVisibility.APPLICATION, false);
|
||||
}
|
||||
|
||||
private LocalResource createApplicationResource(FileContext fs, Path p,
|
||||
String fileSymlink, LocalResourceType type, LocalResourceVisibility viz,
|
||||
Boolean uploadToSharedCache) throws IOException {
|
||||
LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
|
||||
FileStatus rsrcStat = fs.getFileStatus(p);
|
||||
rsrc.setResource(URL.fromPath(fs
|
||||
.getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
|
||||
// We need to be careful when converting from path to URL to add a fragment
|
||||
// so that the symlink name when localized will be correct.
|
||||
Path qualifiedPath =
|
||||
fs.getDefaultFileSystem().resolvePath(rsrcStat.getPath());
|
||||
URI uriWithFragment = null;
|
||||
boolean useFragment = fileSymlink != null && !fileSymlink.equals("");
|
||||
try {
|
||||
if (useFragment) {
|
||||
uriWithFragment = new URI(qualifiedPath.toUri() + "#" + fileSymlink);
|
||||
} else {
|
||||
uriWithFragment = qualifiedPath.toUri();
|
||||
}
|
||||
} catch (URISyntaxException e) {
|
||||
throw new IOException(
|
||||
"Error parsing local resource path."
|
||||
+ " Path was not able to be converted to a URI: " + qualifiedPath,
|
||||
e);
|
||||
}
|
||||
rsrc.setResource(URL.fromURI(uriWithFragment));
|
||||
rsrc.setSize(rsrcStat.getLen());
|
||||
rsrc.setTimestamp(rsrcStat.getModificationTime());
|
||||
rsrc.setType(type);
|
||||
rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
|
||||
rsrc.setVisibility(viz);
|
||||
rsrc.setShouldBeUploadedToSharedCache(uploadToSharedCache);
|
||||
return rsrc;
|
||||
}
|
||||
|
||||
|
@ -368,10 +395,21 @@ public class YARNRunner implements ClientProtocol {
|
|||
jobConfPath, LocalResourceType.FILE));
|
||||
if (jobConf.get(MRJobConfig.JAR) != null) {
|
||||
Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR));
|
||||
// We hard code the job.jar symlink because mapreduce code expects the
|
||||
// job.jar to be named that way.
|
||||
FileContext fccc =
|
||||
FileContext.getFileContext(jobJarPath.toUri(), jobConf);
|
||||
LocalResourceVisibility jobJarViz =
|
||||
jobConf.getBoolean(MRJobConfig.JOBJAR_VISIBILITY,
|
||||
MRJobConfig.JOBJAR_VISIBILITY_DEFAULT)
|
||||
? LocalResourceVisibility.PUBLIC
|
||||
: LocalResourceVisibility.APPLICATION;
|
||||
LocalResource rc = createApplicationResource(
|
||||
FileContext.getFileContext(jobJarPath.toUri(), jobConf),
|
||||
jobJarPath,
|
||||
LocalResourceType.PATTERN);
|
||||
FileContext.getFileContext(jobJarPath.toUri(), jobConf), jobJarPath,
|
||||
MRJobConfig.JOB_JAR, LocalResourceType.PATTERN, jobJarViz,
|
||||
jobConf.getBoolean(
|
||||
MRJobConfig.JOBJAR_SHARED_CACHE_UPLOAD_POLICY,
|
||||
MRJobConfig.JOBJAR_SHARED_CACHE_UPLOAD_POLICY_DEFAULT));
|
||||
String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN,
|
||||
JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
|
||||
rc.setPattern(pattern);
|
||||
|
|
|
@ -132,6 +132,58 @@ public class TestLocalJobSubmission {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test local job submission with a file option.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void testLocalJobFilesOption() throws IOException {
|
||||
Path jarPath = makeJar(new Path(TEST_ROOT_DIR, "test.jar"));
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000");
|
||||
conf.set(MRConfig.FRAMEWORK_NAME, "local");
|
||||
final String[] args =
|
||||
{"-jt", "local", "-files", jarPath.toString(), "-m", "1", "-r", "1",
|
||||
"-mt", "1", "-rt", "1"};
|
||||
int res = -1;
|
||||
try {
|
||||
res = ToolRunner.run(conf, new SleepJob(), args);
|
||||
} catch (Exception e) {
|
||||
System.out.println("Job failed with " + e.getLocalizedMessage());
|
||||
e.printStackTrace(System.out);
|
||||
fail("Job failed");
|
||||
}
|
||||
assertEquals("dist job res is not 0:", 0, res);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test local job submission with an archive option.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void testLocalJobArchivesOption() throws IOException {
|
||||
Path jarPath = makeJar(new Path(TEST_ROOT_DIR, "test.jar"));
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000");
|
||||
conf.set(MRConfig.FRAMEWORK_NAME, "local");
|
||||
final String[] args =
|
||||
{"-jt", "local", "-archives", jarPath.toString(), "-m", "1", "-r",
|
||||
"1", "-mt", "1", "-rt", "1"};
|
||||
int res = -1;
|
||||
try {
|
||||
res = ToolRunner.run(conf, new SleepJob(), args);
|
||||
} catch (Exception e) {
|
||||
System.out.println("Job failed with " + e.getLocalizedMessage());
|
||||
e.printStackTrace(System.out);
|
||||
fail("Job failed");
|
||||
}
|
||||
assertEquals("dist job res is not 0:", 0, res);
|
||||
}
|
||||
|
||||
private Path makeJar(Path p) throws IOException {
|
||||
FileOutputStream fos = new FileOutputStream(new File(p.toString()));
|
||||
JarOutputStream jos = new JarOutputStream(fos);
|
||||
|
|
|
@ -1298,6 +1298,65 @@ public class TestMRJobs {
|
|||
jarFile.delete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSharedCache() throws Exception {
|
||||
Path localJobJarPath = makeJobJarWithLib(TEST_ROOT_DIR.toUri().toString());
|
||||
|
||||
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
|
||||
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
|
||||
+ " not found. Not running test.");
|
||||
return;
|
||||
}
|
||||
|
||||
Job job = Job.getInstance(mrCluster.getConfig());
|
||||
|
||||
Configuration jobConf = job.getConfiguration();
|
||||
jobConf.set(MRJobConfig.SHARED_CACHE_MODE, "enabled");
|
||||
|
||||
Path inputFile = createTempFile("input-file", "x");
|
||||
|
||||
// Create jars with a single file inside them.
|
||||
Path second = makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2);
|
||||
Path third = makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3);
|
||||
Path fourth = makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);
|
||||
|
||||
// Add libjars to job conf
|
||||
jobConf.set("tmpjars", second.toString() + "," + third.toString() + ","
|
||||
+ fourth.toString());
|
||||
|
||||
// Because the job jar is a "dummy" jar, we need to include the jar with
|
||||
// DistributedCacheChecker or it won't be able to find it
|
||||
Path distributedCacheCheckerJar =
|
||||
new Path(JarFinder.getJar(SharedCacheChecker.class));
|
||||
job.addFileToClassPath(distributedCacheCheckerJar.makeQualified(
|
||||
localFs.getUri(), distributedCacheCheckerJar.getParent()));
|
||||
|
||||
job.setMapperClass(SharedCacheChecker.class);
|
||||
job.setOutputFormatClass(NullOutputFormat.class);
|
||||
|
||||
FileInputFormat.setInputPaths(job, inputFile);
|
||||
|
||||
job.setMaxMapAttempts(1); // speed up failures
|
||||
|
||||
job.submit();
|
||||
String trackingUrl = job.getTrackingURL();
|
||||
String jobId = job.getJobID().toString();
|
||||
Assert.assertTrue(job.waitForCompletion(true));
|
||||
Assert.assertTrue("Tracking URL was " + trackingUrl
|
||||
+ " but didn't Match Job ID " + jobId,
|
||||
trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
|
||||
}
|
||||
|
||||
/**
|
||||
* An identity mapper for testing the shared cache.
|
||||
*/
|
||||
public static class SharedCacheChecker extends
|
||||
Mapper<LongWritable, Text, NullWritable, NullWritable> {
|
||||
@Override
|
||||
public void setup(Context context) throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
public static class ConfVerificationMapper extends SleepMapper {
|
||||
@Override
|
||||
protected void setup(Context context)
|
||||
|
|
|
@ -112,6 +112,7 @@
|
|||
<item name="Encrypted Shuffle" href="hadoop-mapreduce-client/hadoop-mapreduce-client-core/EncryptedShuffle.html"/>
|
||||
<item name="Pluggable Shuffle/Sort" href="hadoop-mapreduce-client/hadoop-mapreduce-client-core/PluggableShuffleAndPluggableSort.html"/>
|
||||
<item name="Distributed Cache Deploy" href="hadoop-mapreduce-client/hadoop-mapreduce-client-core/DistributedCacheDeploy.html"/>
|
||||
<item name="Support for YARN Shared Cache" href="hadoop-mapreduce-client/hadoop-mapreduce-client-core/SharedCacheSupport.html"/>
|
||||
</menu>
|
||||
|
||||
<menu name="MapReduce REST APIs" inherit="top">
|
||||
|
|
Loading…
Reference in New Issue