From a1a7ee23f7aa8cbf83074e50a4b1474de04dcafc Mon Sep 17 00:00:00 2001
From: Thomas Graves
Date: Tue, 31 Jul 2012 14:49:07 +0000
Subject: [PATCH] merge -r 1367580:1367581 from trunk. FIXES: MAPREDUCE-4456

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1367585 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt          |   3 +
 .../mapred/LocalDistributedCacheManager.java  |  38 +--
 .../TestLocalDistributedCacheManager.java     | 281 ++++++++++++++++++
 .../mapred/TestMRWithDistributedCache.java    |   3 +-
 4 files changed, 299 insertions(+), 26 deletions(-)
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index d1f64067be5..5000ea1783f 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -642,6 +642,9 @@ Release 0.23.3 - UNRELEASED
     MAPREDUCE-4423. Potential infinite fetching of map output (Robert Evans
     via tgraves)
 
+    MAPREDUCE-4456. LocalDistributedCacheManager can get an
+    ArrayIndexOutOfBounds when creating symlinks (Robert Evans via tgraves)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
index 670959b8c4f..85cef6e2a6c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
@@ -18,12 +18,9 @@
 
 package org.apache.hadoop.mapred;
 
-import com.google.common.collect.Maps;
-
 import java.io.File;
 import java.io.IOException;
 import java.net.MalformedURLException;
-import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -34,6 +31,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -60,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.FSDownload;
 
