svn merge -c 1401054 FIXES: MAPREDUCE-4740. only .jars can be added to the Distributed Cache classpath. Contributed by Robert Joseph Evans
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1401058 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
35bd16ad62
commit
43159cc20d
@ -452,6 +452,9 @@ Release 0.23.5 - UNRELEASED
|
|||||||
MAPREDUCE-4733. Reducer can fail to make progress during shuffle if too many
|
MAPREDUCE-4733. Reducer can fail to make progress during shuffle if too many
|
||||||
reducers complete consecutively. (Jason Lowe via vinodkv)
|
reducers complete consecutively. (Jason Lowe via vinodkv)
|
||||||
|
|
||||||
|
MAPREDUCE-4740. only .jars can be added to the Distributed Cache
|
||||||
|
classpath. (Robert Joseph Evans via jlowe)
|
||||||
|
|
||||||
Release 0.23.4 - UNRELEASED
|
Release 0.23.4 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -191,6 +191,7 @@ private static void setMRFrameworkClasspath(
|
|||||||
// TODO: Remove duplicates.
|
// TODO: Remove duplicates.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
public static void setClasspath(Map<String, String> environment,
|
public static void setClasspath(Map<String, String> environment,
|
||||||
Configuration conf) throws IOException {
|
Configuration conf) throws IOException {
|
||||||
boolean userClassesTakesPrecedence =
|
boolean userClassesTakesPrecedence =
|
||||||
@ -218,11 +219,66 @@ public static void setClasspath(Map<String, String> environment,
|
|||||||
environment,
|
environment,
|
||||||
Environment.CLASSPATH.name(),
|
Environment.CLASSPATH.name(),
|
||||||
Environment.PWD.$() + Path.SEPARATOR + "*");
|
Environment.PWD.$() + Path.SEPARATOR + "*");
|
||||||
|
// a * in the classpath will only find a .jar, so we need to filter out
|
||||||
|
// all .jars and add everything else
|
||||||
|
addToClasspathIfNotJar(DistributedCache.getFileClassPaths(conf),
|
||||||
|
DistributedCache.getCacheFiles(conf),
|
||||||
|
conf,
|
||||||
|
environment);
|
||||||
|
addToClasspathIfNotJar(DistributedCache.getArchiveClassPaths(conf),
|
||||||
|
DistributedCache.getCacheArchives(conf),
|
||||||
|
conf,
|
||||||
|
environment);
|
||||||
if (userClassesTakesPrecedence) {
|
if (userClassesTakesPrecedence) {
|
||||||
MRApps.setMRFrameworkClasspath(environment, conf);
|
MRApps.setMRFrameworkClasspath(environment, conf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add the paths to the classpath if they are not jars
|
||||||
|
* @param paths the paths to add to the classpath
|
||||||
|
* @param withLinks the corresponding paths that may have a link name in them
|
||||||
|
* @param conf used to resolve the paths
|
||||||
|
* @param environment the environment to update CLASSPATH in
|
||||||
|
* @throws IOException if there is an error resolving any of the paths.
|
||||||
|
*/
|
||||||
|
private static void addToClasspathIfNotJar(Path[] paths,
|
||||||
|
URI[] withLinks, Configuration conf,
|
||||||
|
Map<String, String> environment) throws IOException {
|
||||||
|
if (paths != null) {
|
||||||
|
HashMap<Path, String> linkLookup = new HashMap<Path, String>();
|
||||||
|
if (withLinks != null) {
|
||||||
|
for (URI u: withLinks) {
|
||||||
|
Path p = new Path(u);
|
||||||
|
FileSystem remoteFS = p.getFileSystem(conf);
|
||||||
|
p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
|
||||||
|
remoteFS.getWorkingDirectory()));
|
||||||
|
String name = (null == u.getFragment())
|
||||||
|
? p.getName() : u.getFragment();
|
||||||
|
if (!name.toLowerCase().endsWith(".jar")) {
|
||||||
|
linkLookup.put(p, name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Path p : paths) {
|
||||||
|
FileSystem remoteFS = p.getFileSystem(conf);
|
||||||
|
p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
|
||||||
|
remoteFS.getWorkingDirectory()));
|
||||||
|
String name = linkLookup.get(p);
|
||||||
|
if (name == null) {
|
||||||
|
name = p.getName();
|
||||||
|
}
|
||||||
|
if(!name.toLowerCase().endsWith(".jar")) {
|
||||||
|
Apps.addToEnvironment(
|
||||||
|
environment,
|
||||||
|
Environment.CLASSPATH.name(),
|
||||||
|
Environment.PWD.$() + Path.SEPARATOR + name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static final String STAGING_CONSTANT = ".staging";
|
private static final String STAGING_CONSTANT = ".staging";
|
||||||
public static Path getStagingAreaDir(Configuration conf, String user) {
|
public static Path getStagingAreaDir(Configuration conf, String user) {
|
||||||
return new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR,
|
return new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR,
|
||||||
@ -261,8 +317,7 @@ public static void setupDistributedCache(
|
|||||||
DistributedCache.getCacheArchives(conf),
|
DistributedCache.getCacheArchives(conf),
|
||||||
parseTimeStamps(DistributedCache.getArchiveTimestamps(conf)),
|
parseTimeStamps(DistributedCache.getArchiveTimestamps(conf)),
|
||||||
getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES),
|
getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES),
|
||||||
DistributedCache.getArchiveVisibilities(conf),
|
DistributedCache.getArchiveVisibilities(conf));
|
||||||
DistributedCache.getArchiveClassPaths(conf));
|
|
||||||
|
|
||||||
// Cache files
|
// Cache files
|
||||||
parseDistributedCacheArtifacts(conf,
|
parseDistributedCacheArtifacts(conf,
|
||||||
@ -271,8 +326,7 @@ public static void setupDistributedCache(
|
|||||||
DistributedCache.getCacheFiles(conf),
|
DistributedCache.getCacheFiles(conf),
|
||||||
parseTimeStamps(DistributedCache.getFileTimestamps(conf)),
|
parseTimeStamps(DistributedCache.getFileTimestamps(conf)),
|
||||||
getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
|
getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
|
||||||
DistributedCache.getFileVisibilities(conf),
|
DistributedCache.getFileVisibilities(conf));
|
||||||
DistributedCache.getFileClassPaths(conf));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String getResourceDescription(LocalResourceType type) {
|
private static String getResourceDescription(LocalResourceType type) {
|
||||||
@ -289,8 +343,8 @@ private static void parseDistributedCacheArtifacts(
|
|||||||
Configuration conf,
|
Configuration conf,
|
||||||
Map<String, LocalResource> localResources,
|
Map<String, LocalResource> localResources,
|
||||||
LocalResourceType type,
|
LocalResourceType type,
|
||||||
URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[],
|
URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[])
|
||||||
Path[] pathsToPutOnClasspath) throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
if (uris != null) {
|
if (uris != null) {
|
||||||
// Sanity check
|
// Sanity check
|
||||||
@ -304,15 +358,6 @@ private static void parseDistributedCacheArtifacts(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
Map<String, Path> classPaths = new HashMap<String, Path>();
|
|
||||||
if (pathsToPutOnClasspath != null) {
|
|
||||||
for (Path p : pathsToPutOnClasspath) {
|
|
||||||
FileSystem remoteFS = p.getFileSystem(conf);
|
|
||||||
p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
|
|
||||||
remoteFS.getWorkingDirectory()));
|
|
||||||
classPaths.put(p.toUri().getPath().toString(), p);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for (int i = 0; i < uris.length; ++i) {
|
for (int i = 0; i < uris.length; ++i) {
|
||||||
URI u = uris[i];
|
URI u = uris[i];
|
||||||
Path p = new Path(u);
|
Path p = new Path(u);
|
||||||
|
@ -18,6 +18,8 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.mapreduce.v2.util;
|
package org.apache.hadoop.mapreduce.v2.util;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
@ -42,12 +44,36 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
import static org.mockito.Mockito.*;
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
public class TestMRApps {
|
public class TestMRApps {
|
||||||
|
private static File testWorkDir = null;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setupTestDirs() throws IOException {
|
||||||
|
testWorkDir = new File("target", TestMRApps.class.getCanonicalName());
|
||||||
|
delete(testWorkDir);
|
||||||
|
testWorkDir.mkdirs();
|
||||||
|
testWorkDir = testWorkDir.getAbsoluteFile();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void cleanupTestDirs() throws IOException {
|
||||||
|
if (testWorkDir != null) {
|
||||||
|
delete(testWorkDir);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void delete(File dir) throws IOException {
|
||||||
|
Path p = new Path("file://"+dir.getAbsolutePath());
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
FileSystem fs = p.getFileSystem(conf);
|
||||||
|
fs.delete(p, true);
|
||||||
|
}
|
||||||
|
|
||||||
@Test public void testJobIDtoString() {
|
@Test public void testJobIDtoString() {
|
||||||
JobId jid = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobId.class);
|
JobId jid = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobId.class);
|
||||||
@ -154,6 +180,28 @@ public class TestMRApps {
|
|||||||
}
|
}
|
||||||
assertTrue(environment.get("CLASSPATH").contains(mrAppClasspath));
|
assertTrue(environment.get("CLASSPATH").contains(mrAppClasspath));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test public void testSetClasspathWithArchives () throws IOException {
|
||||||
|
File testTGZ = new File(testWorkDir, "test.tgz");
|
||||||
|
FileOutputStream out = new FileOutputStream(testTGZ);
|
||||||
|
out.write(0);
|
||||||
|
out.close();
|
||||||
|
Job job = Job.getInstance();
|
||||||
|
Configuration conf = job.getConfiguration();
|
||||||
|
conf.set(MRJobConfig.CLASSPATH_ARCHIVES, "file://"
|
||||||
|
+ testTGZ.getAbsolutePath());
|
||||||
|
conf.set(MRJobConfig.CACHE_ARCHIVES, "file://"
|
||||||
|
+ testTGZ.getAbsolutePath() + "#testTGZ");
|
||||||
|
Map<String, String> environment = new HashMap<String, String>();
|
||||||
|
MRApps.setClasspath(environment, conf);
|
||||||
|
assertTrue(environment.get("CLASSPATH").startsWith("$PWD:"));
|
||||||
|
String confClasspath = job.getConfiguration().get(YarnConfiguration.YARN_APPLICATION_CLASSPATH);
|
||||||
|
if (confClasspath != null) {
|
||||||
|
confClasspath = confClasspath.replaceAll(",\\s*", ":").trim();
|
||||||
|
}
|
||||||
|
assertTrue(environment.get("CLASSPATH").contains(confClasspath));
|
||||||
|
assertTrue(environment.get("CLASSPATH").contains("testTGZ"));
|
||||||
|
}
|
||||||
|
|
||||||
@Test public void testSetClasspathWithUserPrecendence() {
|
@Test public void testSetClasspathWithUserPrecendence() {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user