merge -r 1367712:1367713 from trunk. FIXES: MAPREDUCE-4493

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1367715 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas Graves 2012-07-31 19:21:27 +00:00
parent 30ba954741
commit 5a17a40ee6
20 changed files with 63 additions and 80 deletions

View File

@ -198,7 +198,9 @@ Deprecated Properties
*---+---+ *---+---+
|mapred.compress.map.output | mapreduce.map.output.compress |mapred.compress.map.output | mapreduce.map.output.compress
*---+---+ *---+---+
|mapred.create.symlink | mapreduce.job.cache.symlink.create |mapred.create.symlink | NONE - symlinking is always on
*---+---+
|mapreduce.job.cache.symlink.create | NONE - symlinking is always on
*---+---+ *---+---+
|mapred.data.field.separator | mapreduce.fieldsel.data.field.separator |mapred.data.field.separator | mapreduce.fieldsel.data.field.separator
*---+---+ *---+---+

View File

@ -647,6 +647,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4496. AM logs link is missing user name (Jason Lowe via bobby) MAPREDUCE-4496. AM logs link is missing user name (Jason Lowe via bobby)
MAPREDUCE-4493. Distributed Cache Compatibility Issues (Robert Evans
via tgraves)
Release 0.23.2 - UNRELEASED Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -84,7 +84,6 @@ class LocalDistributedCacheManager {
* @throws IOException * @throws IOException
*/ */
public void setup(JobConf conf) throws IOException { public void setup(JobConf conf) throws IOException {
boolean mkLinks = DistributedCache.getSymlink(conf);
File workDir = new File(System.getProperty("user.dir")); File workDir = new File(System.getProperty("user.dir"));
// Generate YARN local resources objects corresponding to the distributed // Generate YARN local resources objects corresponding to the distributed
@ -145,11 +144,9 @@ class LocalDistributedCacheManager {
throw new IOException(e); throw new IOException(e);
} }
String pathString = path.toUri().toString(); String pathString = path.toUri().toString();
if(mkLinks) { String link = entry.getKey();
String link = entry.getKey(); String target = new File(path.toUri()).getPath();
String target = new File(path.toUri()).getPath(); symlink(workDir, target, link);
symlink(workDir, target, link);
}
if (resource.getType() == LocalResourceType.ARCHIVE) { if (resource.getType() == LocalResourceType.ARCHIVE) {
localArchives.add(pathString); localArchives.add(pathString);

View File

@ -150,7 +150,6 @@ public class TestLocalDistributedCacheManager {
conf.set(MRJobConfig.CACHE_FILES_SIZES, "201"); conf.set(MRJobConfig.CACHE_FILES_SIZES, "201");
conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "false"); conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "false");
conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath()); conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
conf.set(MRJobConfig.CACHE_SYMLINK, "yes");
LocalDistributedCacheManager manager = new LocalDistributedCacheManager(); LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
try { try {
manager.setup(conf); manager.setup(conf);
@ -197,7 +196,6 @@ public class TestLocalDistributedCacheManager {
conf.set(MRJobConfig.CACHE_FILES, ""); conf.set(MRJobConfig.CACHE_FILES, "");
conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath()); conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
conf.set(MRJobConfig.CACHE_SYMLINK, "yes");
LocalDistributedCacheManager manager = new LocalDistributedCacheManager(); LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
try { try {
manager.setup(conf); manager.setup(conf);
@ -268,7 +266,6 @@ public class TestLocalDistributedCacheManager {
conf.set(MRJobConfig.CACHE_FILES_SIZES, "201,201"); conf.set(MRJobConfig.CACHE_FILES_SIZES, "201,201");
conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "false,false"); conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "false,false");
conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath()); conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
conf.set(MRJobConfig.CACHE_SYMLINK, "yes");
LocalDistributedCacheManager manager = new LocalDistributedCacheManager(); LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
try { try {
manager.setup(conf); manager.setup(conf);

View File

@ -146,7 +146,6 @@ public class TestMRWithDistributedCache extends TestCase {
job.addFileToClassPath(second); job.addFileToClassPath(second);
job.addArchiveToClassPath(third); job.addArchiveToClassPath(third);
job.addCacheArchive(fourth.toUri()); job.addCacheArchive(fourth.toUri());
job.createSymlink();
job.setMaxMapAttempts(1); // speed up failures job.setMaxMapAttempts(1); // speed up failures
job.submit(); job.submit();

View File

@ -48,8 +48,12 @@ import org.apache.hadoop.mapreduce.Job;
* Archives (zip, tar and tgz/tar.gz files) are un-archived at the slave nodes. * Archives (zip, tar and tgz/tar.gz files) are un-archived at the slave nodes.
* Jars may be optionally added to the classpath of the tasks, a rudimentary * Jars may be optionally added to the classpath of the tasks, a rudimentary
* software distribution mechanism. Files have execution permissions. * software distribution mechanism. Files have execution permissions.
* Optionally users can also direct it to symlink the distributed cache file(s) * In older versions of Hadoop Map/Reduce users could optionally ask for symlinks
* into the working directory of the task.</p> * to be created in the working directory of the child task. In the current
* version symlinks are always created. If the URL does not have a fragment
* the name of the file or directory will be used. If multiple files or
* directories map to the same link name, the last one added will be used. All
* others will not even be downloaded.</p>
* *
* <p><code>DistributedCache</code> tracks modification timestamps of the cache * <p><code>DistributedCache</code> tracks modification timestamps of the cache
* files. Clearly the cache files should not be modified by the application * files. Clearly the cache files should not be modified by the application
@ -91,8 +95,7 @@ import org.apache.hadoop.mapreduce.Job;
* *
* public void configure(JobConf job) { * public void configure(JobConf job) {
* // Get the cached archives/files * // Get the cached archives/files
* localArchives = DistributedCache.getLocalCacheArchives(job); * File f = new File("./map.zip/some/file/in/zip.txt");
* localFiles = DistributedCache.getLocalCacheFiles(job);
* } * }
* *
* public void map(K key, V value, * public void map(K key, V value,

View File

@ -313,7 +313,6 @@ public class Submitter extends Configured implements Tool {
// add default debug script only when executable is expressed as // add default debug script only when executable is expressed as
// <path>#<executable> // <path>#<executable>
if (exec.contains("#")) { if (exec.contains("#")) {
DistributedCache.createSymlink(conf);
// set default gdb commands for map and reduce task // set default gdb commands for map and reduce task
String defScript = "$HADOOP_PREFIX/src/c++/pipes/debug/pipes-default-script"; String defScript = "$HADOOP_PREFIX/src/c++/pipes/debug/pipes-default-script";
setIfUnset(conf, MRJobConfig.MAP_DEBUG_SCRIPT,defScript); setIfUnset(conf, MRJobConfig.MAP_DEBUG_SCRIPT,defScript);

View File

@ -1049,9 +1049,10 @@ public class Job extends JobContextImpl implements JobContext {
} }
/** /**
* This method allows you to create symlinks in the current working directory * Originally intended to enable symlinks, but currently symlinks cannot be
* of the task to all the cache files/archives * disabled.
*/ */
@Deprecated
public void createSymlink() { public void createSymlink() {
ensureState(JobState.DEFINE); ensureState(JobState.DEFINE);
DistributedCache.createSymlink(conf); DistributedCache.createSymlink(conf);

View File

@ -221,10 +221,11 @@ public interface JobContext extends MRJobConfig {
public String getUser(); public String getUser();
/** /**
* This method checks to see if symlinks are to be create for the * Originally intended to check if symlinks should be used, but currently
* localized cache files in the current working directory * symlinks cannot be disabled.
* @return true if symlinks are to be created- else return false * @return true
*/ */
@Deprecated
public boolean getSymlink(); public boolean getSymlink();
/** /**
@ -251,14 +252,22 @@ public interface JobContext extends MRJobConfig {
* Return the path array of the localized caches * Return the path array of the localized caches
* @return A path array of localized caches * @return A path array of localized caches
* @throws IOException * @throws IOException
* @deprecated the array returned only includes the items that were
* downloaded. There is no way to map this to what is returned by
* {@link #getCacheArchives()}.
*/ */
@Deprecated
public Path[] getLocalCacheArchives() throws IOException; public Path[] getLocalCacheArchives() throws IOException;
/** /**
* Return the path array of the localized files * Return the path array of the localized files
* @return A path array of localized files * @return A path array of localized files
* @throws IOException * @throws IOException
* @deprecated the array returned only includes the items that were
* downloaded. There is no way to map this to what is returned by
* {@link #getCacheFiles()}.
*/ */
@Deprecated
public Path[] getLocalCacheFiles() throws IOException; public Path[] getLocalCacheFiles() throws IOException;
/** /**

View File

@ -190,7 +190,6 @@ class JobSubmitter {
//should not throw a uri exception //should not throw a uri exception
throw new IOException("Failed to create uri for " + tmpFile, ue); throw new IOException("Failed to create uri for " + tmpFile, ue);
} }
DistributedCache.createSymlink(conf);
} }
} }
@ -225,7 +224,6 @@ class JobSubmitter {
//should not throw a uri exception //should not throw a uri exception
throw new IOException("Failed to create uri for " + tmpArchives, ue); throw new IOException("Failed to create uri for " + tmpArchives, ue);
} }
DistributedCache.createSymlink(conf);
} }
} }

View File

@ -114,6 +114,10 @@ public interface MRJobConfig {
public static final String CACHE_ARCHIVES_VISIBILITIES = "mapreduce.job.cache.archives.visibilities"; public static final String CACHE_ARCHIVES_VISIBILITIES = "mapreduce.job.cache.archives.visibilities";
/**
* @deprecated Symlinks are always on and cannot be disabled.
*/
@Deprecated
public static final String CACHE_SYMLINK = "mapreduce.job.cache.symlink.create"; public static final String CACHE_SYMLINK = "mapreduce.job.cache.symlink.create";
public static final String USER_LOG_RETAIN_HOURS = "mapreduce.job.userlog.retain.hours"; public static final String USER_LOG_RETAIN_HOURS = "mapreduce.job.userlog.retain.hours";

View File

@ -55,8 +55,12 @@ import java.net.URI;
* Archives (zip, tar and tgz/tar.gz files) are un-archived at the slave nodes. * Archives (zip, tar and tgz/tar.gz files) are un-archived at the slave nodes.
* Jars may be optionally added to the classpath of the tasks, a rudimentary * Jars may be optionally added to the classpath of the tasks, a rudimentary
* software distribution mechanism. Files have execution permissions. * software distribution mechanism. Files have execution permissions.
* Optionally users can also direct it to symlink the distributed cache file(s) * In older versions of Hadoop Map/Reduce users could optionally ask for symlinks
* into the working directory of the task.</p> * to be created in the working directory of the child task. In the current
* version symlinks are always created. If the URL does not have a fragment
* the name of the file or directory will be used. If multiple files or
* directories map to the same link name, the last one added will be used. All
* others will not even be downloaded.</p>
* *
* <p><code>DistributedCache</code> tracks modification timestamps of the cache * <p><code>DistributedCache</code> tracks modification timestamps of the cache
* files. Clearly the cache files should not be modified by the application * files. Clearly the cache files should not be modified by the application
@ -98,8 +102,7 @@ import java.net.URI;
* *
* public void configure(JobConf job) { * public void configure(JobConf job) {
* // Get the cached archives/files * // Get the cached archives/files
* localArchives = DistributedCache.getLocalCacheArchives(job); * File f = new File("./map.zip/some/file/in/zip.txt");
* localFiles = DistributedCache.getLocalCacheFiles(job);
* } * }
* *
* public void map(K key, V value, * public void map(K key, V value,
@ -375,32 +378,26 @@ public class DistributedCache {
} }
/** /**
* This method allows you to create symlinks in the current working directory * Originally intended to enable symlinks, but currently symlinks cannot be
* of the task to all the cache files/archives. * disabled. This is a NO-OP.
* Intended to be used by user code.
* @param conf the jobconf * @param conf the jobconf
* @deprecated Use {@link Job#createSymlink()} instead * @deprecated This is a NO-OP.
*/ */
@Deprecated @Deprecated
public static void createSymlink(Configuration conf){ public static void createSymlink(Configuration conf){
conf.set(MRJobConfig.CACHE_SYMLINK, "yes"); //NOOP
} }
/** /**
* This method checks to see if symlinks are to be create for the * Originally intended to check if symlinks should be used, but currently
* localized cache files in the current working directory * symlinks cannot be disabled.
* Used by internal DistributedCache code.
* @param conf the jobconf * @param conf the jobconf
* @return true if symlinks are to be created- else return false * @return true
* @deprecated Use {@link JobContext#getSymlink()} instead * @deprecated symlinks are always created.
*/ */
@Deprecated @Deprecated
public static boolean getSymlink(Configuration conf){ public static boolean getSymlink(Configuration conf){
String result = conf.get(MRJobConfig.CACHE_SYMLINK); return true;
if ("yes".equals(result)){
return true;
}
return false;
} }
private static boolean[] parseBooleans(String[] strs) { private static boolean[] parseBooleans(String[] strs) {

View File

@ -246,8 +246,6 @@ public class ConfigUtil {
new String[] {MRJobConfig.CACHE_FILE_TIMESTAMPS}); new String[] {MRJobConfig.CACHE_FILE_TIMESTAMPS});
Configuration.addDeprecation("mapred.cache.archives.timestamps", Configuration.addDeprecation("mapred.cache.archives.timestamps",
new String[] {MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS}); new String[] {MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS});
Configuration.addDeprecation("mapred.create.symlink",
new String[] {MRJobConfig.CACHE_SYMLINK});
Configuration.addDeprecation("mapred.working.dir", Configuration.addDeprecation("mapred.working.dir",
new String[] {MRJobConfig.WORKING_DIR}); new String[] {MRJobConfig.WORKING_DIR});
Configuration.addDeprecation("user.name", Configuration.addDeprecation("user.name",

View File

@ -210,19 +210,10 @@ public class MRCaching {
fs.copyFromLocalFile(tarPath1, cachePath); fs.copyFromLocalFile(tarPath1, cachePath);
fs.copyFromLocalFile(tarPath2, cachePath); fs.copyFromLocalFile(tarPath2, cachePath);
} }
public static TestResult launchMRCache(String indir,
String outdir, String cacheDir,
JobConf conf, String input)
throws IOException {
setupCache(cacheDir, FileSystem.get(conf));
return launchMRCache(indir,outdir, cacheDir, conf, input, false);
}
public static TestResult launchMRCache(String indir, public static TestResult launchMRCache(String indir,
String outdir, String cacheDir, String outdir, String cacheDir,
JobConf conf, String input, JobConf conf, String input)
boolean withSymlink)
throws IOException { throws IOException {
String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data","/tmp")) String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data","/tmp"))
.toString().replace(' ', '+'); .toString().replace(' ', '+');
@ -256,24 +247,13 @@ public class MRCaching {
conf.setNumReduceTasks(1); conf.setNumReduceTasks(1);
conf.setSpeculativeExecution(false); conf.setSpeculativeExecution(false);
URI[] uris = new URI[6]; URI[] uris = new URI[6];
if (!withSymlink) { conf.setMapperClass(MRCaching.MapClass2.class);
conf.setMapperClass(MRCaching.MapClass.class); uris[0] = fs.getUri().resolve(cacheDir + "/test.txt");
uris[0] = fs.getUri().resolve(cacheDir + "/test.txt"); uris[1] = fs.getUri().resolve(cacheDir + "/test.jar");
uris[1] = fs.getUri().resolve(cacheDir + "/test.jar"); uris[2] = fs.getUri().resolve(cacheDir + "/test.zip");
uris[2] = fs.getUri().resolve(cacheDir + "/test.zip"); uris[3] = fs.getUri().resolve(cacheDir + "/test.tgz");
uris[3] = fs.getUri().resolve(cacheDir + "/test.tgz"); uris[4] = fs.getUri().resolve(cacheDir + "/test.tar.gz");
uris[4] = fs.getUri().resolve(cacheDir + "/test.tar.gz"); uris[5] = fs.getUri().resolve(cacheDir + "/test.tar");
uris[5] = fs.getUri().resolve(cacheDir + "/test.tar");
} else {
DistributedCache.createSymlink(conf);
conf.setMapperClass(MRCaching.MapClass2.class);
uris[0] = fs.getUri().resolve(cacheDir + "/test.txt#" + "test.txt");
uris[1] = fs.getUri().resolve(cacheDir + "/test.jar#" + "testjar");
uris[2] = fs.getUri().resolve(cacheDir + "/test.zip#" + "testzip");
uris[3] = fs.getUri().resolve(cacheDir + "/test.tgz#" + "testtgz");
uris[4] = fs.getUri().resolve(cacheDir + "/test.tar.gz#" + "testtargz");
uris[5] = fs.getUri().resolve(cacheDir + "/test.tar#" + "testtar");
}
DistributedCache.addCacheFile(uris[0], conf); DistributedCache.addCacheFile(uris[0], conf);
// Save expected file sizes // Save expected file sizes

View File

@ -48,7 +48,7 @@ public class TestMiniMRDFSCaching extends TestCase {
"/cachedir", "/cachedir",
mr.createJobConf(), mr.createJobConf(),
"The quick brown fox\nhas many silly\n" "The quick brown fox\nhas many silly\n"
+ "red fox sox\n", false); + "red fox sox\n");
assertTrue("Archives not matching", ret.isOutputOk); assertTrue("Archives not matching", ret.isOutputOk);
// launch MR cache with symlinks // launch MR cache with symlinks
ret = MRCaching.launchMRCache("/testing/wc/input", ret = MRCaching.launchMRCache("/testing/wc/input",
@ -56,7 +56,7 @@ public class TestMiniMRDFSCaching extends TestCase {
"/cachedir", "/cachedir",
mr.createJobConf(), mr.createJobConf(),
"The quick brown fox\nhas many silly\n" "The quick brown fox\nhas many silly\n"
+ "red fox sox\n", true); + "red fox sox\n");
assertTrue("Archives not matching", ret.isOutputOk); assertTrue("Archives not matching", ret.isOutputOk);
} finally { } finally {
if (fileSys != null) { if (fileSys != null) {

View File

@ -211,6 +211,7 @@ public class TestMRJobs {
Path outputDir = Path outputDir =
new Path(mrCluster.getTestWorkDir().getAbsolutePath(), "random-output"); new Path(mrCluster.getTestWorkDir().getAbsolutePath(), "random-output");
FileOutputFormat.setOutputPath(job, outputDir); FileOutputFormat.setOutputPath(job, outputDir);
job.setSpeculativeExecution(false);
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself. job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
job.setJarByClass(RandomTextWriterJob.class); job.setJarByClass(RandomTextWriterJob.class);
job.setMaxMapAttempts(1); // speed up failures job.setMaxMapAttempts(1); // speed up failures
@ -462,7 +463,6 @@ public class TestMRJobs {
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself. job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
job.addArchiveToClassPath(third); job.addArchiveToClassPath(third);
job.addCacheArchive(fourth.toUri()); job.addCacheArchive(fourth.toUri());
job.createSymlink();
job.setMaxMapAttempts(1); // speed up failures job.setMaxMapAttempts(1); // speed up failures
job.submit(); job.submit();

View File

@ -301,7 +301,6 @@ public class TestSpeculativeExecution {
// Creates the Job Configuration // Creates the Job Configuration
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself. job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
job.createSymlink();
job.setMaxMapAttempts(2); job.setMaxMapAttempts(2);
job.submit(); job.submit();

View File

@ -167,7 +167,6 @@ public class Sort<K,V> extends Configured implements Tool {
URI partitionUri = new URI(partitionFile.toString() + URI partitionUri = new URI(partitionFile.toString() +
"#" + "_sortPartitioning"); "#" + "_sortPartitioning");
DistributedCache.addCacheFile(partitionUri, conf); DistributedCache.addCacheFile(partitionUri, conf);
DistributedCache.createSymlink(conf);
} }
System.out.println("Running on " + System.out.println("Running on " +

View File

@ -305,8 +305,7 @@ public class TeraSort extends Configured implements Tool {
LOG.error(e.getMessage()); LOG.error(e.getMessage());
return -1; return -1;
} }
job.addCacheFile(partitionUri); job.addCacheFile(partitionUri);
job.createSymlink();
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();
System.out.println("Spent " + (end - start) + "ms computing partitions."); System.out.println("Spent " + (end - start) + "ms computing partitions.");
job.setPartitionerClass(TotalOrderPartitioner.class); job.setPartitionerClass(TotalOrderPartitioner.class);

View File

@ -958,7 +958,6 @@ public class StreamJob implements Tool {
if (!b) if (!b)
fail(LINK_URI); fail(LINK_URI);
} }
DistributedCache.createSymlink(jobConf_);
// set the jobconf for the caching parameters // set the jobconf for the caching parameters
if (cacheArchives != null) if (cacheArchives != null)
DistributedCache.setCacheArchives(archiveURIs, jobConf_); DistributedCache.setCacheArchives(archiveURIs, jobConf_);