diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index c4513fa249b..e72419a718e 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -25,6 +25,8 @@ Release 2.3.0 - UNRELEASED MAPREDUCE-434. LocalJobRunner limited to single reducer (Sandy Ryza and Aaron Kimball via Sandy Ryza) + MAPREDUCE-4421. Run MapReduce framework via the distributed cache (jlowe) + OPTIMIZATIONS MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java index 1c05060db34..977e0066bef 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java @@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.util; import java.io.IOException; import java.net.MalformedURLException; import java.net.URI; +import java.net.URISyntaxException; import java.security.AccessController; import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; @@ -133,6 +134,30 @@ public class MRApps extends Apps { return TaskAttemptStateUI.valueOf(attemptStateStr); } + // gets the base name of the MapReduce framework or null if no + // framework was configured + private static String getMRFrameworkName(Configuration conf) { + String frameworkName = null; + String framework = + conf.get(MRJobConfig.MAPREDUCE_APPLICATION_FRAMEWORK_PATH, ""); + if (!framework.isEmpty()) { + URI uri; + try { + uri = new URI(framework); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Unable to parse '" + framework + + "' as a URI, check the setting for " + + MRJobConfig.MAPREDUCE_APPLICATION_FRAMEWORK_PATH, e); + } + + frameworkName = uri.getFragment(); + if (frameworkName == null) { + frameworkName = new Path(uri).getName(); + } + } + return frameworkName; + } + private static void setMRFrameworkClasspath( Map environment, Configuration conf) throws IOException { // Propagate the system classpath when using the mini cluster @@ -141,18 +166,33 @@ public class MRApps extends Apps { System.getProperty("java.class.path")); } - // Add standard Hadoop classes - for (String c : conf.getStrings( - YarnConfiguration.YARN_APPLICATION_CLASSPATH, - YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { - Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c - .trim()); + // if the framework is specified then only use the MR classpath + String frameworkName = getMRFrameworkName(conf); + if (frameworkName == null) { + // Add standard Hadoop classes + for (String c : conf.getStrings( + YarnConfiguration.YARN_APPLICATION_CLASSPATH, + YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { + Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c + .trim()); + } } + + boolean foundFrameworkInClasspath = (frameworkName == null); for (String c : conf.getStrings( MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH, MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH)) { Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c .trim()); + if (!foundFrameworkInClasspath) { + foundFrameworkInClasspath = c.contains(frameworkName); + } + } + + if (!foundFrameworkInClasspath) { + throw new IllegalArgumentException( + "Could not locate MapReduce framework name '" + frameworkName + + "' in " + MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH); } // TODO: Remove duplicates. } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java index 3e6a41d2d65..ea4e1635483 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java @@ -283,7 +283,46 @@ public class TestMRApps { assertEquals("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is not in the app" + " classpath!", expectedAppClasspath, appCp); } - + + @Test (timeout = 3000000) + public void testSetClasspathWithFramework() throws IOException { + final String FRAMEWORK_NAME = "some-framework-name"; + final String FRAMEWORK_PATH = "some-framework-path#" + FRAMEWORK_NAME; + Configuration conf = new Configuration(); + conf.set(MRJobConfig.MAPREDUCE_APPLICATION_FRAMEWORK_PATH, FRAMEWORK_PATH); + Map env = new HashMap(); + try { + MRApps.setClasspath(env, conf); + fail("Failed to catch framework path set without classpath change"); + } catch (IllegalArgumentException e) { + assertTrue("Unexpected IllegalArgumentException", + e.getMessage().contains("Could not locate MapReduce framework name '" + + FRAMEWORK_NAME + "'")); + } + + env.clear(); + final String FRAMEWORK_CLASSPATH = FRAMEWORK_NAME + "/*.jar"; + conf.set(MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH, FRAMEWORK_CLASSPATH); + MRApps.setClasspath(env, conf); + final String stdClasspath = StringUtils.join(File.pathSeparator, + Arrays.asList("job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*", + ApplicationConstants.Environment.PWD.$() + "/*")); + String expectedClasspath = StringUtils.join(File.pathSeparator, + Arrays.asList(ApplicationConstants.Environment.PWD.$(), + FRAMEWORK_CLASSPATH, stdClasspath)); + assertEquals("Incorrect classpath with framework and no user precedence", + expectedClasspath, env.get("CLASSPATH")); + + env.clear(); + conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true); + MRApps.setClasspath(env, conf); + expectedClasspath = StringUtils.join(File.pathSeparator, + Arrays.asList(ApplicationConstants.Environment.PWD.$(), + stdClasspath, FRAMEWORK_CLASSPATH)); + assertEquals("Incorrect classpath with framework and user precedence", + expectedClasspath, env.get("CLASSPATH")); + } + @Test (timeout = 30000) public void testSetupDistributedCacheEmpty() throws IOException { Configuration conf = new Configuration(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java index d83a3dd7ab9..94e71257498 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java @@ -39,6 +39,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; @@ -340,11 +341,12 @@ class JobSubmitter { //validate the jobs output specs checkSpecs(job); - - Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, - job.getConfiguration()); - //configure the command line options correctly on the submitting dfs + Configuration conf = job.getConfiguration(); + addMRFrameworkToDistributedCache(conf); + + Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf); + //configure the command line options correctly on the submitting dfs InetAddress ip = InetAddress.getLocalHost(); if (ip != null) { submitHostAddress = ip.getHostAddress(); @@ -602,7 +604,6 @@ class JobSubmitter { } //get secret keys and tokens and store them into TokenCache - @SuppressWarnings("unchecked") private void populateTokenCache(Configuration conf, Credentials credentials) throws IOException{ readTokensFromFiles(conf, credentials); @@ -618,4 +619,41 @@ class JobSubmitter { TokenCache.obtainTokensForNamenodes(credentials, ps, conf); } } + + @SuppressWarnings("deprecation") + private static void addMRFrameworkToDistributedCache(Configuration conf) + throws IOException { + String framework = + conf.get(MRJobConfig.MAPREDUCE_APPLICATION_FRAMEWORK_PATH, ""); + if (!framework.isEmpty()) { + URI uri; + try { + uri = new URI(framework); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Unable to parse '" + framework + + "' as a URI, check the setting for " + + MRJobConfig.MAPREDUCE_APPLICATION_FRAMEWORK_PATH, e); + } + + String linkedName = uri.getFragment(); + + // resolve any symlinks in the URI path so using a "current" symlink + // to point to a specific version shows the specific version + // in the distributed cache configuration + FileSystem fs = FileSystem.get(conf); + Path frameworkPath = fs.makeQualified( + new Path(uri.getScheme(), uri.getAuthority(), uri.getPath())); + FileContext fc = FileContext.getFileContext(frameworkPath.toUri(), conf); + frameworkPath = fc.resolvePath(frameworkPath); + uri = frameworkPath.toUri(); + try { + uri = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), + null, linkedName); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e); + } + + DistributedCache.addCacheArchive(uri, conf); + } + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 716d93ddf0a..93f7e50a132 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -644,6 +644,12 @@ public interface MRJobConfig { public static final String MAPREDUCE_APPLICATION_CLASSPATH = "mapreduce.application.classpath"; + /** + * Path to MapReduce framework archive + */ + public static final String MAPREDUCE_APPLICATION_FRAMEWORK_PATH = + "mapreduce.application.framework.path"; + /** * Default CLASSPATH for all YARN MapReduce applications. */ diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index a9149a4c7d4..9c7a6abc724 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -1458,11 +1458,31 @@ CLASSPATH for MR applications. A comma-separated list - of CLASSPATH entries + of CLASSPATH entries. If mapreduce.application.framework is set then this + must specify the appropriate classpath for that archive, and the name of + the archive must be present in the classpath. mapreduce.application.classpath $HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*,$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/* + + Path to the MapReduce framework archive. If set, the framework + archive will automatically be distributed along with the job, and this + path would normally reside in a public location in an HDFS filesystem. As + with distributed cache files, this can be a URL with a fragment specifying + the alias to use for the archive name. For example, + hdfs:/mapred/framework/hadoop-mapreduce-2.1.1.tar.gz#mrframework would + alias the localized archive as "mrframework". + + Note that mapreduce.application.classpath must include the appropriate + classpath for the specified framework. The base name of the archive, or + alias of the archive if an alias is used, must appear in the specified + classpath. + + mapreduce.application.framework.path + + + mapreduce.job.classloader false diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/DistributedCacheDeploy.apt.vm b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/DistributedCacheDeploy.apt.vm new file mode 100644 index 00000000000..302f8d110af --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/DistributedCacheDeploy.apt.vm @@ -0,0 +1,120 @@ +~~ Licensed under the Apache License, Version 2.0 (the "License"); +~~ you may not use this file except in compliance with the License. +~~ You may obtain a copy of the License at +~~ +~~ http://www.apache.org/licenses/LICENSE-2.0 +~~ +~~ Unless required by applicable law or agreed to in writing, software +~~ distributed under the License is distributed on an "AS IS" BASIS, +~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +~~ See the License for the specific language governing permissions and +~~ limitations under the License. See accompanying LICENSE file. + + --- + Hadoop Map Reduce Next Generation-${project.version} - Distributed Cache Deploy + --- + --- + ${maven.build.timestamp} + +Hadoop MapReduce Next Generation - Distributed Cache Deploy + + \[ {{{./index.html}Go Back}} \] + +* Introduction + + The MapReduce application framework has rudimentary support for deploying a + new version of the MapReduce framework via the distributed cache. By setting + the appropriate configuration properties, users can run a different version + of MapReduce than the one initially deployed to the cluster. For example, + cluster administrators can place multiple versions of MapReduce in HDFS and + configure <<>> to specify which version jobs will use by + default. This allows the administrators to perform a rolling upgrade of the + MapReduce framework under certain conditions. + +* Preconditions and Limitations + + The support for deploying the MapReduce framework via the distributed cache + currently does not address the job client code used to submit and query + jobs. It also does not address the <<>> code that runs as an + auxilliary service within each NodeManager. As a result the following + limitations apply to MapReduce versions that can be successfully deployed via + the distributed cache in a rolling upgrade fashion: + + * The MapReduce version must be compatible with the job client code used to + submit and query jobs. If it is incompatible then the job client must be + upgraded separately on any node from which jobs using the new MapReduce + version will be submitted or queried. + + * The MapReduce version must be compatible with the configuration files used + by the job client submitting the jobs. If it is incompatible with that + configuration (e.g.: a new property must be set or an existing property + value changed) then the configuration must be updated first. + + * The MapReduce version must be compatible with the <<>> + version running on the nodes in the cluster. If it is incompatible then the + new <<>> code must be deployed to all the nodes in the + cluster, and the NodeManagers must be restarted to pick up the new + <<>> code. + +* Deploying a New MapReduce Version via the Distributed Cache + + Deploying a new MapReduce version consists of three steps: + + [[1]] Upload the MapReduce archive to a location that can be accessed by the + job submission client. Ideally the archive should be on the cluster's default + filesystem at a publicly-readable path. See the archive location discussion + below for more details. + + [[2]] Configure <<>> to point to the + location where the archive is located. As when specifying distributed cache + files for a job, this is a URL that also supports creating an alias for the + archive if a URL fragment is specified. For example, + <<>> will + be localized as <<>> rather than + <<>>. + + [[3]] Configure <<>> to set the proper + classpath to use with the MapReduce archive configured above. NOTE: An error + occurs if <<>> is configured but + <<>> does not reference the base name of the + archive path or the alias if an alias was specified. + +** Location of the MapReduce Archive and How It Affects Job Performance + + Note that the location of the MapReduce archive can be critical to job + submission and job startup performance. If the archive is not located on the + cluster's default filesystem then it will be copied to the job staging + directory for each job and localized to each node where the job's tasks + run. This will slow down job submission and task startup performance. + + If the archive is located on the default filesystem then the job client will + not upload the archive to the job staging directory for each job + submission. However if the archive path is not readable by all cluster users + then the archive will be localized separately for each user on each node + where tasks execute. This can cause unnecessary duplication in the + distributed cache. + + When working with a large cluster it can be important to increase the + replication factor of the archive to increase its availability. This will + spread the load when the nodes in the cluster localize the archive for the + first time. + +* MapReduce Archives and Classpath Configuration + + Setting a proper classpath for the MapReduce archive depends upon the + composition of the archive and whether it has any additional dependencies. + For example, the archive can contain not only the MapReduce jars but also the + necessary YARN, HDFS, and Hadoop Common jars and all other dependencies. In + that case, <<>> would be configured to + something like the following example, where the archive basename is + hadoop-mapreduce-2.1.1.tar.gz and the archive is organized internally similar + to the standard Hadoop distribution archive: + + <<<$HADOOP_CONF_DIR,$PWD/hadoop-mapreduce-2.1.1.tar.gz/hadoop-mapreduce-2.1.1/share/hadoop/mapreduce/*,$PWD/hadoop-mapreduce-2.1.1.tar.gz/hadoop-mapreduce-2.1.1/share/hadoop/mapreduce/lib/*,$PWD/hadoop-mapreduce-2.1.1.tar.gz/hadoop-mapreduce-2.1.1/share/hadoop/common/*,$PWD/hadoop-mapreduce-2.1.1.tar.gz/hadoop-mapreduce-2.1.1/share/hadoop/common/lib/*,$PWD/hadoop-mapreduce-2.1.1.tar.gz/hadoop-mapreduce-2.1.1/share/hadoop/yarn/*,$PWD/hadoop-mapreduce-2.1.1.tar.gz/hadoop-mapreduce-2.1.1/share/hadoop/yarn/lib/*,$PWD/hadoop-mapreduce-2.1.1.tar.gz/hadoop-mapreduce-2.1.1/share/hadoop/hdfs/*,$PWD/hadoop-mapreduce-2.1.1.tar.gz/hadoop-mapreduce-2.1.1/share/hadoop/hdfs/lib/*>>> + + Another possible approach is to have the archive consist of just the + MapReduce jars and have the remaining dependencies picked up from the Hadoop + distribution installed on the nodes. In that case, the above example would + change to something like the following: + + <<<$HADOOP_CONF_DIR,$PWD/hadoop-mapreduce-2.1.1.tar.gz/hadoop-mapreduce-2.1.1/share/hadoop/mapreduce/*,$PWD/hadoop-mapreduce-2.1.1.tar.gz/hadoop-mapreduce-2.1.1/share/hadoop/mapreduce/lib/*,$HADOOP_COMMON_HOME/share/hadoop/common/*,$HADOOP_COMMON_HOME/share/hadoop/common/lib/*,$HADOOP_HDFS_HOME/share/hadoop/hdfs/*,$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*,$HADOOP_YARN_HOME/share/hadoop/yarn/*,$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*>>> diff --git a/hadoop-project/src/site/site.xml b/hadoop-project/src/site/site.xml index d4c07dc6163..22e13b14694 100644 --- a/hadoop-project/src/site/site.xml +++ b/hadoop-project/src/site/site.xml @@ -86,6 +86,7 @@ +