svn merge -c 1528237 FIXES: MAPREDUCE-4421. Run MapReduce framework via the distributed cache. Contributed by Jason Lowe

git-svn-id: 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2013-10-01 22:40:01 +00:00
parent 90abcc481a
commit 7e812e7126
8 changed files with 279 additions and 13 deletions

View File

@ -25,6 +25,8 @@ Release 2.3.0 - UNRELEASED
MAPREDUCE-434. LocalJobRunner limited to single reducer (Sandy Ryza and
Aaron Kimball via Sandy Ryza)
MAPREDUCE-4421. Run MapReduce framework via the distributed cache (jlowe)
MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)

View File

@ -21,6 +21,7 @@
@ -133,6 +134,30 @@ public static TaskAttemptStateUI taskAttemptState(String attemptStateStr) {
return TaskAttemptStateUI.valueOf(attemptStateStr);
// gets the base name of the MapReduce framework or null if no
// framework was configured
private static String getMRFrameworkName(Configuration conf) {
String frameworkName = null;
String framework =
if (!framework.isEmpty()) {
URI uri;
try {
uri = new URI(framework);
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Unable to parse '" + framework
+ "' as a URI, check the setting for "
frameworkName = uri.getFragment();
if (frameworkName == null) {
frameworkName = new Path(uri).getName();
return frameworkName;
private static void setMRFrameworkClasspath(
Map<String, String> environment, Configuration conf) throws IOException {
// Propagate the system classpath when using the mini cluster
@ -141,18 +166,33 @@ private static void setMRFrameworkClasspath(
// Add standard Hadoop classes
for (String c : conf.getStrings(
Apps.addToEnvironment(environment,, c
// if the framework is specified then only use the MR classpath
String frameworkName = getMRFrameworkName(conf);
if (frameworkName == null) {
// Add standard Hadoop classes
for (String c : conf.getStrings(
Apps.addToEnvironment(environment,, c
boolean foundFrameworkInClasspath = (frameworkName == null);
for (String c : conf.getStrings(
Apps.addToEnvironment(environment,, c
if (!foundFrameworkInClasspath) {
foundFrameworkInClasspath = c.contains(frameworkName);
if (!foundFrameworkInClasspath) {
throw new IllegalArgumentException(
"Could not locate MapReduce framework name '" + frameworkName
// TODO: Remove duplicates.

View File

@ -283,7 +283,46 @@ public void testSetClasspathWithJobClassloader() throws IOException {
assertEquals("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is not in the app"
+ " classpath!", expectedAppClasspath, appCp);
@Test (timeout = 3000000)
public void testSetClasspathWithFramework() throws IOException {
final String FRAMEWORK_NAME = "some-framework-name";
final String FRAMEWORK_PATH = "some-framework-path#" + FRAMEWORK_NAME;
Configuration conf = new Configuration();
Map<String, String> env = new HashMap<String, String>();
try {
MRApps.setClasspath(env, conf);
fail("Failed to catch framework path set without classpath change");
} catch (IllegalArgumentException e) {
assertTrue("Unexpected IllegalArgumentException",
e.getMessage().contains("Could not locate MapReduce framework name '"
MRApps.setClasspath(env, conf);
final String stdClasspath = StringUtils.join(File.pathSeparator,
Arrays.asList("job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*",
ApplicationConstants.Environment.PWD.$() + "/*"));
String expectedClasspath = StringUtils.join(File.pathSeparator,
assertEquals("Incorrect classpath with framework and no user precedence",
expectedClasspath, env.get("CLASSPATH"));
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
MRApps.setClasspath(env, conf);
expectedClasspath = StringUtils.join(File.pathSeparator,
assertEquals("Incorrect classpath with framework and user precedence",
expectedClasspath, env.get("CLASSPATH"));
@Test (timeout = 30000)
public void testSetupDistributedCacheEmpty() throws IOException {
Configuration conf = new Configuration();

View File

@ -39,6 +39,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
@ -340,11 +341,12 @@ JobStatus submitJobInternal(Job job, Cluster cluster)
//validate the jobs output specs
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster,
//configure the command line options correctly on the submitting dfs
Configuration conf = job.getConfiguration();
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
//configure the command line options correctly on the submitting dfs
InetAddress ip = InetAddress.getLocalHost();
if (ip != null) {
submitHostAddress = ip.getHostAddress();
@ -602,7 +604,6 @@ private void readTokensFromFiles(Configuration conf, Credentials credentials)
//get secret keys and tokens and store them into TokenCache
private void populateTokenCache(Configuration conf, Credentials credentials)
throws IOException{
readTokensFromFiles(conf, credentials);
@ -618,4 +619,41 @@ private void populateTokenCache(Configuration conf, Credentials credentials)
TokenCache.obtainTokensForNamenodes(credentials, ps, conf);
private static void addMRFrameworkToDistributedCache(Configuration conf)
throws IOException {
String framework =
if (!framework.isEmpty()) {
URI uri;
try {
uri = new URI(framework);
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Unable to parse '" + framework
+ "' as a URI, check the setting for "
String linkedName = uri.getFragment();
// resolve any symlinks in the URI path so using a "current" symlink
// to point to a specific version shows the specific version
// in the distributed cache configuration
FileSystem fs = FileSystem.get(conf);
Path frameworkPath = fs.makeQualified(
new Path(uri.getScheme(), uri.getAuthority(), uri.getPath()));
FileContext fc = FileContext.getFileContext(frameworkPath.toUri(), conf);
frameworkPath = fc.resolvePath(frameworkPath);
uri = frameworkPath.toUri();
try {
uri = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(),
null, linkedName);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
DistributedCache.addCacheArchive(uri, conf);

View File

@ -644,6 +644,12 @@ public interface MRJobConfig {
public static final String MAPREDUCE_APPLICATION_CLASSPATH =
* Path to MapReduce framework archive
* Default CLASSPATH for all YARN MapReduce applications.

View File

@ -1458,11 +1458,31 @@
<description>CLASSPATH for MR applications. A comma-separated list
of CLASSPATH entries</description>
of CLASSPATH entries. If mapreduce.application.framework.path is set then this
must specify the appropriate classpath for that archive, and the name of
the archive must be present in the classpath.</description>
<description>Path to the MapReduce framework archive. If set, the framework
archive will automatically be distributed along with the job, and this
path would normally reside in a public location in an HDFS filesystem. As
with distributed cache files, this can be a URL with a fragment specifying
the alias to use for the archive name. For example,
hdfs:/mapred/framework/hadoop-mapreduce-2.1.1.tar.gz#mrframework would
alias the localized archive as "mrframework".
Note that mapreduce.application.classpath must include the appropriate
classpath for the specified framework. The base name of the archive, or
alias of the archive if an alias is used, must appear in the specified
classpath.</description>

View File

@ -0,0 +1,120 @@
~~ Licensed under the Apache License, Version 2.0 (the "License");
~~ you may not use this file except in compliance with the License.
~~ You may obtain a copy of the License at
~~ Unless required by applicable law or agreed to in writing, software
~~ distributed under the License is distributed on an "AS IS" BASIS,
~~ See the License for the specific language governing permissions and
~~ limitations under the License. See accompanying LICENSE file.
Hadoop Map Reduce Next Generation-${project.version} - Distributed Cache Deploy
Hadoop MapReduce Next Generation - Distributed Cache Deploy
\[ {{{./index.html}Go Back}} \]
* Introduction
The MapReduce application framework has rudimentary support for deploying a
new version of the MapReduce framework via the distributed cache. By setting
the appropriate configuration properties, users can run a different version
of MapReduce than the one initially deployed to the cluster. For example,
cluster administrators can place multiple versions of MapReduce in HDFS and
configure <<<mapred-site.xml>>> to specify which version jobs will use by
default. This allows the administrators to perform a rolling upgrade of the
MapReduce framework under certain conditions.
* Preconditions and Limitations
The support for deploying the MapReduce framework via the distributed cache
currently does not address the job client code used to submit and query
jobs. It also does not address the <<<ShuffleHandler>>> code that runs as an
auxiliary service within each NodeManager. As a result, the following
limitations apply to MapReduce versions that can be successfully deployed via
the distributed cache in a rolling upgrade fashion:
* The MapReduce version must be compatible with the job client code used to
submit and query jobs. If it is incompatible then the job client must be
upgraded separately on any node from which jobs using the new MapReduce
version will be submitted or queried.
* The MapReduce version must be compatible with the configuration files used
by the job client submitting the jobs. If it is incompatible with that
configuration (e.g.: a new property must be set or an existing property
value changed) then the configuration must be updated first.
* The MapReduce version must be compatible with the <<<ShuffleHandler>>>
version running on the nodes in the cluster. If it is incompatible then the
new <<<ShuffleHandler>>> code must be deployed to all the nodes in the
cluster, and the NodeManagers must be restarted to pick up the new
<<<ShuffleHandler>>> code.
* Deploying a New MapReduce Version via the Distributed Cache
Deploying a new MapReduce version consists of three steps:
[[1]] Upload the MapReduce archive to a location that can be accessed by the
job submission client. Ideally the archive should be on the cluster's default
filesystem at a publicly-readable path. See the archive location discussion
below for more details.
[[2]] Configure <<<mapreduce.application.framework.path>>> to point to the
location where the archive is located. As when specifying distributed cache
files for a job, this is a URL that also supports creating an alias for the
archive if a URL fragment is specified. For example,
<<<hdfs:/mapred/framework/hadoop-mapreduce-2.1.1.tar.gz#mrframework>>> will
be localized as <<<mrframework>>> rather than
<<<hadoop-mapreduce-2.1.1.tar.gz>>>.
[[3]] Configure <<<mapreduce.application.classpath>>> to set the proper
classpath to use with the MapReduce archive configured above. NOTE: An error
occurs if <<<mapreduce.application.framework.path>>> is configured but
<<<mapreduce.application.classpath>>> does not reference the base name of the
archive path or the alias if an alias was specified.
** Location of the MapReduce Archive and How It Affects Job Performance
Note that the location of the MapReduce archive can be critical to job
submission and job startup performance. If the archive is not located on the
cluster's default filesystem then it will be copied to the job staging
directory for each job and localized to each node where the job's tasks
run. This will slow down job submission and task startup performance.
If the archive is located on the default filesystem then the job client will
not upload the archive to the job staging directory for each job
submission. However, if the archive path is not readable by all cluster users
then the archive will be localized separately for each user on each node
where tasks execute. This can cause unnecessary duplication in the
distributed cache.
When working with a large cluster it can be important to increase the
replication factor of the archive to increase its availability. This will
spread the load when the nodes in the cluster localize the archive for the
first time.
* MapReduce Archives and Classpath Configuration
Setting a proper classpath for the MapReduce archive depends upon the
composition of the archive and whether it has any additional dependencies.
For example, the archive can contain not only the MapReduce jars but also the
necessary YARN, HDFS, and Hadoop Common jars and all other dependencies. In
that case, <<<mapreduce.application.classpath>>> would be configured to
something like the following example, where the archive basename is
hadoop-mapreduce-2.1.1.tar.gz and the archive is organized internally similar
to the standard Hadoop distribution archive:
Another possible approach is to have the archive consist of just the
MapReduce jars and have the remaining dependencies picked up from the Hadoop
distribution installed on the nodes. In that case, the above example would
change to something like the following:

View File

@ -86,6 +86,7 @@
<item name="Compatibility between Hadoop 1.x and Hadoop 2.x" href="hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduce_Compatibility_Hadoop1_Hadoop2.html"/>
<item name="Encrypted Shuffle" href="hadoop-mapreduce-client/hadoop-mapreduce-client-core/EncryptedShuffle.html"/>
<item name="Pluggable Shuffle/Sort" href="hadoop-mapreduce-client/hadoop-mapreduce-client-core/PluggableShuffleAndPluggableSort.html"/>
<item name="Distributed Cache Deploy" href="hadoop-mapreduce-client/hadoop-mapreduce-client-core/DistributedCacheDeploy.html"/>
<menu name="YARN" inherit="top">