From f323ea705d0efa8bb8d5a9dc016e11132610394d Mon Sep 17 00:00:00 2001 From: Thomas White Date: Mon, 11 Jun 2012 20:39:13 +0000 Subject: [PATCH] Merge -r 1348996:1348997 from trunk to branch-2. Fixes: MAPREDUCE-3871. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1348999 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../mapred/LocalDistributedCacheManager.java | 59 ++++++++++++++++--- .../mapred/TestMRWithDistributedCache.java | 36 ++++++----- 3 files changed, 74 insertions(+), 24 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 8b8e7e5a407..0984a92ddcf 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -11,6 +11,9 @@ Release 2.0.1-alpha - UNRELEASED MAPREDUCE-4146. Support limits on task status string length and number of block locations in branch-2. (Ahmed Radwan via tomwhite) + MAPREDUCE-3871. Allow symlinking in LocalJobRunner DistributedCache. + (tomwhite) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java index 2fec8147e3b..670959b8c4f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java @@ -23,6 +23,7 @@ import com.google.common.collect.Maps; import java.io.File; import java.io.IOException; import java.net.MalformedURLException; +import java.net.URI; import java.net.URISyntaxException; import java.net.URL; import java.net.URLClassLoader; @@ -45,6 +46,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.MRConfig; @@ -72,6 +74,8 @@ class LocalDistributedCacheManager { private List localFiles = new ArrayList(); private List localClasspaths = new ArrayList(); + private List symlinksCreated = new ArrayList(); + private boolean setupCalled = false; /** @@ -172,18 +176,51 @@ class LocalDistributedCacheManager { .size()]))); } if (DistributedCache.getSymlink(conf)) { - // This is not supported largely because, - // for a Child subprocess, the cwd in LocalJobRunner - // is not a fresh slate, but rather the user's working directory. - // This is further complicated because the logic in - // setupWorkDir only creates symlinks if there's a jarfile - // in the configuration. - LOG.warn("LocalJobRunner does not support " + - "symlinking into current working dir."); + File workDir = new File(System.getProperty("user.dir")); + URI[] archives = DistributedCache.getCacheArchives(conf); + URI[] files = DistributedCache.getCacheFiles(conf); + Path[] localArchives = DistributedCache.getLocalCacheArchives(conf); + Path[] localFiles = DistributedCache.getLocalCacheFiles(conf); + if (archives != null) { + for (int i = 0; i < archives.length; i++) { + String link = archives[i].getFragment(); + String target = new File(localArchives[i].toUri()).getPath(); + symlink(workDir, target, link); + } + } + if (files != null) { + for (int i = 0; i < files.length; i++) { + String link = files[i].getFragment(); + String target = new File(localFiles[i].toUri()).getPath(); + symlink(workDir, target, link); + } + } } setupCalled = true; } + /** + * Utility method for creating a symlink and warning on errors. + * + * If link is null, does nothing. + */ + private void symlink(File workDir, String target, String link) + throws IOException { + if (link != null) { + link = workDir.toString() + Path.SEPARATOR + link; + File flink = new File(link); + if (!flink.exists()) { + LOG.info(String.format("Creating symlink: %s <- %s", target, link)); + if (0 != FileUtil.symLink(target, link)) { + LOG.warn(String.format("Failed to create symlink: %s <- %s", target, + link)); + } else { + symlinksCreated.add(new File(link)); + } + } + } + } + /** * Are the resources that should be added to the classpath? * Should be called after setup(). @@ -217,6 +254,12 @@ class LocalDistributedCacheManager { } public void close() throws IOException { + for (File symlink : symlinksCreated) { + if (!symlink.delete()) { + LOG.warn("Failed to delete symlink created by the local job runner: " + + symlink); + } + } FileContext localFSFileContext = FileContext.getLocalFSFileContext(); for (String archive : localArchives) { localFSFileContext.delete(new Path(archive), true); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java index 6798831ba73..43368078214 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java @@ -23,6 +23,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.Arrays; import java.util.jar.JarOutputStream; import java.util.zip.ZipEntry; @@ -61,6 +62,9 @@ import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; public class TestMRWithDistributedCache extends TestCase { private static Path TEST_ROOT_DIR = new Path(System.getProperty("test.build.data","/tmp")); + private static File symlinkFile = new File("distributed.first.symlink"); + private static File expectedAbsentSymlinkFile = + new File("distributed.second.jar"); private static Configuration conf = new Configuration(); private static FileSystem localFs; static { @@ -107,20 +111,17 @@ public class TestMRWithDistributedCache extends TestCase { TestCase.assertNotNull(cl.getResource("distributed.jar.inside3")); TestCase.assertNull(cl.getResource("distributed.jar.inside4")); - // Check that the symlink for the renaming was created in the cwd; - // This only happens for real for non-local jobtrackers. - // (The symlinks exist in "localRunner/" for local Jobtrackers, - // but the user has no way to get at them. - if (!"local".equals( - context.getConfiguration().get(JTConfig.JT_IPC_ADDRESS))) { - File symlinkFile = new File("distributed.first.symlink"); - TestCase.assertTrue("symlink distributed.first.symlink doesn't exist", symlinkFile.exists()); - TestCase.assertEquals("symlink distributed.first.symlink length not 1", 1, symlinkFile.length()); - } + TestCase.assertTrue("symlink distributed.first.symlink doesn't exist", + symlinkFile.exists()); + TestCase.assertEquals("symlink distributed.first.symlink length not 1", 1, + symlinkFile.length()); + + TestCase.assertFalse("second file should not be symlinked", + expectedAbsentSymlinkFile.exists()); } } - + private void testWithConf(Configuration conf) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException { // Create a temporary file of length 1. @@ -144,11 +145,7 @@ public class TestMRWithDistributedCache extends TestCase { job.addFileToClassPath(second); job.addArchiveToClassPath(third); job.addCacheArchive(fourth.toUri()); - - // don't create symlink for LocalJobRunner - if (!"local".equals(conf.get(JTConfig.JT_IPC_ADDRESS))) { - job.createSymlink(); - } + job.createSymlink(); job.setMaxMapAttempts(1); // speed up failures job.submit(); @@ -157,10 +154,17 @@ public class TestMRWithDistributedCache extends TestCase { /** Tests using the local job runner. */ public void testLocalJobRunner() throws Exception { + symlinkFile.delete(); // ensure symlink is not present (e.g. if test is + // killed part way through) + Configuration c = new Configuration(); c.set(JTConfig.JT_IPC_ADDRESS, "local"); c.set("fs.defaultFS", "file:///"); testWithConf(c); + + assertFalse("Symlink not removed by local job runner", + // Symlink target will have gone so can't use File.exists() + Arrays.asList(new File(".").list()).contains(symlinkFile.getName())); } private Path createTempFile(String filename, String contents)