diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java index 5d1c669e500..50cc63094bd 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java @@ -21,7 +21,6 @@ package org.apache.hadoop.mapred; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -31,9 +30,11 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.ArrayList; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -44,22 +45,31 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FilterFileSystem; +import org.apache.hadoop.fs.FutureDataInputStreamBuilder; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PositionedReadable; import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.util.functional.CallableRaisingIOE; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +/** + * Test the LocalDistributedCacheManager using mocking. + * This suite is brittle to changes in the class under test. + */ @SuppressWarnings("deprecation") public class TestLocalDistributedCacheManager { + private static final byte[] TEST_DATA = "This is a test file\n".getBytes(); + private static FileSystem mockfs; public static class MockFileSystem extends FilterFileSystem { @@ -70,6 +80,14 @@ public class TestLocalDistributedCacheManager { private File localDir; + /** + * Recursive delete of a path. + * For safety, paths of length under 5 are rejected. + * @param file path to delete. + * @throws IOException never, it is just "a dummy in the method signature" + * @throws IllegalArgumentException path too short + * @throws RuntimeException File.delete() failed. + */ private static void delete(File file) throws IOException { if (file.getAbsolutePath().length() < 5) { throw new IllegalArgumentException( @@ -109,9 +127,9 @@ public class TestLocalDistributedCacheManager { * Mock input stream based on a byte array so that it can be used by a * FSDataInputStream. */ - private static class MockInputStream extends ByteArrayInputStream + private static final class MockInputStream extends ByteArrayInputStream implements Seekable, PositionedReadable { - public MockInputStream(byte[] buf) { + private MockInputStream(byte[] buf) { super(buf); } @@ -134,47 +152,45 @@ public class TestLocalDistributedCacheManager { when(mockfs.getUri()).thenReturn(mockBase); Path working = new Path("mock://test-nn1/user/me/"); when(mockfs.getWorkingDirectory()).thenReturn(working); - when(mockfs.resolvePath(any(Path.class))).thenAnswer(new Answer() { - @Override - public Path answer(InvocationOnMock args) throws Throwable { - return (Path) args.getArguments()[0]; - } - }); + when(mockfs.resolvePath(any(Path.class))).thenAnswer( + (Answer) args -> (Path) args.getArguments()[0]); final URI file = new URI("mock://test-nn1/user/me/file.txt#link"); final Path filePath = new Path(file); File link = new File("link"); + // return a filestatus for the file "*/file.txt"; raise FNFE for anything else when(mockfs.getFileStatus(any(Path.class))).thenAnswer(new Answer() { @Override public FileStatus answer(InvocationOnMock args) throws Throwable { Path p = (Path)args.getArguments()[0]; if("file.txt".equals(p.getName())) { - return new FileStatus(201, false, 1, 500, 101, 101, - FsPermission.getDefault(), "me", "me", filePath); + return createMockTestFileStatus(filePath); } else { - throw new FileNotFoundException(p+" not supported by mocking"); + throw notMocked(p); } } }); when(mockfs.getConf()).thenReturn(conf); final FSDataInputStream in = - new FSDataInputStream(new MockInputStream("This is a test file\n".getBytes())); - when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer() { - @Override - public FSDataInputStream answer(InvocationOnMock args) throws Throwable { - Path src = (Path)args.getArguments()[0]; - if ("file.txt".equals(src.getName())) { - return in; - } else { - throw new FileNotFoundException(src+" not supported by mocking"); - } - } - }); + new FSDataInputStream(new MockInputStream(TEST_DATA)); + + // file.txt: return an openfile builder which will eventually return the data, + // anything else: FNFE + when(mockfs.openFile(any(Path.class))).thenAnswer( + (Answer) args -> { + Path src = (Path)args.getArguments()[0]; + if ("file.txt".equals(src.getName())) { + return new MockOpenFileBuilder(mockfs, src, + () -> CompletableFuture.completedFuture(in)); + } else { + throw notMocked(src); + } + }); Job.addCacheFile(file, conf); - Map policies = new HashMap(); + Map policies = new HashMap<>(); policies.put(file.toString(), true); Job.setFileSharedCacheUploadPolicies(conf, policies); conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101"); @@ -191,6 +207,12 @@ public class TestLocalDistributedCacheManager { assertFalse(link.exists()); } + /** + * This test case sets the mock FS to raise FNFE + * on any getFileStatus/openFile calls. + * If the manager successfully starts up, it means that + * no files were probed for/opened. + */ @Test public void testEmptyDownload() throws Exception { JobID jobId = new JobID(); @@ -201,30 +223,21 @@ public class TestLocalDistributedCacheManager { when(mockfs.getUri()).thenReturn(mockBase); Path working = new Path("mock://test-nn1/user/me/"); when(mockfs.getWorkingDirectory()).thenReturn(working); - when(mockfs.resolvePath(any(Path.class))).thenAnswer(new Answer() { - @Override - public Path answer(InvocationOnMock args) throws Throwable { - return (Path) args.getArguments()[0]; - } - }); + when(mockfs.resolvePath(any(Path.class))).thenAnswer( + (Answer) args -> (Path) args.getArguments()[0]); - when(mockfs.getFileStatus(any(Path.class))).thenAnswer(new Answer() { - @Override - public FileStatus answer(InvocationOnMock args) throws Throwable { - Path p = (Path)args.getArguments()[0]; - throw new FileNotFoundException(p+" not supported by mocking"); - } - }); + when(mockfs.getFileStatus(any(Path.class))).thenAnswer( + (Answer) args -> { + Path p = (Path)args.getArguments()[0]; + throw notMocked(p); + }); when(mockfs.getConf()).thenReturn(conf); - when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer() { - @Override - public FSDataInputStream answer(InvocationOnMock args) throws Throwable { - Path src = (Path)args.getArguments()[0]; - throw new FileNotFoundException(src+" not supported by mocking"); - } - }); - + when(mockfs.openFile(any(Path.class))).thenAnswer( + (Answer) args -> { + Path src = (Path)args.getArguments()[0]; + throw notMocked(src); + }); conf.set(MRJobConfig.CACHE_FILES, ""); conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath()); LocalDistributedCacheManager manager = new LocalDistributedCacheManager(); @@ -236,6 +249,9 @@ public class TestLocalDistributedCacheManager { } + /** + * The same file can be added to the cache twice. + */ @Test public void testDuplicateDownload() throws Exception { JobID jobId = new JobID(); @@ -246,12 +262,8 @@ public class TestLocalDistributedCacheManager { when(mockfs.getUri()).thenReturn(mockBase); Path working = new Path("mock://test-nn1/user/me/"); when(mockfs.getWorkingDirectory()).thenReturn(working); - when(mockfs.resolvePath(any(Path.class))).thenAnswer(new Answer() { - @Override - public Path answer(InvocationOnMock args) throws Throwable { - return (Path) args.getArguments()[0]; - } - }); + when(mockfs.resolvePath(any(Path.class))).thenAnswer( + (Answer) args -> (Path) args.getArguments()[0]); final URI file = new URI("mock://test-nn1/user/me/file.txt#link"); final Path filePath = new Path(file); @@ -262,32 +274,30 @@ public class TestLocalDistributedCacheManager { public FileStatus answer(InvocationOnMock args) throws Throwable { Path p = (Path)args.getArguments()[0]; if("file.txt".equals(p.getName())) { - return new FileStatus(201, false, 1, 500, 101, 101, - FsPermission.getDefault(), "me", "me", filePath); + return createMockTestFileStatus(filePath); } else { - throw new FileNotFoundException(p+" not supported by mocking"); + throw notMocked(p); } } }); when(mockfs.getConf()).thenReturn(conf); final FSDataInputStream in = - new FSDataInputStream(new MockInputStream("This is a test file\n".getBytes())); - when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer() { - @Override - public FSDataInputStream answer(InvocationOnMock args) throws Throwable { - Path src = (Path)args.getArguments()[0]; - if ("file.txt".equals(src.getName())) { - return in; - } else { - throw new FileNotFoundException(src+" not supported by mocking"); - } - } - }); + new FSDataInputStream(new MockInputStream(TEST_DATA)); + when(mockfs.openFile(any(Path.class))).thenAnswer( + (Answer) args -> { + Path src = (Path)args.getArguments()[0]; + if ("file.txt".equals(src.getName())) { + return new MockOpenFileBuilder(mockfs, src, + () -> CompletableFuture.completedFuture(in)); + } else { + throw notMocked(src); + } + }); Job.addCacheFile(file, conf); Job.addCacheFile(file, conf); - Map policies = new HashMap(); + Map policies = new HashMap<>(); policies.put(file.toString(), true); Job.setFileSharedCacheUploadPolicies(conf, policies); conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101,101"); @@ -306,7 +316,7 @@ public class TestLocalDistributedCacheManager { /** * This test tries to replicate the issue with the previous version of - * {@ref LocalDistributedCacheManager} when the resulting timestamp is + * {@link LocalDistributedCacheManager} when the resulting timestamp is * identical as that in another process. Unfortunately, it is difficult * to mimic such behavior in a single process unit test. And mocking * the unique id (timestamp previously, UUID otherwise) won't prove the @@ -321,7 +331,7 @@ public class TestLocalDistributedCacheManager { final int threadCount = 10; final CyclicBarrier barrier = new CyclicBarrier(threadCount); - ArrayList> setupCallable = new ArrayList<>(); + List> setupCallable = new ArrayList<>(); for (int i = 0; i < threadCount; ++i) { setupCallable.add(() -> { barrier.await(); @@ -340,4 +350,58 @@ public class TestLocalDistributedCacheManager { manager.close(); } } + + /** + * Create test file status using test data as the length. + * @param filePath path to the file + * @return a file status. + */ + private FileStatus createMockTestFileStatus(final Path filePath) { + return new FileStatus(TEST_DATA.length, false, 1, 500, 101, 101, + FsPermission.getDefault(), "me", "me", filePath); + } + + /** + * Exception to throw on a not mocked path. + * @return a FileNotFoundException + */ + private FileNotFoundException notMocked(final Path p) { + return new FileNotFoundException(p + " not supported by mocking"); + } + + /** + * Openfile builder where the build operation is a l-expression + * supplied in the constructor. + */ + private static final class MockOpenFileBuilder extends + FutureDataInputStreamBuilderImpl { + + /** + * Operation to invoke to build the result. + */ + private final CallableRaisingIOE> + buildTheResult; + + /** + * Create the builder. the FS and path must be non-null. + * FileSystem.getConf() is the only method invoked of the FS by + * the superclass. + * @param fileSystem fs + * @param path path to open + * @param buildTheResult builder operation. + */ + private MockOpenFileBuilder(final FileSystem fileSystem, Path path, + final CallableRaisingIOE> buildTheResult) { + super(fileSystem, path); + this.buildTheResult = buildTheResult; + } + + @Override + public CompletableFuture build() + throws IllegalArgumentException, UnsupportedOperationException, + IOException { + return buildTheResult.apply(); + } + } + }