+import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
@@ -85,6 +84,9 @@ class LocalDistributedCacheManager {
    * @throws IOException
    */
   public void setup(JobConf conf) throws IOException {
+    boolean mkLinks = DistributedCache.getSymlink(conf);
+    File workDir = new File(System.getProperty("user.dir"));
+
     // Generate YARN local resources objects corresponding to the distributed
     // cache configuration
     Map<String, LocalResource> localResources =
@@ -132,7 +134,8 @@ class LocalDistributedCacheManager {
       Future<Path> future = exec.submit(download);
       resourcesToPaths.put(resource, future);
     }
-    for (LocalResource resource : localResources.values()) {
+    for (Entry<String, LocalResource> entry : localResources.entrySet()) {
+      LocalResource resource = entry.getValue();
       Path path;
       try {
         path = resourcesToPaths.get(resource).get();
@@ -142,6 +145,12 @@ class LocalDistributedCacheManager {
         throw new IOException(e);
       }
       String pathString = path.toUri().toString();
+      if(mkLinks) {
+        String link = entry.getKey();
+        String target = new File(path.toUri()).getPath();
+        symlink(workDir, target, link);
+      }
+
       if (resource.getType() == LocalResourceType.ARCHIVE) {
         localArchives.add(pathString);
       } else if (resource.getType() == LocalResourceType.FILE) {
@@ -175,27 +184,6 @@ class LocalDistributedCacheManager {
           .arrayToString(localFiles.toArray(new String[localArchives
               .size()])));
     }
-    if (DistributedCache.getSymlink(conf)) {
-      File workDir = new File(System.getProperty("user.dir"));
-      URI[] archives = DistributedCache.getCacheArchives(conf);
-      URI[] files = DistributedCache.getCacheFiles(conf);
-      Path[] localArchives = DistributedCache.getLocalCacheArchives(conf);
-      Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
-      if (archives != null) {
-        for (int i = 0; i < archives.length; i++) {
-          String link = archives[i].getFragment();
-          String target = new File(localArchives[i].toUri()).getPath();
-          symlink(workDir, target, link);
-        }
-      }
-      if (files != null) {
-        for (int i = 0; i < files.length; i++) {
-          String link = files[i].getFragment();
-          String target = new File(localFiles[i].toUri()).getPath();
-          symlink(workDir, target, link);
-        }
-      }
-    }
     setupCalled = true;
   }
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java
new file mode 100644
index 00000000000..368ac2bceda
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+@SuppressWarnings("deprecation")
+public class TestLocalDistributedCacheManager {
+
+  private static FileSystem mockfs;
+
+  public static class MockFileSystem extends FilterFileSystem {
+    public MockFileSystem() {
+      super(mockfs);
+    }
+  }
+
+  private File localDir;
+
+  private static void delete(File file) throws IOException {
+    if (file.getAbsolutePath().length() < 5) {
+      throw new IllegalArgumentException(
+          "Path [" + file + "] is too short, not deleting");
+    }
+    if (file.exists()) {
+      if (file.isDirectory()) {
+        File[] children = file.listFiles();
+        if (children != null) {
+          for (File child : children) {
+            delete(child);
+          }
+        }
+      }
+      if (!file.delete()) {
+        throw new RuntimeException(
+            "Could not delete path [" + file + "]");
+      }
+    }
+  }
+
+  @Before
+  public void setup() throws Exception {
+    mockfs = mock(FileSystem.class);
+    localDir = new File(System.getProperty("test.build.dir", "target/test-dir"),
+        TestLocalDistributedCacheManager.class.getName());
+    delete(localDir);
+    localDir.mkdirs();
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    delete(localDir);
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testDownload() throws Exception {
+    JobConf conf = new JobConf();
+    conf.setClass("fs.mock.impl", MockFileSystem.class, FileSystem.class);
+
+    URI mockBase = new URI("mock://test-nn1/");
+    when(mockfs.getUri()).thenReturn(mockBase);
+    Path working = new Path("mock://test-nn1/user/me/");
+    when(mockfs.getWorkingDirectory()).thenReturn(working);
+    when(mockfs.resolvePath(any(Path.class))).thenAnswer(new Answer() {
+      @Override
+      public Path answer(InvocationOnMock args) throws Throwable {
+        return (Path) args.getArguments()[0];
+      }
+    });
+
+    final URI file = new URI("mock://test-nn1/user/me/file.txt#link");
+    final Path filePath = new Path(file);
+    File link = new File("link");
+
+    when(mockfs.getFileStatus(any(Path.class))).thenAnswer(new Answer() {
+      @Override
+      public FileStatus answer(InvocationOnMock args) throws Throwable {
+        Path p = (Path)args.getArguments()[0];
+        if("file.txt".equals(p.getName())) {
+          return new FileStatus(201, false, 1, 500, 101, 101,
+              FsPermission.getDefault(), "me", "me", filePath);
+        } else {
+          throw new FileNotFoundException(p+" not supported by mocking");
+        }
+      }
+    });
+
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock args) throws Throwable {
+        //Ignored boolean overwrite = (Boolean) args.getArguments()[0];
+        Path src = (Path)args.getArguments()[1];
+        Path dst = (Path)args.getArguments()[2];
+        if("file.txt".equals(src.getName())) {
+          File f = new File(dst.toUri().getPath());
+          FileWriter writer = new FileWriter(f);
+          try {
+            writer.append("This is a test file\n");
+          } finally {
+            if(writer != null) writer.close();
+          }
+        } else {
+          throw new FileNotFoundException(src+" not supported by mocking");
+        }
+        return null;
+      }
+    }).when(mockfs).copyToLocalFile(anyBoolean(), any(Path.class), any(Path.class));
+
+    DistributedCache.addCacheFile(file, conf);
+    conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101");
+    conf.set(MRJobConfig.CACHE_FILES_SIZES, "201");
+    conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "false");
+    conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
+    conf.set(MRJobConfig.CACHE_SYMLINK, "yes");
+    LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
+    try {
+      manager.setup(conf);
+      assertTrue(link.exists());
+    } finally {
+      manager.close();
+    }
+    assertFalse(link.exists());
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testEmptyDownload() throws Exception {
+    JobConf conf = new JobConf();
+    conf.setClass("fs.mock.impl", MockFileSystem.class, FileSystem.class);
+
+    URI mockBase = new URI("mock://test-nn1/");
+    when(mockfs.getUri()).thenReturn(mockBase);
+    Path working = new Path("mock://test-nn1/user/me/");
+    when(mockfs.getWorkingDirectory()).thenReturn(working);
+    when(mockfs.resolvePath(any(Path.class))).thenAnswer(new Answer() {
+      @Override
+      public Path answer(InvocationOnMock args) throws Throwable {
+        return (Path) args.getArguments()[0];
+      }
+    });
+
+    when(mockfs.getFileStatus(any(Path.class))).thenAnswer(new Answer() {
+      @Override
+      public FileStatus answer(InvocationOnMock args) throws Throwable {
+        Path p = (Path)args.getArguments()[0];
+        throw new FileNotFoundException(p+" not supported by mocking");
+      }
+    });
+
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock args) throws Throwable {
+        //Ignored boolean overwrite = (Boolean) args.getArguments()[0];
+        Path src = (Path)args.getArguments()[1];
+        throw new FileNotFoundException(src+" not supported by mocking");
+      }
+    }).when(mockfs).copyToLocalFile(anyBoolean(), any(Path.class), any(Path.class));
+
+    conf.set(MRJobConfig.CACHE_FILES, "");
+    conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
+    conf.set(MRJobConfig.CACHE_SYMLINK, "yes");
+    LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
+    try {
+      manager.setup(conf);
+    } finally {
+      manager.close();
+    }
+  }
+
+
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testDuplicateDownload() throws Exception {
+    JobConf conf = new JobConf();
+    conf.setClass("fs.mock.impl", MockFileSystem.class, FileSystem.class);
+
+    URI mockBase = new URI("mock://test-nn1/");
+    when(mockfs.getUri()).thenReturn(mockBase);
+    Path working = new Path("mock://test-nn1/user/me/");
+    when(mockfs.getWorkingDirectory()).thenReturn(working);
+    when(mockfs.resolvePath(any(Path.class))).thenAnswer(new Answer() {
+      @Override
+      public Path answer(InvocationOnMock args) throws Throwable {
+        return (Path) args.getArguments()[0];
+      }
+    });
+
+    final URI file = new URI("mock://test-nn1/user/me/file.txt#link");
+    final Path filePath = new Path(file);
+    File link = new File("link");
+
+    when(mockfs.getFileStatus(any(Path.class))).thenAnswer(new Answer() {
+      @Override
+      public FileStatus answer(InvocationOnMock args) throws Throwable {
+        Path p = (Path)args.getArguments()[0];
+        if("file.txt".equals(p.getName())) {
+          return new FileStatus(201, false, 1, 500, 101, 101,
+              FsPermission.getDefault(), "me", "me", filePath);
+        } else {
+          throw new FileNotFoundException(p+" not supported by mocking");
+        }
+      }
+    });
+
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock args) throws Throwable {
+        //Ignored boolean overwrite = (Boolean) args.getArguments()[0];
+        Path src = (Path)args.getArguments()[1];
+        Path dst = (Path)args.getArguments()[2];
+        if("file.txt".equals(src.getName())) {
+          File f = new File(dst.toUri().getPath());
+          FileWriter writer = new FileWriter(f);
+          try {
+            writer.append("This is a test file\n");
+          } finally {
+            if(writer != null) writer.close();
+          }
+        } else {
+          throw new FileNotFoundException(src+" not supported by mocking");
+        }
+        return null;
+      }
+    }).when(mockfs).copyToLocalFile(anyBoolean(), any(Path.class), any(Path.class));
+
+    DistributedCache.addCacheFile(file, conf);
+    DistributedCache.addCacheFile(file, conf);
+    conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101,101");
+    conf.set(MRJobConfig.CACHE_FILES_SIZES, "201,201");
+    conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "false,false");
+    conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
+    conf.set(MRJobConfig.CACHE_SYMLINK, "yes");
+    LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
+    try {
+      manager.setup(conf);
+      assertTrue(link.exists());
+    } finally {
+      manager.close();
+    }
+    assertFalse(link.exists());
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
index 43368078214..c9ce7cba9b7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
@@ -117,7 +117,8 @@ public class TestMRWithDistributedCache extends TestCase {
       TestCase.assertEquals("symlink distributed.first.symlink length not 1", 1,
           symlinkFile.length());
 
-      TestCase.assertFalse("second file should not be symlinked",
+      //This last one is a difference between MRv2 and MRv1
+      TestCase.assertTrue("second file should be symlinked too",
           expectedAbsentSymlinkFile.exists());
     }
   }