From 57ae9b65a0fd9f9f0880331debabbd75e64e1c40 Mon Sep 17 00:00:00 2001 From: uday Date: Sun, 15 Jul 2018 02:24:58 +0530 Subject: [PATCH] NIFI-1184 Mock FileSystem for PutHDFSTest This closes #2892 Signed-off-by: Mike Thomsen --- .../nifi/processors/hadoop/PutHDFSTest.java | 193 +++++++++++++----- 1 file changed, 140 insertions(+), 53 deletions(-) diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java index 32569ac59f..46b377d1e2 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java @@ -17,9 +17,14 @@ package org.apache.nifi.processors.hadoop; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.util.Progressable; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -36,12 +41,14 @@ import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Assert; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; +import java.net.URI; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -51,7 +58,6 @@ import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.junit.Assume.assumeTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -59,29 +65,19 @@ public class PutHDFSTest { private NiFiProperties mockNiFiProperties; private KerberosProperties kerberosProperties; - - @BeforeClass - public static void setUpClass() throws Exception { - /* - * Running Hadoop on Windows requires a special build which will produce required binaries and native modules [1]. Since functionality - * provided by this module and validated by these test does not have any native implication we do not distribute required binaries and native modules - * to support running these tests in Windows environment, therefore they are ignored. You can also get more info from this StackOverflow thread [2] - * - * [1] https://wiki.apache.org/hadoop/Hadoop2OnWindows - * [2] http://stackoverflow.com/questions/19620642/failed-to-locate-the-winutils-binary-in-the-hadoop-binary-path - */ - } + private FileSystem mockFileSystem; @Before public void setup() { mockNiFiProperties = mock(NiFiProperties.class); when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null); kerberosProperties = new KerberosProperties(null); + mockFileSystem = new MockFileSystem(); } @Test public void testValidators() { - PutHDFS proc = new TestablePutHDFS(kerberosProperties); + PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem); TestRunner runner = TestRunners.newTestRunner(proc); Collection results; ProcessContext pc; @@ -119,7 +115,7 @@ public class PutHDFSTest { assertTrue(vr.toString().contains("is invalid because short integer must be greater than zero")); } - proc = new TestablePutHDFS(kerberosProperties); + proc = new TestablePutHDFS(kerberosProperties, mockFileSystem); runner = TestRunners.newTestRunner(proc); results = new HashSet<>(); runner.setProperty(PutHDFS.DIRECTORY, "/target"); @@ -134,7 +130,7 @@ public class PutHDFSTest { assertTrue(vr.toString().contains("is invalid because short integer must be greater than zero")); } - proc = new TestablePutHDFS(kerberosProperties); + proc = new TestablePutHDFS(kerberosProperties, mockFileSystem); runner = TestRunners.newTestRunner(proc); results = new HashSet<>(); runner.setProperty(PutHDFS.DIRECTORY, "/target"); @@ -149,7 +145,7 @@ public class PutHDFSTest { assertTrue(vr.toString().contains("is invalid because octal umask [-1] cannot be negative")); } - proc = new TestablePutHDFS(kerberosProperties); + proc = new TestablePutHDFS(kerberosProperties, mockFileSystem); runner = TestRunners.newTestRunner(proc); results = new HashSet<>(); runner.setProperty(PutHDFS.DIRECTORY, "/target"); @@ -178,7 +174,7 @@ public class PutHDFSTest { } results = new HashSet<>(); - proc = new TestablePutHDFS(kerberosProperties); + proc = new TestablePutHDFS(kerberosProperties, mockFileSystem); runner = TestRunners.newTestRunner(proc); runner.setProperty(PutHDFS.DIRECTORY, "/target"); runner.setProperty(PutHDFS.COMPRESSION_CODEC, CompressionCodec.class.getName()); @@ -196,9 +192,8 @@ public class PutHDFSTest { @Test public void testPutFile() throws IOException { // Refer to comment in the BeforeClass method for an explanation - assumeTrue(isNotWindows()); - PutHDFS proc = new TestablePutHDFS(kerberosProperties); + PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem); TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes"); runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace"); @@ -209,9 +204,6 @@ public class PutHDFSTest { runner.run(); } - Configuration config = new Configuration(); - FileSystem fs = FileSystem.get(config); - List failedFlowFiles = runner .getFlowFilesForRelationship(new Relationship.Builder().name("failure").build()); assertTrue(failedFlowFiles.isEmpty()); @@ -219,7 +211,7 @@ public class PutHDFSTest { List flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS); assertEquals(1, flowFiles.size()); MockFlowFile flowFile = flowFiles.get(0); - assertTrue(fs.exists(new Path("target/test-classes/randombytes-1"))); + assertTrue(mockFileSystem.exists(new Path("target/test-classes/randombytes-1"))); assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key())); assertEquals("target/test-classes", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE)); @@ -234,9 +226,8 @@ public class PutHDFSTest { @Test public void testPutFileWithCompression() throws IOException { // Refer to comment in the BeforeClass method for an explanation - assumeTrue(isNotWindows()); - PutHDFS proc = new TestablePutHDFS(kerberosProperties); + PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem); TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes"); runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace"); @@ -248,9 +239,6 @@ public class PutHDFSTest { runner.run(); } - Configuration config = new Configuration(); - FileSystem fs = FileSystem.get(config); - List failedFlowFiles = runner .getFlowFilesForRelationship(new Relationship.Builder().name("failure").build()); assertTrue(failedFlowFiles.isEmpty()); @@ -258,7 +246,7 @@ public class PutHDFSTest { List flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS); assertEquals(1, flowFiles.size()); MockFlowFile flowFile = flowFiles.get(0); - assertTrue(fs.exists(new Path("target/test-classes/randombytes-1.gz"))); + assertTrue(mockFileSystem.exists(new Path("target/test-classes/randombytes-1.gz"))); assertEquals("randombytes-1.gz", flowFile.getAttribute(CoreAttributes.FILENAME.key())); assertEquals("target/test-classes", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE)); } @@ -266,14 +254,11 @@ public class PutHDFSTest { @Test public void testPutFileWithException() throws IOException { // Refer to comment in the BeforeClass method for an explanation - assumeTrue(isNotWindows()); String dirName = "target/testPutFileWrongPermissions"; File file = new File(dirName); file.mkdirs(); - Configuration config = new Configuration(); - FileSystem fs = FileSystem.get(config); - Path p = new Path(dirName).makeQualified(fs.getUri(), fs.getWorkingDirectory()); + Path p = new Path(dirName).makeQualified(mockFileSystem.getUri(), mockFileSystem.getWorkingDirectory()); final KerberosProperties testKerberosProperties = kerberosProperties; TestRunner runner = TestRunners.newTestRunner(new PutHDFS() { @@ -302,28 +287,24 @@ public class PutHDFSTest { assertFalse(failedFlowFiles.isEmpty()); assertTrue(failedFlowFiles.get(0).isPenalized()); - fs.delete(p, true); + mockFileSystem.delete(p, true); } @Test public void testPutFileWhenDirectoryUsesValidELFunction() throws IOException { // Refer to comment in the BeforeClass method for an explanation - assumeTrue(isNotWindows()); - PutHDFS proc = new TestablePutHDFS(kerberosProperties); + PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem); TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(PutHDFS.DIRECTORY, "target/data_${literal('testing'):substring(0,4)}"); runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace"); try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");) { - Map attributes = new HashMap(); + Map attributes = new HashMap<>(); attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1"); runner.enqueue(fis, attributes); runner.run(); } - Configuration config = new Configuration(); - FileSystem fs = FileSystem.get(config); - List failedFlowFiles = runner .getFlowFilesForRelationship(new Relationship.Builder().name("failure").build()); assertTrue(failedFlowFiles.isEmpty()); @@ -331,7 +312,7 @@ public class PutHDFSTest { List flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS); assertEquals(1, flowFiles.size()); MockFlowFile flowFile = flowFiles.get(0); - assertTrue(fs.exists(new Path("target/test-classes/randombytes-1"))); + assertTrue(mockFileSystem.exists(new Path("target/data_test/randombytes-1"))); assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key())); assertEquals("target/data_test", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE)); } @@ -339,9 +320,8 @@ public class PutHDFSTest { @Test public void testPutFileWhenDirectoryUsesUnrecognizedEL() throws IOException { // Refer to comment in the BeforeClass method for an explanation - assumeTrue(isNotWindows()); - PutHDFS proc = new TestablePutHDFS(kerberosProperties); + PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem); TestRunner runner = TestRunners.newTestRunner(proc); // this value somehow causes NiFi to not even recognize the EL, and thus it returns successfully from calling @@ -362,9 +342,8 @@ public class PutHDFSTest { @Test public void testPutFileWhenDirectoryUsesInvalidEL() throws IOException { // Refer to comment in the BeforeClass method for an explanation - assumeTrue(isNotWindows()); - PutHDFS proc = new TestablePutHDFS(kerberosProperties); + PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem); TestRunner runner = TestRunners.newTestRunner(proc); // the validator should pick up the invalid EL runner.setProperty(PutHDFS.DIRECTORY, "target/data_${literal('testing'):foo()}"); @@ -372,22 +351,130 @@ public class PutHDFSTest { runner.assertNotValid(); } - private boolean isNotWindows() { - return !System.getProperty("os.name").startsWith("Windows"); - } - - private static class TestablePutHDFS extends PutHDFS { + private class TestablePutHDFS extends PutHDFS { private KerberosProperties testKerberosProperties; + private FileSystem fileSystem; - public TestablePutHDFS(KerberosProperties testKerberosProperties) { + public TestablePutHDFS(KerberosProperties testKerberosProperties, FileSystem fileSystem) { this.testKerberosProperties = testKerberosProperties; + this.fileSystem = fileSystem; } @Override protected KerberosProperties getKerberosProperties(File kerberosConfigFile) { return testKerberosProperties; } + + @Override + protected FileSystem getFileSystem(Configuration config) throws IOException { + return fileSystem; + } + + @Override + protected FileSystem getFileSystem() { + return fileSystem; + } } + private class MockFileSystem extends FileSystem { + private final Map pathToStatus = new HashMap<>(); + + @Override + public URI getUri() { + return URI.create("file:///"); + } + + @Override + public FSDataInputStream open(final Path f, final int bufferSize) throws IOException { + return null; + } + + @Override + public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication, + final long blockSize, final Progressable progress) throws IOException { + pathToStatus.put(f, newFile(f)); + return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics("")); + } + + @Override + public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) throws IOException { + return null; + } + + @Override + public boolean rename(final Path src, final Path dst) throws IOException { + if (pathToStatus.containsKey(src)) { + pathToStatus.put(dst, pathToStatus.remove(src)); + } else { + return false; + } + return true; + } + + @Override + public boolean delete(final Path f, final boolean recursive) throws IOException { + if (pathToStatus.containsKey(f)) { + pathToStatus.remove(f); + } else { + return false; + } + return true; + } + + @Override + public FileStatus[] listStatus(final Path f) throws FileNotFoundException, IOException { + return null; + } + + @Override + public void setWorkingDirectory(final Path new_dir) { + + } + + @Override + public Path getWorkingDirectory() { + return new Path(new File(".").getAbsolutePath()); + } + + @Override + public boolean mkdirs(final Path f, final FsPermission permission) throws IOException { + return false; + } + + @Override + public boolean mkdirs(Path f) throws IOException { + pathToStatus.put(f, newDir(f)); + return true; + } + + @Override + public FileStatus getFileStatus(final Path f) throws IOException { + final FileStatus fileStatus = pathToStatus.get(f); + if (fileStatus == null) throw new FileNotFoundException(); + return fileStatus; + } + + @Override + public boolean exists(Path f) throws IOException { + return pathToStatus.containsKey(f); + } + + private FileStatus newFile(Path p) { + return new FileStatus(100L, false, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, perms((short) 0644), "owner", "group", p); + } + + private FileStatus newDir(Path p) { + return new FileStatus(1L, true, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, perms((short) 0755), "owner", "group", p); + } + + @Override + public long getDefaultBlockSize(Path f) { + return 33554432L; + } + } + + static FsPermission perms(short p) { + return new FsPermission(p); + } }