MAPREDUCE-3871. Allow symlinking in LocalJobRunner DistributedCache.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1348997 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas White 2012-06-11 20:37:03 +00:00
parent 56f386b98d
commit 13ad756796
3 changed files with 74 additions and 24 deletions

View File

@ -129,6 +129,9 @@ Branch-2 ( Unreleased changes )
MAPREDUCE-4146. Support limits on task status string length and number of MAPREDUCE-4146. Support limits on task status string length and number of
block locations in branch-2. (Ahmed Radwan via tomwhite) block locations in branch-2. (Ahmed Radwan via tomwhite)
MAPREDUCE-3871. Allow symlinking in LocalJobRunner DistributedCache.
(tomwhite)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -23,6 +23,7 @@ import com.google.common.collect.Maps;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.net.URL; import java.net.URL;
import java.net.URLClassLoader; import java.net.URLClassLoader;
@ -45,6 +46,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRConfig;
@ -72,6 +74,8 @@ class LocalDistributedCacheManager {
private List<String> localFiles = new ArrayList<String>(); private List<String> localFiles = new ArrayList<String>();
private List<String> localClasspaths = new ArrayList<String>(); private List<String> localClasspaths = new ArrayList<String>();
private List<File> symlinksCreated = new ArrayList<File>();
private boolean setupCalled = false; private boolean setupCalled = false;
/** /**
@ -172,18 +176,51 @@ class LocalDistributedCacheManager {
.size()]))); .size()])));
} }
if (DistributedCache.getSymlink(conf)) { if (DistributedCache.getSymlink(conf)) {
// This is not supported largely because, File workDir = new File(System.getProperty("user.dir"));
// for a Child subprocess, the cwd in LocalJobRunner URI[] archives = DistributedCache.getCacheArchives(conf);
// is not a fresh slate, but rather the user's working directory. URI[] files = DistributedCache.getCacheFiles(conf);
// This is further complicated because the logic in Path[] localArchives = DistributedCache.getLocalCacheArchives(conf);
// setupWorkDir only creates symlinks if there's a jarfile Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
// in the configuration. if (archives != null) {
LOG.warn("LocalJobRunner does not support " + for (int i = 0; i < archives.length; i++) {
"symlinking into current working dir."); String link = archives[i].getFragment();
String target = new File(localArchives[i].toUri()).getPath();
symlink(workDir, target, link);
}
}
if (files != null) {
for (int i = 0; i < files.length; i++) {
String link = files[i].getFragment();
String target = new File(localFiles[i].toUri()).getPath();
symlink(workDir, target, link);
}
}
} }
setupCalled = true; setupCalled = true;
} }
/**
* Utility method for creating a symlink and warning on errors.
*
* If link is null, does nothing.
*/
private void symlink(File workDir, String target, String link)
throws IOException {
if (link != null) {
link = workDir.toString() + Path.SEPARATOR + link;
File flink = new File(link);
if (!flink.exists()) {
LOG.info(String.format("Creating symlink: %s <- %s", target, link));
if (0 != FileUtil.symLink(target, link)) {
LOG.warn(String.format("Failed to create symlink: %s <- %s", target,
link));
} else {
symlinksCreated.add(new File(link));
}
}
}
}
/** /**
* Are the resources that should be added to the classpath? * Are the resources that should be added to the classpath?
* Should be called after setup(). * Should be called after setup().
@ -217,6 +254,12 @@ class LocalDistributedCacheManager {
} }
public void close() throws IOException { public void close() throws IOException {
for (File symlink : symlinksCreated) {
if (!symlink.delete()) {
LOG.warn("Failed to delete symlink created by the local job runner: " +
symlink);
}
}
FileContext localFSFileContext = FileContext.getLocalFSFileContext(); FileContext localFSFileContext = FileContext.getLocalFSFileContext();
for (String archive : localArchives) { for (String archive : localArchives) {
localFSFileContext.delete(new Path(archive), true); localFSFileContext.delete(new Path(archive), true);

View File

@ -23,6 +23,7 @@ import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.jar.JarOutputStream; import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry; import java.util.zip.ZipEntry;
@ -61,6 +62,9 @@ import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
public class TestMRWithDistributedCache extends TestCase { public class TestMRWithDistributedCache extends TestCase {
private static Path TEST_ROOT_DIR = private static Path TEST_ROOT_DIR =
new Path(System.getProperty("test.build.data","/tmp")); new Path(System.getProperty("test.build.data","/tmp"));
private static File symlinkFile = new File("distributed.first.symlink");
private static File expectedAbsentSymlinkFile =
new File("distributed.second.jar");
private static Configuration conf = new Configuration(); private static Configuration conf = new Configuration();
private static FileSystem localFs; private static FileSystem localFs;
static { static {
@ -107,20 +111,17 @@ public class TestMRWithDistributedCache extends TestCase {
TestCase.assertNotNull(cl.getResource("distributed.jar.inside3")); TestCase.assertNotNull(cl.getResource("distributed.jar.inside3"));
TestCase.assertNull(cl.getResource("distributed.jar.inside4")); TestCase.assertNull(cl.getResource("distributed.jar.inside4"));
// Check that the symlink for the renaming was created in the cwd; // Check that the symlink for the renaming was created in the cwd;
// This only happens for real for non-local jobtrackers. TestCase.assertTrue("symlink distributed.first.symlink doesn't exist",
// (The symlinks exist in "localRunner/" for local Jobtrackers, symlinkFile.exists());
// but the user has no way to get at them. TestCase.assertEquals("symlink distributed.first.symlink length not 1", 1,
if (!"local".equals( symlinkFile.length());
context.getConfiguration().get(JTConfig.JT_IPC_ADDRESS))) {
File symlinkFile = new File("distributed.first.symlink"); TestCase.assertFalse("second file should not be symlinked",
TestCase.assertTrue("symlink distributed.first.symlink doesn't exist", symlinkFile.exists()); expectedAbsentSymlinkFile.exists());
TestCase.assertEquals("symlink distributed.first.symlink length not 1", 1, symlinkFile.length());
}
} }
} }
private void testWithConf(Configuration conf) throws IOException, private void testWithConf(Configuration conf) throws IOException,
InterruptedException, ClassNotFoundException, URISyntaxException { InterruptedException, ClassNotFoundException, URISyntaxException {
// Create a temporary file of length 1. // Create a temporary file of length 1.
@ -144,11 +145,7 @@ public class TestMRWithDistributedCache extends TestCase {
job.addFileToClassPath(second); job.addFileToClassPath(second);
job.addArchiveToClassPath(third); job.addArchiveToClassPath(third);
job.addCacheArchive(fourth.toUri()); job.addCacheArchive(fourth.toUri());
job.createSymlink();
// don't create symlink for LocalJobRunner
if (!"local".equals(conf.get(JTConfig.JT_IPC_ADDRESS))) {
job.createSymlink();
}
job.setMaxMapAttempts(1); // speed up failures job.setMaxMapAttempts(1); // speed up failures
job.submit(); job.submit();
@ -157,10 +154,17 @@ public class TestMRWithDistributedCache extends TestCase {
/** Tests using the local job runner. */ /** Tests using the local job runner. */
public void testLocalJobRunner() throws Exception { public void testLocalJobRunner() throws Exception {
symlinkFile.delete(); // ensure symlink is not present (e.g. if test is
// killed part way through)
Configuration c = new Configuration(); Configuration c = new Configuration();
c.set(JTConfig.JT_IPC_ADDRESS, "local"); c.set(JTConfig.JT_IPC_ADDRESS, "local");
c.set("fs.defaultFS", "file:///"); c.set("fs.defaultFS", "file:///");
testWithConf(c); testWithConf(c);
assertFalse("Symlink not removed by local job runner",
// Symlink target will have gone so can't use File.exists()
Arrays.asList(new File(".").list()).contains(symlinkFile.getName()));
} }
private Path createTempFile(String filename, String contents) private Path createTempFile(String filename, String contents)