MAPREDUCE-6441. Improve temporary directory name generation in LocalDistributedCacheManager for concurrent processes (wattsinabox, rchiang, haibochen via snemeth)

This commit is contained in:
Szilard Nemeth 2019-10-18 15:25:50 +02:00
parent 6097e909ec
commit 19755b9b36
3 changed files with 78 additions and 32 deletions

View File

@ -37,7 +37,7 @@
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong; import java.util.UUID;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
@ -82,7 +82,7 @@ class LocalDistributedCacheManager {
* @param conf * @param conf
* @throws IOException * @throws IOException
*/ */
public void setup(JobConf conf) throws IOException { public void setup(JobConf conf, JobID jobId) throws IOException {
File workDir = new File(System.getProperty("user.dir")); File workDir = new File(System.getProperty("user.dir"));
// Generate YARN local resources objects corresponding to the distributed // Generate YARN local resources objects corresponding to the distributed
@ -91,9 +91,7 @@ public void setup(JobConf conf) throws IOException {
new LinkedHashMap<String, LocalResource>(); new LinkedHashMap<String, LocalResource>();
MRApps.setupDistributedCache(conf, localResources); MRApps.setupDistributedCache(conf, localResources);
// Generating unique numbers for FSDownload. // Generating unique numbers for FSDownload.
AtomicLong uniqueNumberGenerator =
new AtomicLong(System.currentTimeMillis());
// Find which resources are to be put on the local classpath // Find which resources are to be put on the local classpath
Map<String, Path> classpaths = new HashMap<String, Path>(); Map<String, Path> classpaths = new HashMap<String, Path>();
Path[] archiveClassPaths = DistributedCache.getArchiveClassPaths(conf); Path[] archiveClassPaths = DistributedCache.getArchiveClassPaths(conf);
@ -124,9 +122,10 @@ public void setup(JobConf conf) throws IOException {
Path destPath = localDirAllocator.getLocalPathForWrite(".", conf); Path destPath = localDirAllocator.getLocalPathForWrite(".", conf);
Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap(); Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
for (LocalResource resource : localResources.values()) { for (LocalResource resource : localResources.values()) {
Path destPathForDownload = new Path(destPath,
jobId.toString() + "_" + UUID.randomUUID().toString());
Callable<Path> download = Callable<Path> download =
new FSDownload(localFSFileContext, ugi, conf, new Path(destPath, new FSDownload(localFSFileContext, ugi, conf, destPathForDownload,
Long.toString(uniqueNumberGenerator.incrementAndGet())),
resource); resource);
Future<Path> future = exec.submit(download); Future<Path> future = exec.submit(download);
resourcesToPaths.put(resource, future); resourcesToPaths.put(resource, future);

View File

@ -169,7 +169,7 @@ public Job(JobID jobid, String jobSubmitDir) throws IOException {
// Manage the distributed cache. If there are files to be copied, // Manage the distributed cache. If there are files to be copied,
// this will trigger localFile to be re-written again. // this will trigger localFile to be re-written again.
localDistributedCacheManager = new LocalDistributedCacheManager(); localDistributedCacheManager = new LocalDistributedCacheManager();
localDistributedCacheManager.setup(conf); localDistributedCacheManager.setup(conf, jobid);
// Write out configuration file. Instead of copying it from // Write out configuration file. Instead of copying it from
// systemJobFile, we re-write it, since setup(), above, may have // systemJobFile, we re-write it, since setup(), above, may have

View File

@ -32,6 +32,13 @@
import java.net.URI; import java.net.URI;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
@ -54,37 +61,37 @@
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
public class TestLocalDistributedCacheManager { public class TestLocalDistributedCacheManager {
private static FileSystem mockfs; private static FileSystem mockfs;
public static class MockFileSystem extends FilterFileSystem { public static class MockFileSystem extends FilterFileSystem {
public MockFileSystem() { public MockFileSystem() {
super(mockfs); super(mockfs);
} }
} }
private File localDir; private File localDir;
private static void delete(File file) throws IOException { private static void delete(File file) throws IOException {
if (file.getAbsolutePath().length() < 5) { if (file.getAbsolutePath().length() < 5) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Path [" + file + "] is too short, not deleting"); "Path [" + file + "] is too short, not deleting");
} }
if (file.exists()) { if (file.exists()) {
if (file.isDirectory()) { if (file.isDirectory()) {
File[] children = file.listFiles(); File[] children = file.listFiles();
if (children != null) { if (children != null) {
for (File child : children) { for (File child : children) {
delete(child); delete(child);
} }
} }
} }
if (!file.delete()) { if (!file.delete()) {
throw new RuntimeException( throw new RuntimeException(
"Could not delete path [" + file + "]"); "Could not delete path [" + file + "]");
} }
} }
} }
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
mockfs = mock(FileSystem.class); mockfs = mock(FileSystem.class);
@ -93,7 +100,7 @@ public void setup() throws Exception {
delete(localDir); delete(localDir);
localDir.mkdirs(); localDir.mkdirs();
} }
@After @After
public void cleanup() throws Exception { public void cleanup() throws Exception {
delete(localDir); delete(localDir);
@ -120,9 +127,10 @@ public void seek(long position) {}
@Test @Test
public void testDownload() throws Exception { public void testDownload() throws Exception {
JobID jobId = new JobID();
JobConf conf = new JobConf(); JobConf conf = new JobConf();
conf.setClass("fs.mock.impl", MockFileSystem.class, FileSystem.class); conf.setClass("fs.mock.impl", MockFileSystem.class, FileSystem.class);
URI mockBase = new URI("mock://test-nn1/"); URI mockBase = new URI("mock://test-nn1/");
when(mockfs.getUri()).thenReturn(mockBase); when(mockfs.getUri()).thenReturn(mockBase);
Path working = new Path("mock://test-nn1/user/me/"); Path working = new Path("mock://test-nn1/user/me/");
@ -137,14 +145,14 @@ public Path answer(InvocationOnMock args) throws Throwable {
final URI file = new URI("mock://test-nn1/user/me/file.txt#link"); final URI file = new URI("mock://test-nn1/user/me/file.txt#link");
final Path filePath = new Path(file); final Path filePath = new Path(file);
File link = new File("link"); File link = new File("link");
when(mockfs.getFileStatus(any(Path.class))).thenAnswer(new Answer<FileStatus>() { when(mockfs.getFileStatus(any(Path.class))).thenAnswer(new Answer<FileStatus>() {
@Override @Override
public FileStatus answer(InvocationOnMock args) throws Throwable { public FileStatus answer(InvocationOnMock args) throws Throwable {
Path p = (Path)args.getArguments()[0]; Path p = (Path)args.getArguments()[0];
if("file.txt".equals(p.getName())) { if("file.txt".equals(p.getName())) {
return new FileStatus(201, false, 1, 500, 101, 101, return new FileStatus(201, false, 1, 500, 101, 101,
FsPermission.getDefault(), "me", "me", filePath); FsPermission.getDefault(), "me", "me", filePath);
} else { } else {
throw new FileNotFoundException(p+" not supported by mocking"); throw new FileNotFoundException(p+" not supported by mocking");
} }
@ -176,7 +184,7 @@ public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath()); conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
LocalDistributedCacheManager manager = new LocalDistributedCacheManager(); LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
try { try {
manager.setup(conf); manager.setup(conf, jobId);
assertTrue(link.exists()); assertTrue(link.exists());
} finally { } finally {
manager.close(); manager.close();
@ -186,9 +194,10 @@ public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
@Test @Test
public void testEmptyDownload() throws Exception { public void testEmptyDownload() throws Exception {
JobID jobId = new JobID();
JobConf conf = new JobConf(); JobConf conf = new JobConf();
conf.setClass("fs.mock.impl", MockFileSystem.class, FileSystem.class); conf.setClass("fs.mock.impl", MockFileSystem.class, FileSystem.class);
URI mockBase = new URI("mock://test-nn1/"); URI mockBase = new URI("mock://test-nn1/");
when(mockfs.getUri()).thenReturn(mockBase); when(mockfs.getUri()).thenReturn(mockBase);
Path working = new Path("mock://test-nn1/user/me/"); Path working = new Path("mock://test-nn1/user/me/");
@ -199,7 +208,7 @@ public Path answer(InvocationOnMock args) throws Throwable {
return (Path) args.getArguments()[0]; return (Path) args.getArguments()[0];
} }
}); });
when(mockfs.getFileStatus(any(Path.class))).thenAnswer(new Answer<FileStatus>() { when(mockfs.getFileStatus(any(Path.class))).thenAnswer(new Answer<FileStatus>() {
@Override @Override
public FileStatus answer(InvocationOnMock args) throws Throwable { public FileStatus answer(InvocationOnMock args) throws Throwable {
@ -221,7 +230,7 @@ public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath()); conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
LocalDistributedCacheManager manager = new LocalDistributedCacheManager(); LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
try { try {
manager.setup(conf); manager.setup(conf, jobId);
} finally { } finally {
manager.close(); manager.close();
} }
@ -230,9 +239,10 @@ public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
@Test @Test
public void testDuplicateDownload() throws Exception { public void testDuplicateDownload() throws Exception {
JobID jobId = new JobID();
JobConf conf = new JobConf(); JobConf conf = new JobConf();
conf.setClass("fs.mock.impl", MockFileSystem.class, FileSystem.class); conf.setClass("fs.mock.impl", MockFileSystem.class, FileSystem.class);
URI mockBase = new URI("mock://test-nn1/"); URI mockBase = new URI("mock://test-nn1/");
when(mockfs.getUri()).thenReturn(mockBase); when(mockfs.getUri()).thenReturn(mockBase);
Path working = new Path("mock://test-nn1/user/me/"); Path working = new Path("mock://test-nn1/user/me/");
@ -247,14 +257,14 @@ public Path answer(InvocationOnMock args) throws Throwable {
final URI file = new URI("mock://test-nn1/user/me/file.txt#link"); final URI file = new URI("mock://test-nn1/user/me/file.txt#link");
final Path filePath = new Path(file); final Path filePath = new Path(file);
File link = new File("link"); File link = new File("link");
when(mockfs.getFileStatus(any(Path.class))).thenAnswer(new Answer<FileStatus>() { when(mockfs.getFileStatus(any(Path.class))).thenAnswer(new Answer<FileStatus>() {
@Override @Override
public FileStatus answer(InvocationOnMock args) throws Throwable { public FileStatus answer(InvocationOnMock args) throws Throwable {
Path p = (Path)args.getArguments()[0]; Path p = (Path)args.getArguments()[0];
if("file.txt".equals(p.getName())) { if("file.txt".equals(p.getName())) {
return new FileStatus(201, false, 1, 500, 101, 101, return new FileStatus(201, false, 1, 500, 101, 101,
FsPermission.getDefault(), "me", "me", filePath); FsPermission.getDefault(), "me", "me", filePath);
} else { } else {
throw new FileNotFoundException(p+" not supported by mocking"); throw new FileNotFoundException(p+" not supported by mocking");
} }
@ -287,11 +297,48 @@ public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath()); conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
LocalDistributedCacheManager manager = new LocalDistributedCacheManager(); LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
try { try {
manager.setup(conf); manager.setup(conf, jobId);
assertTrue(link.exists()); assertTrue(link.exists());
} finally { } finally {
manager.close(); manager.close();
} }
assertFalse(link.exists()); assertFalse(link.exists());
} }
/**
* This test tries to replicate the issue with the previous version of
* {@ref LocalDistributedCacheManager} when the resulting timestamp is
* identical as that in another process. Unfortunately, it is difficult
* to mimic such behavior in a single process unit test. And mocking
* the unique id (timestamp previously, UUID otherwise) won't prove the
* validity of one approach over the other.
*/
@Test
public void testMultipleCacheSetup() throws Exception {
JobID jobId = new JobID();
JobConf conf = new JobConf();
LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
final int threadCount = 10;
final CyclicBarrier barrier = new CyclicBarrier(threadCount);
ArrayList<Callable<Void>> setupCallable = new ArrayList<>();
for (int i = 0; i < threadCount; ++i) {
setupCallable.add(() -> {
barrier.await();
manager.setup(conf, jobId);
return null;
});
}
ExecutorService ePool = Executors.newFixedThreadPool(threadCount);
try {
for (Future<Void> future : ePool.invokeAll(setupCallable)) {
future.get();
}
} finally {
ePool.shutdown();
manager.close();
}
}
} }