From 24b03eb79f4e40824d7fe971b2b57c91fc425c92 Mon Sep 17 00:00:00 2001 From: Chris Trezzo Date: Thu, 12 Oct 2017 10:58:02 -0700 Subject: [PATCH] MAPREDUCE-5951. Add support for the YARN Shared Cache. (cherry picked from commit e46d5bb962b0c942f993afc505b165b1cd96e51b) --- .../mapreduce/v2/app/job/impl/JobImpl.java | 16 + .../v2/app/job/impl/TaskAttemptImpl.java | 52 ++- .../v2/util/LocalResourceBuilder.java | 169 +++++++ .../hadoop/mapreduce/v2/util/MRApps.java | 137 ++---- .../TestLocalDistributedCacheManager.java | 9 + .../hadoop/mapreduce/v2/util/TestMRApps.java | 8 +- .../hadoop-mapreduce-client-core/pom.xml | 6 + .../java/org/apache/hadoop/mapreduce/Job.java | 226 ++++++++++ .../hadoop/mapreduce/JobResourceUploader.java | 416 +++++++++++++++--- .../apache/hadoop/mapreduce/MRJobConfig.java | 71 +++ .../hadoop/mapreduce/SharedCacheConfig.java | 102 +++++ .../src/main/resources/mapred-default.xml | 11 + .../src/site/markdown/SharedCacheSupport.md | 100 +++++ .../mapreduce/TestJobResourceUploader.java | 80 ++-- ...estJobResourceUploaderWithSharedCache.java | 365 +++++++++++++++ .../org/apache/hadoop/mapred/YARNRunner.java | 54 ++- .../hadoop/mapred/TestLocalJobSubmission.java | 52 +++ .../hadoop/mapreduce/v2/TestMRJobs.java | 59 +++ hadoop-project/src/site/site.xml | 1 + 19 files changed, 1703 insertions(+), 231 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/LocalResourceBuilder.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/SharedCacheConfig.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/SharedCacheSupport.md create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploaderWithSharedCache.java diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 757c545aa65..d2e2492be6c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -48,6 +48,7 @@ import org.apache.hadoop.mapred.JobACLsManager; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TaskCompletionEvent; import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobACL; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.MRJobConfig; @@ -1414,6 +1415,19 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, new char[] {'"', '=', '.'}); } + /* + * The goal is to make sure only the NM that hosts MRAppMaster will upload + * resources to shared cache. Clean up the shared cache policies for all + * resources so that later when TaskAttemptImpl creates + * ContainerLaunchContext, LocalResource.setShouldBeUploadedToSharedCache will + * be set up to false. 
In that way, the NMs that host the task containers + * won't try to upload the resources to shared cache. + */ + private static void cleanupSharedCacheUploadPolicies(Configuration conf) { + Job.setArchiveSharedCacheUploadPolicies(conf, Collections.emptyMap()); + Job.setFileSharedCacheUploadPolicies(conf, Collections.emptyMap()); + } + public static class InitTransition implements MultipleArcTransition { @@ -1492,6 +1506,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, job.allowedReduceFailuresPercent = job.conf.getInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0); + cleanupSharedCacheUploadPolicies(job.conf); + // create the Tasks but don't start them yet createMapTasks(job, inputLength, taskSplitMetaInfo); createReduceTasks(job); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 650f387b03d..00c7b8405ae 100755 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -21,6 +21,8 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -708,17 +710,38 @@ public abstract class TaskAttemptImpl implements /** * Create a {@link LocalResource} record with all the given parameters. + * The NM that hosts AM container will upload resources to shared cache. + * Thus there is no need to ask task container's NM to upload the + * resources to shared cache. Set the shared cache upload policy to + * false. */ private static LocalResource createLocalResource(FileSystem fc, Path file, - LocalResourceType type, LocalResourceVisibility visibility) - throws IOException { + String fileSymlink, LocalResourceType type, + LocalResourceVisibility visibility) throws IOException { FileStatus fstat = fc.getFileStatus(file); - URL resourceURL = URL.fromPath(fc.resolvePath(fstat.getPath())); + // We need to be careful when converting from path to URL to add a fragment + // so that the symlink name when localized will be correct. + Path qualifiedPath = fc.resolvePath(fstat.getPath()); + URI uriWithFragment = null; + boolean useFragment = fileSymlink != null && !fileSymlink.equals(""); + try { + if (useFragment) { + uriWithFragment = new URI(qualifiedPath.toUri() + "#" + fileSymlink); + } else { + uriWithFragment = qualifiedPath.toUri(); + } + } catch (URISyntaxException e) { + throw new IOException( + "Error parsing local resource path." 
+ + " Path was not able to be converted to a URI: " + qualifiedPath, + e); + } + URL resourceURL = URL.fromURI(uriWithFragment); long resourceSize = fstat.getLen(); long resourceModificationTime = fstat.getModificationTime(); return LocalResource.newInstance(resourceURL, type, visibility, - resourceSize, resourceModificationTime); + resourceSize, resourceModificationTime, false); } /** @@ -829,8 +852,18 @@ public abstract class TaskAttemptImpl implements final FileSystem jobJarFs = FileSystem.get(jobJarPath.toUri(), conf); Path remoteJobJar = jobJarPath.makeQualified(jobJarFs.getUri(), jobJarFs.getWorkingDirectory()); - LocalResource rc = createLocalResource(jobJarFs, remoteJobJar, - LocalResourceType.PATTERN, LocalResourceVisibility.APPLICATION); + LocalResourceVisibility jobJarViz = + conf.getBoolean(MRJobConfig.JOBJAR_VISIBILITY, + MRJobConfig.JOBJAR_VISIBILITY_DEFAULT) + ? LocalResourceVisibility.PUBLIC + : LocalResourceVisibility.APPLICATION; + // We hard code the job.jar localized symlink in the container directory. + // This is because the mapreduce app expects the job.jar to be named + // accordingly. Additionally we set the shared cache upload policy to + // false. Resources are uploaded by the AM if necessary. + LocalResource rc = + createLocalResource(jobJarFs, remoteJobJar, MRJobConfig.JOB_JAR, + LocalResourceType.PATTERN, jobJarViz); String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN, JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern(); rc.setPattern(pattern); @@ -855,9 +888,12 @@ public abstract class TaskAttemptImpl implements Path remoteJobConfPath = new Path(remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE); FileSystem remoteFS = FileSystem.get(conf); + // There is no point to ask task container's NM to upload the resource + // to shared cache (job conf is not shared). Therefore, createLocalResource + // will set the shared cache upload policy to false localResources.put(MRJobConfig.JOB_CONF_FILE, - createLocalResource(remoteFS, remoteJobConfPath, LocalResourceType.FILE, - LocalResourceVisibility.APPLICATION)); + createLocalResource(remoteFS, remoteJobConfPath, null, + LocalResourceType.FILE, LocalResourceVisibility.APPLICATION)); LOG.info("The job-conf file on the remote FS is " + remoteJobConfPath.toUri().toASCIIString()); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/LocalResourceBuilder.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/LocalResourceBuilder.java new file mode 100644 index 00000000000..cb55e1332bd --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/LocalResourceBuilder.java @@ -0,0 +1,169 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.mapreduce.v2.util; + +import java.io.IOException; +import java.net.URI; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.InvalidJobConfException; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.filecache.DistributedCache; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.URL; + +/** + * Helper class for MR applications that parses distributed cache artifacts and + * creates a map of LocalResources. + */ +@SuppressWarnings("deprecation") +@Private +@Unstable +class LocalResourceBuilder { + public static final Log LOG = LogFactory.getLog(LocalResourceBuilder.class); + + private Configuration conf; + private LocalResourceType type; + private URI[] uris; + private long[] timestamps; + private long[] sizes; + private boolean[] visibilities; + private Map sharedCacheUploadPolicies; + + LocalResourceBuilder() { + } + + void setConf(Configuration c) { + this.conf = c; + } + + void setType(LocalResourceType t) { + this.type = t; + } + + void setUris(URI[] u) { + this.uris = u; + } + + void setTimestamps(long[] t) { + this.timestamps = t; + } + + void setSizes(long[] s) { + this.sizes = s; + } + + void setVisibilities(boolean[] v) { + this.visibilities = v; + } + + void setSharedCacheUploadPolicies(Map policies) { + this.sharedCacheUploadPolicies = policies; + } + + void createLocalResources(Map localResources) + throws IOException { + + if (uris != null) { + // Sanity check + if ((uris.length != timestamps.length) || (uris.length != sizes.length) || + (uris.length != visibilities.length)) { + throw new IllegalArgumentException("Invalid specification for " + + "distributed-cache artifacts of type " + type + " :" + + " #uris=" + uris.length + + " #timestamps=" + timestamps.length + + " #visibilities=" + visibilities.length + ); + } + + for (int i = 0; i < uris.length; ++i) { + URI u = uris[i]; + Path p = new Path(u); + FileSystem remoteFS = p.getFileSystem(conf); + String linkName = null; + + if (p.getName().equals(DistributedCache.WILDCARD)) { + p = p.getParent(); + linkName = p.getName() + Path.SEPARATOR + DistributedCache.WILDCARD; + } + + p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory())); + + // If there's no wildcard, try using the fragment for the link + if (linkName == null) { + linkName = u.getFragment(); + + // Because we don't know what's in the fragment, we have to handle + // it with care. 
+ if (linkName != null) { + Path linkPath = new Path(linkName); + + if (linkPath.isAbsolute()) { + throw new IllegalArgumentException("Resource name must be " + + "relative"); + } + + linkName = linkPath.toUri().getPath(); + } + } else if (u.getFragment() != null) { + throw new IllegalArgumentException("Invalid path URI: " + p + + " - cannot contain both a URI fragment and a wildcard"); + } + + // If there's no wildcard or fragment, just link to the file name + if (linkName == null) { + linkName = p.getName(); + } + + LocalResource orig = localResources.get(linkName); + if(orig != null && !orig.getResource().equals(URL.fromURI(p.toUri()))) { + throw new InvalidJobConfException( + getResourceDescription(orig.getType()) + orig.getResource() + + + " conflicts with " + getResourceDescription(type) + u); + } + Boolean sharedCachePolicy = sharedCacheUploadPolicies.get(u.toString()); + sharedCachePolicy = + sharedCachePolicy == null ? Boolean.FALSE : sharedCachePolicy; + localResources.put(linkName, LocalResource.newInstance(URL.fromURI(p + .toUri()), type, visibilities[i] ? LocalResourceVisibility.PUBLIC + : LocalResourceVisibility.PRIVATE, + sizes[i], timestamps[i], sharedCachePolicy)); + } + } + } + + private static String getResourceDescription(LocalResourceType type) { + if (type == LocalResourceType.ARCHIVE + || type == LocalResourceType.PATTERN) { + return "cache archive (" + MRJobConfig.CACHE_ARCHIVES + ") "; + } + return "cache file (" + MRJobConfig.CACHE_FILES + ") "; + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java index a43da653146..57771178faf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java @@ -42,7 +42,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.InvalidJobConfException; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Task; import org.apache.hadoop.mapred.TaskLog; @@ -67,12 +67,9 @@ import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; -import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; -import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.util.Apps; -import org.apache.hadoop.yarn.util.ConverterUtils; /** * Helper class for MR applications @@ -251,10 +248,16 @@ public class MRApps extends Apps { if (!userClassesTakesPrecedence) { MRApps.setMRFrameworkClasspath(environment, conf); } + /* + * We use "*" for the name of the JOB_JAR instead of MRJobConfig.JOB_JAR for + * the case where the job jar is not necessarily named "job.jar". 
This can + * happen, for example, when the job is leveraging a resource from the YARN + * shared cache. + */ MRApps.addToEnvironment( environment, classpathEnvVar, - MRJobConfig.JOB_JAR + Path.SEPARATOR + MRJobConfig.JOB_JAR, conf); + MRJobConfig.JOB_JAR + Path.SEPARATOR + "*", conf); MRApps.addToEnvironment( environment, classpathEnvVar, @@ -471,27 +474,32 @@ public class MRApps extends Apps { return startCommitFile; } - public static void setupDistributedCache( - Configuration conf, - Map localResources) - throws IOException { - + @SuppressWarnings("deprecation") + public static void setupDistributedCache(Configuration conf, + Map localResources) throws IOException { + + LocalResourceBuilder lrb = new LocalResourceBuilder(); + lrb.setConf(conf); + // Cache archives - parseDistributedCacheArtifacts(conf, localResources, - LocalResourceType.ARCHIVE, - DistributedCache.getCacheArchives(conf), - DistributedCache.getArchiveTimestamps(conf), - getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES), - DistributedCache.getArchiveVisibilities(conf)); + lrb.setType(LocalResourceType.ARCHIVE); + lrb.setUris(DistributedCache.getCacheArchives(conf)); + lrb.setTimestamps(DistributedCache.getArchiveTimestamps(conf)); + lrb.setSizes(getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES)); + lrb.setVisibilities(DistributedCache.getArchiveVisibilities(conf)); + lrb.setSharedCacheUploadPolicies( + Job.getArchiveSharedCacheUploadPolicies(conf)); + lrb.createLocalResources(localResources); // Cache files - parseDistributedCacheArtifacts(conf, - localResources, - LocalResourceType.FILE, - DistributedCache.getCacheFiles(conf), - DistributedCache.getFileTimestamps(conf), - getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES), - DistributedCache.getFileVisibilities(conf)); + lrb.setType(LocalResourceType.FILE); + lrb.setUris(DistributedCache.getCacheFiles(conf)); + lrb.setTimestamps(DistributedCache.getFileTimestamps(conf)); + lrb.setSizes(getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES)); + lrb.setVisibilities(DistributedCache.getFileVisibilities(conf)); + lrb.setSharedCacheUploadPolicies( + Job.getFileSharedCacheUploadPolicies(conf)); + lrb.createLocalResources(localResources); } /** @@ -550,89 +558,6 @@ public class MRApps extends Apps { } } - private static String getResourceDescription(LocalResourceType type) { - if(type == LocalResourceType.ARCHIVE || type == LocalResourceType.PATTERN) { - return "cache archive (" + MRJobConfig.CACHE_ARCHIVES + ") "; - } - return "cache file (" + MRJobConfig.CACHE_FILES + ") "; - } - - // TODO - Move this to MR! 
- // Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[], - // long[], boolean[], Path[], FileType) - private static void parseDistributedCacheArtifacts( - Configuration conf, - Map localResources, - LocalResourceType type, - URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[]) - throws IOException { - - if (uris != null) { - // Sanity check - if ((uris.length != timestamps.length) || (uris.length != sizes.length) || - (uris.length != visibilities.length)) { - throw new IllegalArgumentException("Invalid specification for " + - "distributed-cache artifacts of type " + type + " :" + - " #uris=" + uris.length + - " #timestamps=" + timestamps.length + - " #visibilities=" + visibilities.length - ); - } - - for (int i = 0; i < uris.length; ++i) { - URI u = uris[i]; - Path p = new Path(u); - FileSystem remoteFS = p.getFileSystem(conf); - String linkName = null; - - if (p.getName().equals(DistributedCache.WILDCARD)) { - p = p.getParent(); - linkName = p.getName() + Path.SEPARATOR + DistributedCache.WILDCARD; - } - - p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(), - remoteFS.getWorkingDirectory())); - - // If there's no wildcard, try using the fragment for the link - if (linkName == null) { - linkName = u.getFragment(); - - // Because we don't know what's in the fragment, we have to handle - // it with care. - if (linkName != null) { - Path linkPath = new Path(linkName); - - if (linkPath.isAbsolute()) { - throw new IllegalArgumentException("Resource name must be " - + "relative"); - } - - linkName = linkPath.toUri().getPath(); - } - } else if (u.getFragment() != null) { - throw new IllegalArgumentException("Invalid path URI: " + p + - " - cannot contain both a URI fragment and a wildcard"); - } - - // If there's no wildcard or fragment, just link to the file name - if (linkName == null) { - linkName = p.getName(); - } - - LocalResource orig = localResources.get(linkName); - if(orig != null && !orig.getResource().equals(URL.fromURI(p.toUri()))) { - throw new InvalidJobConfException( - getResourceDescription(orig.getType()) + orig.getResource() + - " conflicts with " + getResourceDescription(type) + u); - } - localResources.put(linkName, LocalResource - .newInstance(URL.fromURI(p.toUri()), type, visibilities[i] - ? LocalResourceVisibility.PUBLIC : LocalResourceVisibility.PRIVATE, - sizes[i], timestamps[i])); - } - } - } - // TODO - Move this to MR! 
private static long[] getFileSizes(Configuration conf, String key) { String[] strs = conf.getStrings(key); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java index ec80e65f030..d2814e962d3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java @@ -30,6 +30,8 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; +import java.util.HashMap; +import java.util.Map; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; @@ -39,6 +41,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PositionedReadable; import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.filecache.DistributedCache; @@ -164,6 +167,9 @@ public class TestLocalDistributedCacheManager { }); DistributedCache.addCacheFile(file, conf); + Map policies = new HashMap(); + policies.put(file.toString(), true); + Job.setFileSharedCacheUploadPolicies(conf, policies); conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101"); conf.set(MRJobConfig.CACHE_FILES_SIZES, "201"); conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "false"); @@ -272,6 +278,9 @@ public class TestLocalDistributedCacheManager { DistributedCache.addCacheFile(file, conf); DistributedCache.addCacheFile(file, conf); + Map policies = new HashMap(); + policies.put(file.toString(), true); + Job.setFileSharedCacheUploadPolicies(conf, policies); conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101,101"); conf.set(MRJobConfig.CACHE_FILES_SIZES, "201,201"); conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "false,false"); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java index 96b4e84b55e..a6936e663c8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java @@ -260,7 +260,7 @@ public class TestMRApps { } String env_str = env.get("CLASSPATH"); String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR, - Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/job.jar", + Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/*", "job.jar/classes/", "job.jar/lib/*", ApplicationConstants.Environment.PWD.$$() + "/*")); assertTrue("MAPREDUCE_JOB_USER_CLASSPATH_FIRST set, but not taking effect!", @@ -280,7 +280,7 @@ public class TestMRApps { } String env_str = env.get("CLASSPATH"); String expectedClasspath = 
StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR, - Arrays.asList("job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*", + Arrays.asList("job.jar/*", "job.jar/classes/", "job.jar/lib/*", ApplicationConstants.Environment.PWD.$$() + "/*")); assertTrue("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, and job.jar is not in" + " the classpath!", env_str.contains(expectedClasspath)); @@ -302,7 +302,7 @@ public class TestMRApps { assertFalse("MAPREDUCE_JOB_CLASSLOADER true, but PWD is in the classpath!", cp.contains("PWD")); String expectedAppClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR, - Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/job.jar", + Arrays.asList(ApplicationConstants.Environment.PWD.$$(), "job.jar/*", "job.jar/classes/", "job.jar/lib/*", ApplicationConstants.Environment.PWD.$$() + "/*")); assertEquals("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is not in the app" @@ -331,7 +331,7 @@ public class TestMRApps { conf.set(MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH, FRAMEWORK_CLASSPATH); MRApps.setClasspath(env, conf); final String stdClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR, - Arrays.asList("job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*", + Arrays.asList("job.jar/*", "job.jar/classes/", "job.jar/lib/*", ApplicationConstants.Environment.PWD.$$() + "/*")); String expectedClasspath = StringUtils.join(ApplicationConstants.CLASS_PATH_SEPARATOR, Arrays.asList(ApplicationConstants.Environment.PWD.$$(), diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml index 5b55d4735b0..824c25ec14b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml @@ -51,6 +51,12 @@ hadoop-hdfs test + + org.apache.hadoop + hadoop-hdfs + test-jar + test + org.skyscreamer jsonassert diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java index 5530d95b77b..a09f0341623 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java @@ -21,12 +21,17 @@ package org.apache.hadoop.mapreduce; import java.io.IOException; import java.net.URI; import java.security.PrivilegedExceptionAction; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration.IntegerRanges; import org.apache.hadoop.fs.FileSystem; @@ -1303,6 +1308,227 @@ public class Job extends JobContextImpl implements JobContext, AutoCloseable { } } + /** + * Add a file to job config for shared cache processing. 
If shared cache is + * enabled, it will return true, otherwise, return false. We don't check with + * SCM here given application might not be able to provide the job id; + * ClientSCMProtocol.use requires the application id. Job Submitter will read + * the files from job config and take care of things. + * + * @param resource The resource that Job Submitter will process later using + * shared cache. + * @param conf Configuration to add the resource to + * @return whether the resource has been added to the configuration + */ + @Unstable + public static boolean addFileToSharedCache(URI resource, Configuration conf) { + SharedCacheConfig scConfig = new SharedCacheConfig(); + scConfig.init(conf); + if (scConfig.isSharedCacheFilesEnabled()) { + String files = conf.get(MRJobConfig.FILES_FOR_SHARED_CACHE); + conf.set( + MRJobConfig.FILES_FOR_SHARED_CACHE, + files == null ? resource.toString() : files + "," + + resource.toString()); + return true; + } else { + return false; + } + } + + /** + * Add a file to job config for shared cache processing. If shared cache is + * enabled, it will return true, otherwise, return false. We don't check with + * SCM here given application might not be able to provide the job id; + * ClientSCMProtocol.use requires the application id. Job Submitter will read + * the files from job config and take care of things. Job Submitter will also + * add the file to classpath. Intended to be used by user code. + * + * @param resource The resource that Job Submitter will process later using + * shared cache. + * @param conf Configuration to add the resource to + * @return whether the resource has been added to the configuration + */ + @Unstable + public static boolean addFileToSharedCacheAndClasspath(URI resource, + Configuration conf) { + SharedCacheConfig scConfig = new SharedCacheConfig(); + scConfig.init(conf); + if (scConfig.isSharedCacheLibjarsEnabled()) { + String files = + conf.get(MRJobConfig.FILES_FOR_CLASSPATH_AND_SHARED_CACHE); + conf.set( + MRJobConfig.FILES_FOR_CLASSPATH_AND_SHARED_CACHE, + files == null ? resource.toString() : files + "," + + resource.toString()); + return true; + } else { + return false; + } + } + + /** + * Add an archive to job config for shared cache processing. If shared cache + * is enabled, it will return true, otherwise, return false. We don't check + * with SCM here given application might not be able to provide the job id; + * ClientSCMProtocol.use requires the application id. Job Submitter will read + * the files from job config and take care of things. Intended to be used by + * user code. + * + * @param resource The resource that Job Submitter will process later using + * shared cache. + * @param conf Configuration to add the resource to + * @return whether the resource has been added to the configuration + */ + @Unstable + public static boolean addArchiveToSharedCache(URI resource, + Configuration conf) { + SharedCacheConfig scConfig = new SharedCacheConfig(); + scConfig.init(conf); + if (scConfig.isSharedCacheArchivesEnabled()) { + String files = conf.get(MRJobConfig.ARCHIVES_FOR_SHARED_CACHE); + conf.set( + MRJobConfig.ARCHIVES_FOR_SHARED_CACHE, + files == null ? resource.toString() : files + "," + + resource.toString()); + return true; + } else { + return false; + } + } + + /** + * This is to set the shared cache upload policies for files. If the parameter + * was previously set, this method will replace the old value with the new + * provided map. 
+ * + * @param conf Configuration which stores the shared cache upload policies + * @param policies A map containing the shared cache upload policies for a set + * of resources. The key is the url of the resource and the value is + * the upload policy. True if it should be uploaded, false otherwise. + */ + @Unstable + public static void setFileSharedCacheUploadPolicies(Configuration conf, + Map policies) { + setSharedCacheUploadPolicies(conf, policies, true); + } + + /** + * This is to set the shared cache upload policies for archives. If the + * parameter was previously set, this method will replace the old value with + * the new provided map. + * + * @param conf Configuration which stores the shared cache upload policies + * @param policies A map containing the shared cache upload policies for a set + * of resources. The key is the url of the resource and the value is + * the upload policy. True if it should be uploaded, false otherwise. + */ + @Unstable + public static void setArchiveSharedCacheUploadPolicies(Configuration conf, + Map policies) { + setSharedCacheUploadPolicies(conf, policies, false); + } + + // We use a double colon because a colon is a reserved character in a URI and + // there should not be two colons next to each other. + private static final String DELIM = "::"; + + /** + * Set the shared cache upload policies config parameter. This is done by + * serializing the provided map of shared cache upload policies into a config + * parameter. If the parameter was previously set, this method will replace + * the old value with the new provided map. + * + * @param conf Configuration which stores the shared cache upload policies + * @param policies A map containing the shared cache upload policies for a set + * of resources. The key is the url of the resource and the value is + * the upload policy. True if it should be uploaded, false otherwise. + * @param areFiles True if these policies are for files, false if they are for + * archives. + */ + private static void setSharedCacheUploadPolicies(Configuration conf, + Map policies, boolean areFiles) { + if (policies != null) { + StringBuilder sb = new StringBuilder(); + Iterator> it = policies.entrySet().iterator(); + Map.Entry e; + if (it.hasNext()) { + e = it.next(); + sb.append(e.getKey() + DELIM + e.getValue()); + } else { + // policies is an empty map, just skip setting the parameter + return; + } + while (it.hasNext()) { + e = it.next(); + sb.append("," + e.getKey() + DELIM + e.getValue()); + } + String confParam = + areFiles ? MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES + : MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES; + conf.set(confParam, sb.toString()); + } + } + + /** + * Deserialize a map of shared cache upload policies from a config parameter. + * + * @param conf Configuration which stores the shared cache upload policies + * @param areFiles True if these policies are for files, false if they are for + * archives. + * @return A map containing the shared cache upload policies for a set of + * resources. The key is the url of the resource and the value is the + * upload policy. True if it should be uploaded, false otherwise. + */ + private static Map getSharedCacheUploadPolicies( + Configuration conf, boolean areFiles) { + String confParam = + areFiles ? 
MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES + : MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES; + Collection policies = conf.getStringCollection(confParam); + String[] policy; + Map policyMap = new LinkedHashMap(); + for (String s : policies) { + policy = s.split(DELIM); + if (policy.length != 2) { + LOG.error(confParam + + " is mis-formatted, returning empty shared cache upload policies." + + " Error on [" + s + "]"); + return new LinkedHashMap(); + } + policyMap.put(policy[0], Boolean.parseBoolean(policy[1])); + } + return policyMap; + } + + /** + * This is to get the shared cache upload policies for files. + * + * @param conf Configuration which stores the shared cache upload policies + * @return A map containing the shared cache upload policies for a set of + * resources. The key is the url of the resource and the value is the + * upload policy. True if it should be uploaded, false otherwise. + */ + @Unstable + public static Map getFileSharedCacheUploadPolicies( + Configuration conf) { + return getSharedCacheUploadPolicies(conf, true); + } + + /** + * This is to get the shared cache upload policies for archives. + * + * @param conf Configuration which stores the shared cache upload policies + * @return A map containing the shared cache upload policies for a set of + * resources. The key is the url of the resource and the value is the + * upload policy. True if it should be uploaded, false otherwise. + */ + @Unstable + public static Map getArchiveSharedCacheUploadPolicies( + Configuration conf) { + return getSharedCacheUploadPolicies(conf, false); + } + private synchronized void connect() throws IOException, InterruptedException, ClassNotFoundException { if (cluster == null) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java index d9bf988f9b8..a044fc1fe78 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java @@ -24,12 +24,13 @@ import java.net.URISyntaxException; import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; +import java.util.LinkedHashMap; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -40,30 +41,100 @@ import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies; import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager; import org.apache.hadoop.mapreduce.filecache.DistributedCache; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.URL; +import org.apache.hadoop.yarn.client.api.SharedCacheClient; +import org.apache.hadoop.yarn.exceptions.YarnException; import 
com.google.common.annotations.VisibleForTesting; -@InterfaceAudience.Private -@InterfaceStability.Unstable +/** + * This class is responsible for uploading resources from the client to HDFS + * that are associated with a MapReduce job. + */ +@Private +@Unstable class JobResourceUploader { protected static final Log LOG = LogFactory.getLog(JobResourceUploader.class); private final boolean useWildcard; private final FileSystem jtFs; + private SharedCacheClient scClient = null; + private SharedCacheConfig scConfig = new SharedCacheConfig(); + private ApplicationId appId = null; JobResourceUploader(FileSystem submitFs, boolean useWildcard) { this.jtFs = submitFs; this.useWildcard = useWildcard; } + private void initSharedCache(JobID jobid, Configuration conf) { + this.scConfig.init(conf); + if (this.scConfig.isSharedCacheEnabled()) { + this.scClient = createSharedCacheClient(conf); + appId = jobIDToAppId(jobid); + } + } + + /* + * We added this method so that we could do the conversion between JobId and + * ApplicationId for the shared cache client. This logic is very similar to + * the org.apache.hadoop.mapreduce.TypeConverter#toYarn method. We don't use + * that because mapreduce-client-core can not depend on + * mapreduce-client-common. + */ + private ApplicationId jobIDToAppId(JobID jobId) { + return ApplicationId.newInstance(Long.parseLong(jobId.getJtIdentifier()), + jobId.getId()); + } + + private void stopSharedCache() { + if (scClient != null) { + scClient.stop(); + scClient = null; + } + } + + /** + * Create, initialize and start a new shared cache client. + */ + @VisibleForTesting + protected SharedCacheClient createSharedCacheClient(Configuration conf) { + SharedCacheClient scc = SharedCacheClient.createSharedCacheClient(); + scc.init(conf); + scc.start(); + return scc; + } + /** * Upload and configure files, libjars, jobjars, and archives pertaining to * the passed job. - * + *
+ * This client will use the shared cache for libjars, files, archives and + * jobjars if it is enabled. When shared cache is enabled, it will try to use + * the shared cache and fall back to the default behavior when the scm isn't + * available. + *
+ * 1. For the resources that have been successfully shared, we will continue + * to use them in a shared fashion. + *
+ * 2. For the resources that weren't in the cache and need to be uploaded by + * NM, we won't ask NM to upload them. + * * @param job the job containing the files to be uploaded * @param submitJobDir the submission directory of the job * @throws IOException */ public void uploadResources(Job job, Path submitJobDir) throws IOException { + try { + initSharedCache(job.getJobID(), job.getConfiguration()); + uploadResourcesInternal(job, submitJobDir); + } finally { + stopSharedCache(); + } + } + + private void uploadResourcesInternal(Job job, Path submitJobDir) + throws IOException { Configuration conf = job.getConfiguration(); short replication = (short) conf.getInt(Job.SUBMIT_REPLICATION, @@ -90,6 +161,7 @@ class JobResourceUploader { + " already exists!! This is unexpected.Please check what's there in" + " that directory"); } + // Create the submission directory for the MapReduce job. submitJobDir = jtFs.makeQualified(submitJobDir); submitJobDir = new Path(submitJobDir.toUri().getPath()); FsPermission mapredSysPerms = @@ -101,20 +173,45 @@ class JobResourceUploader { disableErasureCodingForPath(jtFs, submitJobDir); } + // Get the resources that have been added via command line arguments in the + // GenericOptionsParser (i.e. files, libjars, archives). Collection files = conf.getStringCollection("tmpfiles"); Collection libjars = conf.getStringCollection("tmpjars"); Collection archives = conf.getStringCollection("tmparchives"); String jobJar = job.getJar(); + // Merge resources that have been programmatically specified for the shared + // cache via the Job API. + files.addAll(conf.getStringCollection(MRJobConfig.FILES_FOR_SHARED_CACHE)); + libjars.addAll(conf.getStringCollection( + MRJobConfig.FILES_FOR_CLASSPATH_AND_SHARED_CACHE)); + archives.addAll(conf + .getStringCollection(MRJobConfig.ARCHIVES_FOR_SHARED_CACHE)); + + Map statCache = new HashMap(); checkLocalizationLimits(conf, files, libjars, archives, jobJar, statCache); - uploadFiles(conf, files, submitJobDir, mapredSysPerms, replication); - uploadLibJars(conf, libjars, submitJobDir, mapredSysPerms, replication); - uploadArchives(conf, archives, submitJobDir, mapredSysPerms, replication); - uploadJobJar(job, jobJar, submitJobDir, replication); + Map fileSCUploadPolicies = + new LinkedHashMap(); + Map archiveSCUploadPolicies = + new LinkedHashMap(); + + uploadFiles(job, files, submitJobDir, mapredSysPerms, replication, + fileSCUploadPolicies, statCache); + uploadLibJars(job, libjars, submitJobDir, mapredSysPerms, replication, + fileSCUploadPolicies, statCache); + uploadArchives(job, archives, submitJobDir, mapredSysPerms, replication, + archiveSCUploadPolicies, statCache); + uploadJobJar(job, jobJar, submitJobDir, replication, statCache); addLog4jToDistributedCache(job, submitJobDir); + // Note, we do not consider resources in the distributed cache for the + // shared cache at this time. Only resources specified via the + // GenericOptionsParser or the jobjar. 
+ Job.setFileSharedCacheUploadPolicies(conf, fileSCUploadPolicies); + Job.setArchiveSharedCacheUploadPolicies(conf, archiveSCUploadPolicies); + // set the timestamps of the archives and files // set the public/private visibility of the archives and files ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf, @@ -125,9 +222,11 @@ class JobResourceUploader { } @VisibleForTesting - void uploadFiles(Configuration conf, Collection files, - Path submitJobDir, FsPermission mapredSysPerms, short submitReplication) + void uploadFiles(Job job, Collection files, + Path submitJobDir, FsPermission mapredSysPerms, short submitReplication, + Map fileSCUploadPolicies, Map statCache) throws IOException { + Configuration conf = job.getConfiguration(); Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir); if (!files.isEmpty()) { mkdirs(jtFs, filesDir, mapredSysPerms); @@ -140,17 +239,33 @@ class JobResourceUploader { + " Argument must be a valid URI: " + tmpFile, e); } Path tmp = new Path(tmpURI); - Path newPath = copyRemoteFiles(filesDir, tmp, conf, submitReplication); - try { - URI pathURI = getPathURI(newPath, tmpURI.getFragment()); - DistributedCache.addCacheFile(pathURI, conf); - } catch (URISyntaxException ue) { - // should not throw a uri exception - throw new IOException( - "Failed to create a URI (URISyntaxException) for the remote path " - + newPath + ". This was based on the files parameter: " - + tmpFile, - ue); + URI newURI = null; + boolean uploadToSharedCache = false; + if (scConfig.isSharedCacheFilesEnabled()) { + newURI = useSharedCache(tmpURI, tmp.getName(), statCache, conf, true); + if (newURI == null) { + uploadToSharedCache = true; + } + } + + if (newURI == null) { + Path newPath = + copyRemoteFiles(filesDir, tmp, conf, submitReplication); + try { + newURI = getPathURI(newPath, tmpURI.getFragment()); + } catch (URISyntaxException ue) { + // should not throw a uri exception + throw new IOException( + "Failed to create a URI (URISyntaxException) for the" + + " remote path " + newPath + + ". This was based on the files parameter: " + tmpFile, + ue); + } + } + + job.addCacheFile(newURI); + if (scConfig.isSharedCacheFilesEnabled()) { + fileSCUploadPolicies.put(newURI.toString(), uploadToSharedCache); } } } @@ -159,9 +274,11 @@ class JobResourceUploader { // Suppress warning for use of DistributedCache (it is everywhere). 
@SuppressWarnings("deprecation") @VisibleForTesting - void uploadLibJars(Configuration conf, Collection libjars, - Path submitJobDir, FsPermission mapredSysPerms, short submitReplication) + void uploadLibJars(Job job, Collection libjars, Path submitJobDir, + FsPermission mapredSysPerms, short submitReplication, + Map fileSCUploadPolicies, Map statCache) throws IOException { + Configuration conf = job.getConfiguration(); Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir); if (!libjars.isEmpty()) { mkdirs(jtFs, libjarsDir, mapredSysPerms); @@ -176,23 +293,53 @@ class JobResourceUploader { + " Argument must be a valid URI: " + tmpjars, e); } Path tmp = new Path(tmpURI); - Path newPath = - copyRemoteFiles(libjarsDir, tmp, conf, submitReplication); - try { - URI pathURI = getPathURI(newPath, tmpURI.getFragment()); - if (!foundFragment) { - foundFragment = pathURI.getFragment() != null; + URI newURI = null; + boolean uploadToSharedCache = false; + boolean fromSharedCache = false; + if (scConfig.isSharedCacheLibjarsEnabled()) { + newURI = useSharedCache(tmpURI, tmp.getName(), statCache, conf, true); + if (newURI == null) { + uploadToSharedCache = true; + } else { + fromSharedCache = true; } - DistributedCache.addFileToClassPath(new Path(pathURI.getPath()), conf, - jtFs, false); - libjarURIs.add(pathURI); - } catch (URISyntaxException ue) { - // should not throw a uri exception - throw new IOException( - "Failed to create a URI (URISyntaxException) for the remote path " - + newPath + ". This was based on the libjar parameter: " - + tmpjars, - ue); + } + + if (newURI == null) { + Path newPath = + copyRemoteFiles(libjarsDir, tmp, conf, submitReplication); + try { + newURI = getPathURI(newPath, tmpURI.getFragment()); + } catch (URISyntaxException ue) { + // should not throw a uri exception + throw new IOException( + "Failed to create a URI (URISyntaxException) for the" + + " remote path " + newPath + + ". This was based on the libjar parameter: " + tmpjars, + ue); + } + } + + if (!foundFragment) { + // We do not count shared cache paths containing fragments as a + // "foundFragment." This is because these resources are not in the + // staging directory and will be added to the distributed cache + // separately. + foundFragment = (newURI.getFragment() != null) && !fromSharedCache; + } + DistributedCache.addFileToClassPath(new Path(newURI.getPath()), conf, + jtFs, false); + if (fromSharedCache) { + // We simply add this URI to the distributed cache. It will not come + // from the staging directory (it is in the shared cache), so we + // must add it to the cache regardless of the wildcard feature. 
+ DistributedCache.addCacheFile(newURI, conf); + } else { + libjarURIs.add(newURI); + } + + if (scConfig.isSharedCacheLibjarsEnabled()) { + fileSCUploadPolicies.put(newURI.toString(), uploadToSharedCache); } } @@ -210,9 +357,11 @@ class JobResourceUploader { } @VisibleForTesting - void uploadArchives(Configuration conf, Collection archives, - Path submitJobDir, FsPermission mapredSysPerms, short submitReplication) - throws IOException { + void uploadArchives(Job job, Collection archives, + Path submitJobDir, FsPermission mapredSysPerms, short submitReplication, + Map archiveSCUploadPolicies, + Map statCache) throws IOException { + Configuration conf = job.getConfiguration(); Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir); if (!archives.isEmpty()) { mkdirs(jtFs, archivesDir, mapredSysPerms); @@ -225,18 +374,34 @@ class JobResourceUploader { + " Argument must be a valid URI: " + tmpArchives, e); } Path tmp = new Path(tmpURI); - Path newPath = - copyRemoteFiles(archivesDir, tmp, conf, submitReplication); - try { - URI pathURI = getPathURI(newPath, tmpURI.getFragment()); - DistributedCache.addCacheArchive(pathURI, conf); - } catch (URISyntaxException ue) { - // should not throw an uri excpetion - throw new IOException( - "Failed to create a URI (URISyntaxException) for the remote path" - + newPath + ". This was based on the archive parameter: " - + tmpArchives, - ue); + URI newURI = null; + boolean uploadToSharedCache = false; + if (scConfig.isSharedCacheArchivesEnabled()) { + newURI = useSharedCache(tmpURI, tmp.getName(), statCache, conf, true); + if (newURI == null) { + uploadToSharedCache = true; + } + } + + if (newURI == null) { + Path newPath = + copyRemoteFiles(archivesDir, tmp, conf, submitReplication); + try { + newURI = getPathURI(newPath, tmpURI.getFragment()); + } catch (URISyntaxException ue) { + // should not throw a uri exception + throw new IOException( + "Failed to create a URI (URISyntaxException) for the" + + " remote path " + newPath + + ". This was based on the archive parameter: " + + tmpArchives, + ue); + } + } + + job.addCacheArchive(newURI); + if (scConfig.isSharedCacheArchivesEnabled()) { + archiveSCUploadPolicies.put(newURI.toString(), uploadToSharedCache); } } } @@ -244,7 +409,9 @@ class JobResourceUploader { @VisibleForTesting void uploadJobJar(Job job, String jobJar, Path submitJobDir, - short submitReplication) throws IOException { + short submitReplication, Map statCache) + throws IOException { + Configuration conf = job.getConfiguration(); if (jobJar != null) { // copy jar to JobTracker's fs // use jar name if job is not named. if ("".equals(job.getJobName())) { @@ -252,12 +419,59 @@ class JobResourceUploader { } Path jobJarPath = new Path(jobJar); URI jobJarURI = jobJarPath.toUri(); - // If the job jar is already in a global fs, - // we don't need to copy it from local fs - if (jobJarURI.getScheme() == null || jobJarURI.getScheme().equals("file")) { - copyJar(jobJarPath, JobSubmissionFiles.getJobJar(submitJobDir), - submitReplication); - job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString()); + Path newJarPath = null; + boolean uploadToSharedCache = false; + if (jobJarURI.getScheme() == null || + jobJarURI.getScheme().equals("file")) { + // job jar is on the local file system + if (scConfig.isSharedCacheJobjarEnabled()) { + // We must have a qualified path for the shared cache client. 
We can + // assume this is for the local filesystem + jobJarPath = FileSystem.getLocal(conf).makeQualified(jobJarPath); + // Don't add a resource name here because the resource name (i.e. + // job.jar directory symlink) will always be hard coded to job.jar for + // the job.jar + URI newURI = + useSharedCache(jobJarPath.toUri(), null, statCache, conf, false); + if (newURI == null) { + uploadToSharedCache = true; + } else { + newJarPath = stringToPath(newURI.toString()); + // The job jar is coming from the shared cache (i.e. a public + // place), so we want the job.jar to have a public visibility. + conf.setBoolean(MRJobConfig.JOBJAR_VISIBILITY, true); + } + } + if (newJarPath == null) { + newJarPath = JobSubmissionFiles.getJobJar(submitJobDir); + copyJar(jobJarPath, newJarPath, submitReplication); + } + } else { + // job jar is in a remote file system + if (scConfig.isSharedCacheJobjarEnabled()) { + // Don't add a resource name here because the resource name (i.e. + // job.jar directory symlink) will always be hard coded to job.jar for + // the job.jar + URI newURI = useSharedCache(jobJarURI, null, statCache, conf, false); + if (newURI == null) { + uploadToSharedCache = true; + newJarPath = jobJarPath; + } else { + newJarPath = stringToPath(newURI.toString()); + // The job jar is coming from the shared cache (i.e. a public + // place), so we want the job.jar to have a public visibility. + conf.setBoolean(MRJobConfig.JOBJAR_VISIBILITY, true); + } + } else { + // we don't need to upload the jobjar to the staging directory because + // it is already in an accessible place + newJarPath = jobJarPath; + } + } + job.setJar(newJarPath.toString()); + if (scConfig.isSharedCacheJobjarEnabled()) { + conf.setBoolean(MRJobConfig.JOBJAR_SHARED_CACHE_UPLOAD_POLICY, + uploadToSharedCache); } } else { LOG.warn("No job jar file set. User classes may not be found. " @@ -267,7 +481,9 @@ class JobResourceUploader { /** * Verify that the resources this job is going to localize are within the - * localization limits. + * localization limits. We count all resources towards these limits regardless + * of where they are coming from (i.e. local, distributed cache, or shared + * cache). */ @VisibleForTesting void checkLocalizationLimits(Configuration conf, Collection files, @@ -464,6 +680,80 @@ class JobResourceUploader { return newPath; } + /** + * Checksum a local resource file and call use for that resource with the scm. + */ + private URI useSharedCache(URI sourceFile, String resourceName, + Map statCache, Configuration conf, boolean honorFragment) + throws IOException { + if (scClient == null) { + return null; + } + Path filePath = new Path(sourceFile); + if (getFileStatus(statCache, conf, filePath).isDirectory()) { + LOG.warn("Shared cache does not support directories" + + " (see YARN-6097)." + " Will not upload " + filePath + + " to the shared cache."); + return null; + } + + String rn = resourceName; + if (honorFragment) { + if (sourceFile.getFragment() != null) { + rn = sourceFile.getFragment(); + } + } + + // If for whatever reason, we can't even calculate checksum for + // a resource, something is really wrong with the file system; + // even non-SCM approach won't work. Let us just throw the exception. 
+ String checksum = scClient.getFileChecksum(filePath); + URL url = null; + try { + url = scClient.use(this.appId, checksum); + } catch (YarnException e) { + LOG.warn("Error trying to contact the shared cache manager," + + " disabling the SCMClient for the rest of this job submission", e); + /* + * If we fail to contact the SCM, we do not use it for the rest of this + * JobResourceUploader's life. This prevents us from having to timeout + * each time we try to upload a file while the SCM is unavailable. Instead + * we timeout/error the first time and quickly revert to the default + * behavior without the shared cache. We do this by stopping the shared + * cache client and setting it to null. + */ + stopSharedCache(); + } + + if (url != null) { + // Because we deal with URI's in mapreduce, we need to convert the URL to + // a URI and add a fragment if necessary. + URI uri = null; + try { + String name = new Path(url.getFile()).getName(); + if (rn != null && !name.equals(rn)) { + // A name was specified that is different then the URL in the shared + // cache. Therefore, we need to set the fragment portion of the URI to + // preserve the user's desired name. We assume that there is no + // existing fragment in the URL since the shared cache manager does + // not use fragments. + uri = new URI(url.getScheme(), url.getUserInfo(), url.getHost(), + url.getPort(), url.getFile(), null, rn); + } else { + uri = new URI(url.getScheme(), url.getUserInfo(), url.getHost(), + url.getPort(), url.getFile(), null, null); + } + return uri; + } catch (URISyntaxException e) { + LOG.warn("Error trying to convert URL received from shared cache to" + + " a URI: " + url.toString()); + return null; + } + } else { + return null; + } + } + @VisibleForTesting void copyJar(Path originalJarPath, Path submitJarFile, short replication) throws IOException { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index cf597301c63..91541eb29b2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -193,6 +193,77 @@ public interface MRJobConfig { public static final String CACHE_ARCHIVES_VISIBILITIES = "mapreduce.job.cache.archives.visibilities"; + /** + * This parameter controls the visibility of the localized job jar on the node + * manager. If set to true, the visibility will be set to + * LocalResourceVisibility.PUBLIC. If set to false, the visibility will be set + * to LocalResourceVisibility.APPLICATION. This is a generated parameter and + * should not be set manually via config files. + */ + String JOBJAR_VISIBILITY = "mapreduce.job.jobjar.visibility"; + boolean JOBJAR_VISIBILITY_DEFAULT = false; + + /** + * This is a generated parameter and should not be set manually via config + * files. + */ + String JOBJAR_SHARED_CACHE_UPLOAD_POLICY = + "mapreduce.job.jobjar.sharedcache.uploadpolicy"; + boolean JOBJAR_SHARED_CACHE_UPLOAD_POLICY_DEFAULT = false; + + /** + * This is a generated parameter and should not be set manually via config + * files. 
+ */ + String CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES = + "mapreduce.job.cache.files.sharedcache.uploadpolicies"; + + /** + * This is a generated parameter and should not be set manually via config + * files. + */ + String CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES = + "mapreduce.job.cache.archives.sharedcache.uploadpolicies"; + + /** + * A comma delimited list of file resources that are needed for this MapReduce + * job. These resources, if the files resource type is enabled, should either + * use the shared cache or be added to the shared cache. This parameter can be + * modified programmatically using the MapReduce Job api. + */ + String FILES_FOR_SHARED_CACHE = "mapreduce.job.cache.sharedcache.files"; + + /** + * A comma delimited list of libjar resources that are needed for this + * MapReduce job. These resources, if the libjars resource type is enabled, + * should either use the shared cache or be added to the shared cache. These + * resources will also be added to the classpath of all tasks for this + * MapReduce job. This parameter can be modified programmatically using the + * MapReduce Job api. + */ + String FILES_FOR_CLASSPATH_AND_SHARED_CACHE = + "mapreduce.job.cache.sharedcache.files.addtoclasspath"; + + /** + * A comma delimited list of archive resources that are needed for this + * MapReduce job. These resources, if the archives resource type is enabled, + * should either use the shared cache or be added to the shared cache. This + * parameter can be modified programmatically using the MapReduce Job api. + */ + String ARCHIVES_FOR_SHARED_CACHE = + "mapreduce.job.cache.sharedcache.archives"; + + /** + * A comma delimited list of resource categories that are enabled for the + * shared cache. If a category is enabled, resources in that category will be + * uploaded to the shared cache. The valid categories are: jobjar, libjars, + * files, archives. If "disabled" is specified then all categories are + * disabled. If "enabled" is specified then all categories are enabled. + */ + String SHARED_CACHE_MODE = "mapreduce.job.sharedcache.mode"; + + String SHARED_CACHE_MODE_DEFAULT = "disabled"; + /** * @deprecated Symlinks are always on and cannot be disabled. */ diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/SharedCacheConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/SharedCacheConfig.java new file mode 100644 index 00000000000..de033e58b2f --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/SharedCacheConfig.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapreduce; + +import java.util.Collection; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +/** + * A class for parsing configuration parameters associated with the shared + * cache. + */ +@Private +@Unstable +public class SharedCacheConfig { + protected static final Log LOG = LogFactory.getLog(SharedCacheConfig.class); + + private boolean sharedCacheFilesEnabled = false; + private boolean sharedCacheLibjarsEnabled = false; + private boolean sharedCacheArchivesEnabled = false; + private boolean sharedCacheJobjarEnabled = false; + + public void init(Configuration conf) { + if (!MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get( + MRConfig.FRAMEWORK_NAME))) { + // Shared cache is only valid if the job runs on yarn + return; + } + + if(!conf.getBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, + YarnConfiguration.DEFAULT_SHARED_CACHE_ENABLED)) { + return; + } + + + Collection configs = StringUtils.getTrimmedStringCollection( + conf.get(MRJobConfig.SHARED_CACHE_MODE, + MRJobConfig.SHARED_CACHE_MODE_DEFAULT)); + if (configs.contains("files")) { + this.sharedCacheFilesEnabled = true; + } + if (configs.contains("libjars")) { + this.sharedCacheLibjarsEnabled = true; + } + if (configs.contains("archives")) { + this.sharedCacheArchivesEnabled = true; + } + if (configs.contains("jobjar")) { + this.sharedCacheJobjarEnabled = true; + } + if (configs.contains("enabled")) { + this.sharedCacheFilesEnabled = true; + this.sharedCacheLibjarsEnabled = true; + this.sharedCacheArchivesEnabled = true; + this.sharedCacheJobjarEnabled = true; + } + if (configs.contains("disabled")) { + this.sharedCacheFilesEnabled = false; + this.sharedCacheLibjarsEnabled = false; + this.sharedCacheArchivesEnabled = false; + this.sharedCacheJobjarEnabled = false; + } + } + + public boolean isSharedCacheFilesEnabled() { + return sharedCacheFilesEnabled; + } + public boolean isSharedCacheLibjarsEnabled() { + return sharedCacheLibjarsEnabled; + } + public boolean isSharedCacheArchivesEnabled() { + return sharedCacheArchivesEnabled; + } + public boolean isSharedCacheJobjarEnabled() { + return sharedCacheJobjarEnabled; + } + public boolean isSharedCacheEnabled() { + return (sharedCacheFilesEnabled || sharedCacheLibjarsEnabled || + sharedCacheArchivesEnabled || sharedCacheJobjarEnabled); + } +} \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index 6b6faf20329..9d166c78696 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -648,6 +648,17 @@ + + mapreduce.job.sharedcache.mode + disabled + + A comma delimited list of resource categories to submit to the shared cache. + The valid categories are: jobjar, libjars, files, archives. + If "disabled" is specified then the job submission code will not use + the shared cache. 
+ + + mapreduce.input.fileinputformat.split.minsize 0 diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/SharedCacheSupport.md b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/SharedCacheSupport.md new file mode 100644 index 00000000000..9e3987c3020 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/SharedCacheSupport.md @@ -0,0 +1,100 @@ + +MR Support for YARN Shared Cache +================== + + + +Overview +------- + +MapReduce support for the YARN shared cache allows MapReduce jobs to take advantage +of additional resource caching. This saves network bandwidth between the job +submission client and the cluster, as well as within the YARN cluster itself, which reduces job +submission time and overall job runtime. + + +Enabling/Disabling the shared cache +------- + +First, your YARN cluster must have the shared cache service running. Please see the YARN documentation +for information on how to set up the shared cache service. + +A MapReduce user can specify which resources are eligible to be uploaded to the shared cache, +based on resource type. This is done using a configuration parameter in mapred-site.xml: + +``` +<property> + <name>mapreduce.job.sharedcache.mode</name> + <value>disabled</value> + <description> + A comma delimited list of resource categories to submit to the + shared cache. The valid categories are: jobjar, libjars, files, + archives. If "disabled" is specified then the job submission code + will not use the shared cache. + </description> +</property> +``` + +If a resource type is listed, the job submission client checks the shared cache to see if the resource is already in the +cache. If it is, the client uses the cached resource; if not, it marks the resource to be +uploaded asynchronously. + +Specifying resources for the cache +------- + +A MapReduce user has three ways to specify resources for a MapReduce job: + +1. **The command line via the generic options parser (i.e. -files, -archives, -libjars):** If a +resource is specified via the command line and the resource type is enabled for the +shared cache, that resource will use the shared cache. +2. **The distributed cache API:** If a resource is specified via the distributed cache, the +resource will not use the shared cache regardless of whether the resource type is enabled for +the shared cache. +3. **The shared cache API:** This is a new set of methods added to the +org.apache.hadoop.mapreduce.Job API. It allows users to add a file to the shared cache, +add a file to both the shared cache and the classpath, and add an archive to the shared cache +(see the example sketch after the Resource Visibility section below). +These resources will be placed in the distributed cache and, if their resource type is +enabled, the client will use the shared cache as well. + +Resource naming +------- + +It is important to ensure that each resource for a MapReduce job has a unique file name. +This prevents symlink clobbering when YARN containers running MapReduce tasks are localized +during container launch. A user can specify their own resource name by using the fragment +portion of a URI. For example, for file resources specified on the command line, it could look +like this: +``` +-files /local/path/file1.txt#foo.txt,/local/path2/file1.txt#bar.txt +``` +In the above example, the two files, both named file1.txt, will be localized under two different names: foo.txt +and bar.txt. + +Resource Visibility +------- + +All resources in the shared cache have PUBLIC visibility.
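
Shared cache API example
-------

The sketch below illustrates the shared cache methods on `org.apache.hadoop.mapreduce.Job` described above. It is a minimal, illustrative example only: the class name, the local and HDFS paths, and the surrounding job setup are placeholders, not part of this patch. Each `add*SharedCache*` call returns true only when the corresponding resource category is enabled via `mapreduce.job.sharedcache.mode`, so a caller should fall back to the regular distributed cache methods when it returns false.

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class SharedCacheExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Enable the shared cache for all resource categories
    // (jobjar, libjars, files, archives). A subset such as
    // "libjars,archives" is also valid.
    conf.set(MRJobConfig.SHARED_CACHE_MODE, "enabled");

    // Each call returns true only if the resource category is enabled for
    // the shared cache; otherwise the resource is not added here.
    boolean fileAdded =
        Job.addFileToSharedCache(new URI("/local/path/data.txt"), conf);
    boolean libjarAdded =
        Job.addFileToSharedCacheAndClasspath(new URI("/local/path/lib.jar"), conf);
    boolean archiveAdded =
        Job.addArchiveToSharedCache(new URI("/local/path/bundle.zip"), conf);

    Job job = Job.getInstance(conf);
    if (!fileAdded) {
      // Fall back to the regular distributed cache when the "files"
      // category is not enabled for the shared cache. The HDFS path here
      // is a placeholder.
      job.addCacheFile(new URI("hdfs:///apps/shared/data.txt"));
    }
    // ... set the mapper/reducer, input/output paths, and submit as usual.
  }
}
```

Note that the submission client handles shared cache manager outages on its own (see the next section), so code like the above does not need to special-case an unavailable shared cache.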
+ + +MapReduce client behavior while the shared cache is unavailable +------- + +In the event that the shared cache manager is unavailable, the MapReduce client uses a fail-fast +mechanism. If the MapReduce client fails to contact the shared cache manager, the client will +no longer use the shared cache for the rest of that job submission. This +prevents the MapReduce client from timing out each time it tries to check for a resource +in the shared cache. The MapReduce client quickly reverts to the default behavior and submits a +Job as if the shared cache was never enabled in the first place. \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java index d0d7a349323..d347da55e80 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploader.java @@ -220,7 +220,7 @@ public class TestJobResourceUploader { destinationPathPrefix + "tmpArchives1.tgz#tmpArchivesfragment1.tgz" }; private String jobjarSubmitDir = "/jobjar-submit-dir"; - private String expectedJobJar = jobjarSubmitDir + "/job.jar"; + private String basicExpectedJobJar = jobjarSubmitDir + "/job.jar"; @Test public void testPathsWithNoFragNoSchemeRelative() throws IOException { @@ -236,7 +236,7 @@ public class TestJobResourceUploader { JobResourceUploader uploader = new StubedUploader(jConf); runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesNoFrags, - expectedArchivesNoFrags, expectedJobJar); + expectedArchivesNoFrags, basicExpectedJobJar); } @Test @@ -254,7 +254,7 @@ public class TestJobResourceUploader { JobResourceUploader uploader = new StubedUploader(jConf); runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesNoFrags, - expectedArchivesNoFrags, expectedJobJar); + expectedArchivesNoFrags, basicExpectedJobJar); } @Test @@ -272,7 +272,7 @@ public class TestJobResourceUploader { JobResourceUploader uploader = new StubedUploader(jConf); runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags, - expectedArchivesWithFrags, expectedJobJar); + expectedArchivesWithFrags, basicExpectedJobJar); } @Test @@ -290,7 +290,7 @@ public class TestJobResourceUploader { JobResourceUploader uploader = new StubedUploader(jConf); runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags, - expectedArchivesWithFrags, expectedJobJar); + expectedArchivesWithFrags, basicExpectedJobJar); } @Test @@ -308,7 +308,7 @@ public class TestJobResourceUploader { JobResourceUploader uploader = new StubedUploader(jConf); runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags, - expectedArchivesWithFrags, expectedJobJar); + expectedArchivesWithFrags, basicExpectedJobJar); } @Test @@ -326,7 +326,7 @@ public class TestJobResourceUploader { JobResourceUploader uploader = new StubedUploader(jConf); runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesNoFrags, - expectedArchivesNoFrags, expectedJobJar); + expectedArchivesNoFrags, basicExpectedJobJar); } @Test @@ -344,7 +344,7 @@ public class TestJobResourceUploader { JobResourceUploader uploader = new StubedUploader(jConf, true); 
runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithWildcard, - expectedArchivesNoFrags, expectedJobJar); + expectedArchivesNoFrags, basicExpectedJobJar); } @Test @@ -362,7 +362,7 @@ public class TestJobResourceUploader { JobResourceUploader uploader = new StubedUploader(jConf, true); runTmpResourcePathTest(uploader, rConf, jConf, expectedFilesWithFrags, - expectedArchivesWithFrags, expectedJobJar); + expectedArchivesWithFrags, basicExpectedJobJar); } @Test @@ -402,44 +402,39 @@ public class TestJobResourceUploader { private void runTmpResourcePathTest(JobResourceUploader uploader, ResourceConf rConf, JobConf jConf, String[] expectedFiles, String[] expectedArchives, String expectedJobJar) throws IOException { - rConf.setupJobConf(jConf); - // We use a pre and post job object here because we need the post job object - // to get the new values set during uploadResources, but we need the pre job - // to set the job jar because JobResourceUploader#uploadJobJar uses the Job - // interface not the JobConf. The post job is automatically created in - // validateResourcePaths. - Job jobPre = Job.getInstance(jConf); - uploadResources(uploader, jConf, jobPre); - - validateResourcePaths(jConf, expectedFiles, expectedArchives, - expectedJobJar, jobPre); + Job job = rConf.setupJobConf(jConf); + uploadResources(uploader, job); + validateResourcePaths(job, expectedFiles, expectedArchives, expectedJobJar); } - private void uploadResources(JobResourceUploader uploader, JobConf jConf, - Job job) throws IOException { - Collection files = jConf.getStringCollection("tmpfiles"); - Collection libjars = jConf.getStringCollection("tmpjars"); - Collection archives = jConf.getStringCollection("tmparchives"); - String jobJar = jConf.getJar(); - uploader.uploadFiles(jConf, files, new Path("/files-submit-dir"), null, - (short) 3); - uploader.uploadArchives(jConf, archives, new Path("/archives-submit-dir"), - null, (short) 3); - uploader.uploadLibJars(jConf, libjars, new Path("/libjars-submit-dir"), - null, (short) 3); - uploader.uploadJobJar(job, jobJar, new Path(jobjarSubmitDir), (short) 3); - } - - private void validateResourcePaths(JobConf jConf, String[] expectedFiles, - String[] expectedArchives, String expectedJobJar, Job preJob) + private void uploadResources(JobResourceUploader uploader, Job job) throws IOException { - Job j = Job.getInstance(jConf); - validateResourcePathsSub(j.getCacheFiles(), expectedFiles); - validateResourcePathsSub(j.getCacheArchives(), expectedArchives); + Configuration conf = job.getConfiguration(); + Collection files = conf.getStringCollection("tmpfiles"); + Collection libjars = conf.getStringCollection("tmpjars"); + Collection archives = conf.getStringCollection("tmparchives"); + Map statCache = new HashMap<>(); + Map fileSCUploadPolicies = new HashMap<>(); + String jobJar = job.getJar(); + uploader.uploadFiles(job, files, new Path("/files-submit-dir"), null, + (short) 3, fileSCUploadPolicies, statCache); + uploader.uploadArchives(job, archives, new Path("/archives-submit-dir"), + null, (short) 3, fileSCUploadPolicies, statCache); + uploader.uploadLibJars(job, libjars, new Path("/libjars-submit-dir"), null, + (short) 3, fileSCUploadPolicies, statCache); + uploader.uploadJobJar(job, jobJar, new Path(jobjarSubmitDir), (short) 3, + statCache); + } + + private void validateResourcePaths(Job job, String[] expectedFiles, + String[] expectedArchives, String expectedJobJar) + throws IOException { + validateResourcePathsSub(job.getCacheFiles(), expectedFiles); + 
validateResourcePathsSub(job.getCacheArchives(), expectedArchives); // We use a different job object here because the jobjar was set on a // different job object Assert.assertEquals("Job jar path is different than expected!", - expectedJobJar, preJob.getJar()); + expectedJobJar, job.getJar()); } private void validateResourcePathsSub(URI[] actualURIs, @@ -645,7 +640,7 @@ public class TestJobResourceUploader { } } - private void setupJobConf(JobConf conf) { + private Job setupJobConf(JobConf conf) throws IOException { conf.set("tmpfiles", buildPathString("tmpFiles", this.numOfTmpFiles, ".txt")); conf.set("tmpjars", @@ -675,6 +670,7 @@ public class TestJobResourceUploader { conf.setLong(MRJobConfig.MAX_RESOURCES_MB, this.maxResourcesMB); conf.setLong(MRJobConfig.MAX_SINGLE_RESOURCE_MB, this.maxSingleResourceMB); + return new Job(conf); } // We always want absolute paths with a scheme in the DistributedCache, so diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploaderWithSharedCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploaderWithSharedCache.java new file mode 100644 index 00000000000..7598141f982 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobResourceUploaderWithSharedCache.java @@ -0,0 +1,365 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Map; +import java.util.jar.JarOutputStream; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.URL; +import org.apache.hadoop.yarn.client.api.SharedCacheClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * Tests the JobResourceUploader class with the shared cache. + */ +public class TestJobResourceUploaderWithSharedCache { + protected static final Log LOG = LogFactory + .getLog(TestJobResourceUploaderWithSharedCache.class); + private static MiniDFSCluster dfs; + private static FileSystem localFs; + private static FileSystem remoteFs; + private static Configuration conf = new Configuration(); + private static Path testRootDir; + private static Path remoteStagingDir = + new Path(MRJobConfig.DEFAULT_MR_AM_STAGING_DIR); + private String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n"; + + @Before + public void cleanup() throws Exception { + remoteFs.delete(remoteStagingDir, true); + } + + @BeforeClass + public static void setup() throws IOException { + // create configuration, dfs, file system + localFs = FileSystem.getLocal(conf); + testRootDir = + new Path("target", + TestJobResourceUploaderWithSharedCache.class.getName() + "-tmpDir") + .makeQualified(localFs.getUri(), localFs.getWorkingDirectory()); + dfs = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); + remoteFs = dfs.getFileSystem(); + } + + @AfterClass + public static void tearDown() { + try { + if (localFs != null) { + localFs.close(); + } + if (remoteFs != null) { + remoteFs.close(); + } + if (dfs != null) { + dfs.shutdown(); + } + } catch (IOException ioe) { + LOG.info("IO exception in closing file system"); + ioe.printStackTrace(); + } + } + + private class MyFileUploader extends JobResourceUploader { + // The mocked SharedCacheClient that will be fed into the FileUploader + private SharedCacheClient mockscClient = mock(SharedCacheClient.class); + // A real client for checksum calculation + private SharedCacheClient scClient = SharedCacheClient + .createSharedCacheClient(); + + MyFileUploader(FileSystem submitFs, Configuration conf) + throws IOException { + super(submitFs, false); + // Initialize the 
real client, but don't start it. We don't need or want + // to create an actual proxy because we only use this for mocking out the + // getFileChecksum method. + scClient.init(conf); + when(mockscClient.getFileChecksum(any(Path.class))).thenAnswer( + new Answer() { + @Override + public String answer(InvocationOnMock invocation) throws Throwable { + Path file = (Path) invocation.getArguments()[0]; + // Use the real scClient to generate the checksum. We use an + // answer/mock combination to avoid having to spy on a real + // SharedCacheClient object. + return scClient.getFileChecksum(file); + } + }); + } + + // This method is to prime the mock client with the correct checksum, so it + // looks like a given resource is present in the shared cache. + public void mockFileInSharedCache(Path localFile, URL remoteFile) + throws YarnException, IOException { + // when the resource is referenced, simply return the remote path to the + // caller + when(mockscClient.use(any(ApplicationId.class), + eq(scClient.getFileChecksum(localFile)))).thenReturn(remoteFile); + } + + @Override + protected SharedCacheClient createSharedCacheClient(Configuration c) { + // Feed the mocked SharedCacheClient into the FileUploader logic + return mockscClient; + } + } + + @Test + public void testSharedCacheDisabled() throws Exception { + JobConf jobConf = createJobConf(); + Job job = new Job(jobConf); + job.setJobID(new JobID("567789", 1)); + + // shared cache is disabled by default + uploadFilesToRemoteFS(job, jobConf, 0, 0, 0, false); + + } + + @Test + public void testSharedCacheEnabled() throws Exception { + JobConf jobConf = createJobConf(); + jobConf.set(MRJobConfig.SHARED_CACHE_MODE, "enabled"); + Job job = new Job(jobConf); + job.setJobID(new JobID("567789", 1)); + + // shared cache is enabled for every file type + // the # of times SharedCacheClient.use is called should == + // total # of files/libjars/archive/jobjar + uploadFilesToRemoteFS(job, jobConf, 8, 3, 2, false); + } + + @Test + public void testSharedCacheEnabledWithJobJarInSharedCache() + throws Exception { + JobConf jobConf = createJobConf(); + jobConf.set(MRJobConfig.SHARED_CACHE_MODE, "enabled"); + Job job = new Job(jobConf); + job.setJobID(new JobID("567789", 1)); + + // shared cache is enabled for every file type + // the # of times SharedCacheClient.use is called should == + // total # of files/libjars/archive/jobjar + uploadFilesToRemoteFS(job, jobConf, 8, 3, 2, true); + } + + @Test + public void testSharedCacheArchivesAndLibjarsEnabled() throws Exception { + JobConf jobConf = createJobConf(); + jobConf.set(MRJobConfig.SHARED_CACHE_MODE, "archives,libjars"); + Job job = new Job(jobConf); + job.setJobID(new JobID("567789", 1)); + + // shared cache is enabled for archives and libjars type + // the # of times SharedCacheClient.use is called should == + // total # of libjars and archives + uploadFilesToRemoteFS(job, jobConf, 5, 1, 2, true); + } + + private JobConf createJobConf() { + JobConf jobConf = new JobConf(); + jobConf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME); + jobConf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true); + + jobConf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, remoteFs.getUri() + .toString()); + return jobConf; + } + + private Path copyToRemote(Path jar) throws IOException { + Path remoteFile = new Path("/tmp", jar.getName()); + remoteFs.copyFromLocalFile(jar, remoteFile); + return remoteFile; + } + + private void makeJarAvailableInSharedCache(Path jar, + MyFileUploader fileUploader) throws 
YarnException, IOException { + // copy file to remote file system + Path remoteFile = copyToRemote(jar); + // prime mocking so that it looks like this file is in the shared cache + fileUploader.mockFileInSharedCache(jar, URL.fromPath(remoteFile)); + } + + private void uploadFilesToRemoteFS(Job job, JobConf jobConf, + int useCallCountExpected, + int numOfFilesShouldBeUploadedToSharedCacheExpected, + int numOfArchivesShouldBeUploadedToSharedCacheExpected, + boolean jobJarInSharedCacheBeforeUpload) throws Exception { + MyFileUploader fileUploader = new MyFileUploader(remoteFs, jobConf); + SharedCacheConfig sharedCacheConfig = new SharedCacheConfig(); + sharedCacheConfig.init(jobConf); + + Path firstFile = createTempFile("first-input-file", "x"); + Path secondFile = createTempFile("second-input-file", "xx"); + + // Add files to job conf via distributed cache API as well as command line + boolean fileAdded = Job.addFileToSharedCache(firstFile.toUri(), jobConf); + assertEquals(sharedCacheConfig.isSharedCacheFilesEnabled(), fileAdded); + if (!fileAdded) { + Path remoteFile = copyToRemote(firstFile); + job.addCacheFile(remoteFile.toUri()); + } + jobConf.set("tmpfiles", secondFile.toString()); + + // Create jars with a single file inside them. + Path firstJar = makeJar(new Path(testRootDir, "distributed.first.jar"), 1); + Path secondJar = + makeJar(new Path(testRootDir, "distributed.second.jar"), 2); + + // Verify duplicated contents can be handled properly. + Path thirdJar = new Path(testRootDir, "distributed.third.jar"); + localFs.copyFromLocalFile(secondJar, thirdJar); + + // make secondJar cache available + makeJarAvailableInSharedCache(secondJar, fileUploader); + + // Add libjars to job conf via distributed cache API as well as command + // line + boolean libjarAdded = + Job.addFileToSharedCacheAndClasspath(firstJar.toUri(), jobConf); + assertEquals(sharedCacheConfig.isSharedCacheLibjarsEnabled(), libjarAdded); + if (!libjarAdded) { + Path remoteJar = copyToRemote(firstJar); + job.addFileToClassPath(remoteJar); + } + + jobConf.set("tmpjars", secondJar.toString() + "," + thirdJar.toString()); + + Path firstArchive = makeArchive("first-archive.zip", "first-file"); + Path secondArchive = makeArchive("second-archive.zip", "second-file"); + + // Add archives to job conf via distributed cache API as well as command + // line + boolean archiveAdded = + Job.addArchiveToSharedCache(firstArchive.toUri(), jobConf); + assertEquals(sharedCacheConfig.isSharedCacheArchivesEnabled(), + archiveAdded); + if (!archiveAdded) { + Path remoteArchive = copyToRemote(firstArchive); + job.addCacheArchive(remoteArchive.toUri()); + } + + jobConf.set("tmparchives", secondArchive.toString()); + + // Add job jar to job conf + Path jobJar = makeJar(new Path(testRootDir, "test-job.jar"), 4); + if (jobJarInSharedCacheBeforeUpload) { + makeJarAvailableInSharedCache(jobJar, fileUploader); + } + jobConf.setJar(jobJar.toString()); + + fileUploader.uploadResources(job, remoteStagingDir); + + verify(fileUploader.mockscClient, times(useCallCountExpected)).use( + any(ApplicationId.class), anyString()); + + int numOfFilesShouldBeUploadedToSharedCache = 0; + Map filesSharedCacheUploadPolicies = + Job.getFileSharedCacheUploadPolicies(jobConf); + for (Boolean policy : filesSharedCacheUploadPolicies.values()) { + if (policy) { + numOfFilesShouldBeUploadedToSharedCache++; + } + } + assertEquals(numOfFilesShouldBeUploadedToSharedCacheExpected, + numOfFilesShouldBeUploadedToSharedCache); + + int numOfArchivesShouldBeUploadedToSharedCache = 0; 
+ Map archivesSharedCacheUploadPolicies = + Job.getArchiveSharedCacheUploadPolicies(jobConf); + for (Boolean policy : archivesSharedCacheUploadPolicies.values()) { + if (policy) { + numOfArchivesShouldBeUploadedToSharedCache++; + } + } + assertEquals(numOfArchivesShouldBeUploadedToSharedCacheExpected, + numOfArchivesShouldBeUploadedToSharedCache); + } + + + private Path createTempFile(String filename, String contents) + throws IOException { + Path path = new Path(testRootDir, filename); + FSDataOutputStream os = localFs.create(path); + os.writeBytes(contents); + os.close(); + localFs.setPermission(path, new FsPermission("700")); + return path; + } + + private Path makeJar(Path p, int index) throws FileNotFoundException, + IOException { + FileOutputStream fos = + new FileOutputStream(new File(p.toUri().getPath())); + JarOutputStream jos = new JarOutputStream(fos); + ZipEntry ze = new ZipEntry("distributed.jar.inside" + index); + jos.putNextEntry(ze); + jos.write(("inside the jar!" + index).getBytes()); + jos.closeEntry(); + jos.close(); + localFs.setPermission(p, new FsPermission("700")); + return p; + } + + private Path makeArchive(String archiveFile, String filename) + throws Exception { + Path archive = new Path(testRootDir, archiveFile); + Path file = new Path(testRootDir, filename); + DataOutputStream out = localFs.create(archive); + ZipOutputStream zos = new ZipOutputStream(out); + ZipEntry ze = new ZipEntry(file.toString()); + zos.putNextEntry(ze); + zos.write(input.getBytes("UTF-8")); + zos.closeEntry(); + zos.close(); + return archive; + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index 1baa467cdeb..a23ff34b574 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -19,6 +19,8 @@ package org.apache.hadoop.mapred; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; @@ -338,16 +340,41 @@ public class YARNRunner implements ClientProtocol { } } - private LocalResource createApplicationResource(FileContext fs, Path p, LocalResourceType type) - throws IOException { + private LocalResource createApplicationResource(FileContext fs, Path p, + LocalResourceType type) throws IOException { + return createApplicationResource(fs, p, null, type, + LocalResourceVisibility.APPLICATION, false); + } + + private LocalResource createApplicationResource(FileContext fs, Path p, + String fileSymlink, LocalResourceType type, LocalResourceVisibility viz, + Boolean uploadToSharedCache) throws IOException { LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class); FileStatus rsrcStat = fs.getFileStatus(p); - rsrc.setResource(URL.fromPath(fs - .getDefaultFileSystem().resolvePath(rsrcStat.getPath()))); + // We need to be careful when converting from path to URL to add a fragment + // so that the symlink name when localized will be correct. 
+ Path qualifiedPath = + fs.getDefaultFileSystem().resolvePath(rsrcStat.getPath()); + URI uriWithFragment = null; + boolean useFragment = fileSymlink != null && !fileSymlink.equals(""); + try { + if (useFragment) { + uriWithFragment = new URI(qualifiedPath.toUri() + "#" + fileSymlink); + } else { + uriWithFragment = qualifiedPath.toUri(); + } + } catch (URISyntaxException e) { + throw new IOException( + "Error parsing local resource path." + + " Path was not able to be converted to a URI: " + qualifiedPath, + e); + } + rsrc.setResource(URL.fromURI(uriWithFragment)); rsrc.setSize(rsrcStat.getLen()); rsrc.setTimestamp(rsrcStat.getModificationTime()); rsrc.setType(type); - rsrc.setVisibility(LocalResourceVisibility.APPLICATION); + rsrc.setVisibility(viz); + rsrc.setShouldBeUploadedToSharedCache(uploadToSharedCache); return rsrc; } @@ -368,10 +395,21 @@ public class YARNRunner implements ClientProtocol { jobConfPath, LocalResourceType.FILE)); if (jobConf.get(MRJobConfig.JAR) != null) { Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR)); + // We hard code the job.jar symlink because mapreduce code expects the + // job.jar to be named that way. + FileContext fccc = + FileContext.getFileContext(jobJarPath.toUri(), jobConf); + LocalResourceVisibility jobJarViz = + jobConf.getBoolean(MRJobConfig.JOBJAR_VISIBILITY, + MRJobConfig.JOBJAR_VISIBILITY_DEFAULT) + ? LocalResourceVisibility.PUBLIC + : LocalResourceVisibility.APPLICATION; LocalResource rc = createApplicationResource( - FileContext.getFileContext(jobJarPath.toUri(), jobConf), - jobJarPath, - LocalResourceType.PATTERN); + FileContext.getFileContext(jobJarPath.toUri(), jobConf), jobJarPath, + MRJobConfig.JOB_JAR, LocalResourceType.PATTERN, jobJarViz, + jobConf.getBoolean( + MRJobConfig.JOBJAR_SHARED_CACHE_UPLOAD_POLICY, + MRJobConfig.JOBJAR_SHARED_CACHE_UPLOAD_POLICY_DEFAULT)); String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN, JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern(); rc.setPattern(pattern); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java index 4a2b8575325..a3ea26e81f0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java @@ -132,6 +132,58 @@ public class TestLocalJobSubmission { } } + /** + * Test local job submission with a file option. 
+ * + * @throws IOException + */ + @Test + public void testLocalJobFilesOption() throws IOException { + Path jarPath = makeJar(new Path(TEST_ROOT_DIR, "test.jar")); + + Configuration conf = new Configuration(); + conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000"); + conf.set(MRConfig.FRAMEWORK_NAME, "local"); + final String[] args = + {"-jt", "local", "-files", jarPath.toString(), "-m", "1", "-r", "1", + "-mt", "1", "-rt", "1"}; + int res = -1; + try { + res = ToolRunner.run(conf, new SleepJob(), args); + } catch (Exception e) { + System.out.println("Job failed with " + e.getLocalizedMessage()); + e.printStackTrace(System.out); + fail("Job failed"); + } + assertEquals("dist job res is not 0:", 0, res); + } + + /** + * Test local job submission with an archive option. + * + * @throws IOException + */ + @Test + public void testLocalJobArchivesOption() throws IOException { + Path jarPath = makeJar(new Path(TEST_ROOT_DIR, "test.jar")); + + Configuration conf = new Configuration(); + conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000"); + conf.set(MRConfig.FRAMEWORK_NAME, "local"); + final String[] args = + {"-jt", "local", "-archives", jarPath.toString(), "-m", "1", "-r", + "1", "-mt", "1", "-rt", "1"}; + int res = -1; + try { + res = ToolRunner.run(conf, new SleepJob(), args); + } catch (Exception e) { + System.out.println("Job failed with " + e.getLocalizedMessage()); + e.printStackTrace(System.out); + fail("Job failed"); + } + assertEquals("dist job res is not 0:", 0, res); + } + private Path makeJar(Path p) throws IOException { FileOutputStream fos = new FileOutputStream(new File(p.toString())); JarOutputStream jos = new JarOutputStream(fos); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java index 22cb5301d0d..6e280addc24 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java @@ -1298,6 +1298,65 @@ public class TestMRJobs { jarFile.delete(); } + @Test + public void testSharedCache() throws Exception { + Path localJobJarPath = makeJobJarWithLib(TEST_ROOT_DIR.toUri().toString()); + + if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { + LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + + " not found. Not running test."); + return; + } + + Job job = Job.getInstance(mrCluster.getConfig()); + + Configuration jobConf = job.getConfiguration(); + jobConf.set(MRJobConfig.SHARED_CACHE_MODE, "enabled"); + + Path inputFile = createTempFile("input-file", "x"); + + // Create jars with a single file inside them. 
+ Path second = makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2); + Path third = makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3); + Path fourth = makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4); + + // Add libjars to job conf + jobConf.set("tmpjars", second.toString() + "," + third.toString() + "," + + fourth.toString()); + + // Because the job jar is a "dummy" jar, we need to include the jar with + // DistributedCacheChecker or it won't be able to find it + Path distributedCacheCheckerJar = + new Path(JarFinder.getJar(SharedCacheChecker.class)); + job.addFileToClassPath(distributedCacheCheckerJar.makeQualified( + localFs.getUri(), distributedCacheCheckerJar.getParent())); + + job.setMapperClass(SharedCacheChecker.class); + job.setOutputFormatClass(NullOutputFormat.class); + + FileInputFormat.setInputPaths(job, inputFile); + + job.setMaxMapAttempts(1); // speed up failures + + job.submit(); + String trackingUrl = job.getTrackingURL(); + String jobId = job.getJobID().toString(); + Assert.assertTrue(job.waitForCompletion(true)); + Assert.assertTrue("Tracking URL was " + trackingUrl + + " but didn't Match Job ID " + jobId, + trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/")); + } + + /** + * An identity mapper for testing the shared cache. + */ + public static class SharedCacheChecker extends + Mapper { + @Override + public void setup(Context context) throws IOException { + } + } + public static class ConfVerificationMapper extends SleepMapper { @Override protected void setup(Context context) diff --git a/hadoop-project/src/site/site.xml b/hadoop-project/src/site/site.xml index 0b1f6abbae6..aebf719ffbb 100644 --- a/hadoop-project/src/site/site.xml +++ b/hadoop-project/src/site/site.xml @@ -112,6 +112,7 @@ +