diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java index 46b377d1e2..2334730af4 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java @@ -30,19 +30,21 @@ import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockProcessContext; -import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.ietf.jgss.GSSException; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import javax.security.sasl.SaslException; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; @@ -57,20 +59,16 @@ import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class PutHDFSTest { - private NiFiProperties mockNiFiProperties; private KerberosProperties kerberosProperties; private FileSystem mockFileSystem; @Before public void setup() { - mockNiFiProperties = mock(NiFiProperties.class); - when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null); kerberosProperties = new KerberosProperties(null); mockFileSystem = new MockFileSystem(); } @@ -191,14 +189,12 @@ public class PutHDFSTest { @Test public void testPutFile() throws IOException { - // Refer to comment in the BeforeClass method for an explanation - PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem); TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes"); runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace"); - try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");) { - Map attributes = new HashMap(); + try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) { + Map attributes = new HashMap<>(); attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1"); runner.enqueue(fis, attributes); runner.run(); @@ -225,15 +221,13 @@ public class PutHDFSTest { @Test public void testPutFileWithCompression() throws IOException { - // Refer to comment in the BeforeClass method for an explanation - PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem); TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes"); runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace"); runner.setProperty(PutHDFS.COMPRESSION_CODEC, "GZIP"); - try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");) { - Map attributes = new HashMap(); + try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) { + Map attributes = new HashMap<>(); attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1"); runner.enqueue(fis, attributes); runner.run(); @@ -252,31 +246,60 @@ public class PutHDFSTest { } @Test - public void testPutFileWithException() throws IOException { - // Refer to comment in the BeforeClass method for an explanation + public void testPutFileWithGSSException() throws IOException { + FileSystem noCredentialsFileSystem = new MockFileSystem() { + @Override + public FileStatus getFileStatus(Path path) throws IOException { + throw new IOException("ioe", new SaslException("sasle", new GSSException(13))); + } + }; + TestRunner runner = TestRunners.newTestRunner(new TestablePutHDFS(kerberosProperties, noCredentialsFileSystem)); + runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes"); + runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace"); + try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) { + Map attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1"); + runner.enqueue(fis, attributes); + runner.run(); + } + + // assert no flowfiles transferred to outgoing relationships + runner.assertTransferCount(PutHDFS.REL_SUCCESS, 0); + runner.assertTransferCount(PutHDFS.REL_FAILURE, 0); + // assert the input flowfile was penalized + List penalizedFlowFiles = runner.getPenalizedFlowFiles(); + assertEquals(1, penalizedFlowFiles.size()); + assertEquals("randombytes-1", penalizedFlowFiles.iterator().next().getAttribute(CoreAttributes.FILENAME.key())); + // assert the processor's queue is not empty + assertFalse(runner.isQueueEmpty()); + assertEquals(1, runner.getQueueSize().getObjectCount()); + // assert the input file is back on the queue + ProcessSession session = runner.getProcessSessionFactory().createSession(); + FlowFile queuedFlowFile = session.get(); + assertNotNull(queuedFlowFile); + assertEquals("randombytes-1", queuedFlowFile.getAttribute(CoreAttributes.FILENAME.key())); + session.rollback(); + } + + @Test + public void testPutFileWithProcessException() throws IOException { String dirName = "target/testPutFileWrongPermissions"; File file = new File(dirName); file.mkdirs(); Path p = new Path(dirName).makeQualified(mockFileSystem.getUri(), mockFileSystem.getWorkingDirectory()); - final KerberosProperties testKerberosProperties = kerberosProperties; - TestRunner runner = TestRunners.newTestRunner(new PutHDFS() { + TestRunner runner = TestRunners.newTestRunner(new TestablePutHDFS(kerberosProperties, mockFileSystem) { @Override protected void changeOwner(ProcessContext context, FileSystem hdfs, Path name, FlowFile flowFile) { throw new ProcessException("Forcing Exception to get thrown in order to verify proper handling"); } - - @Override - protected KerberosProperties getKerberosProperties(File kerberosConfigFile) { - return testKerberosProperties; - } }); runner.setProperty(PutHDFS.DIRECTORY, dirName); runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace"); - try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");) { - Map attributes = new HashMap(); + try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) { + Map attributes = new HashMap<>(); attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1"); runner.enqueue(fis, attributes); runner.run(); @@ -292,13 +315,11 @@ public class PutHDFSTest { @Test public void testPutFileWhenDirectoryUsesValidELFunction() throws IOException { - // Refer to comment in the BeforeClass method for an explanation - PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem); TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(PutHDFS.DIRECTORY, "target/data_${literal('testing'):substring(0,4)}"); runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace"); - try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");) { + try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) { Map attributes = new HashMap<>(); attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1"); runner.enqueue(fis, attributes); @@ -319,8 +340,6 @@ public class PutHDFSTest { @Test public void testPutFileWhenDirectoryUsesUnrecognizedEL() throws IOException { - // Refer to comment in the BeforeClass method for an explanation - PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem); TestRunner runner = TestRunners.newTestRunner(proc); @@ -329,8 +348,8 @@ public class PutHDFSTest { runner.setProperty(PutHDFS.DIRECTORY, "data_${literal('testing'):substring(0,4)%7D"); runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace"); - try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");) { - Map attributes = new HashMap(); + try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) { + Map attributes = new HashMap<>(); attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1"); runner.enqueue(fis, attributes); runner.run(); @@ -340,9 +359,7 @@ public class PutHDFSTest { } @Test - public void testPutFileWhenDirectoryUsesInvalidEL() throws IOException { - // Refer to comment in the BeforeClass method for an explanation - + public void testPutFileWhenDirectoryUsesInvalidEL() { PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem); TestRunner runner = TestRunners.newTestRunner(proc); // the validator should pick up the invalid EL @@ -356,7 +373,7 @@ public class PutHDFSTest { private KerberosProperties testKerberosProperties; private FileSystem fileSystem; - public TestablePutHDFS(KerberosProperties testKerberosProperties, FileSystem fileSystem) { + TestablePutHDFS(KerberosProperties testKerberosProperties, FileSystem fileSystem) { this.testKerberosProperties = testKerberosProperties; this.fileSystem = fileSystem; } @@ -367,7 +384,7 @@ public class PutHDFSTest { } @Override - protected FileSystem getFileSystem(Configuration config) throws IOException { + protected FileSystem getFileSystem(Configuration config) { return fileSystem; } @@ -386,24 +403,24 @@ public class PutHDFSTest { } @Override - public FSDataInputStream open(final Path f, final int bufferSize) throws IOException { + public FSDataInputStream open(final Path f, final int bufferSize) { return null; } @Override public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication, - final long blockSize, final Progressable progress) throws IOException { + final long blockSize, final Progressable progress) { pathToStatus.put(f, newFile(f)); return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics("")); } @Override - public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) throws IOException { + public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) { return null; } @Override - public boolean rename(final Path src, final Path dst) throws IOException { + public boolean rename(final Path src, final Path dst) { if (pathToStatus.containsKey(src)) { pathToStatus.put(dst, pathToStatus.remove(src)); } else { @@ -413,7 +430,7 @@ public class PutHDFSTest { } @Override - public boolean delete(final Path f, final boolean recursive) throws IOException { + public boolean delete(final Path f, final boolean recursive) { if (pathToStatus.containsKey(f)) { pathToStatus.remove(f); } else { @@ -423,7 +440,7 @@ public class PutHDFSTest { } @Override - public FileStatus[] listStatus(final Path f) throws FileNotFoundException, IOException { + public FileStatus[] listStatus(final Path f) { return null; } @@ -438,12 +455,12 @@ public class PutHDFSTest { } @Override - public boolean mkdirs(final Path f, final FsPermission permission) throws IOException { + public boolean mkdirs(final Path f, final FsPermission permission) { return false; } @Override - public boolean mkdirs(Path f) throws IOException { + public boolean mkdirs(Path f) { pathToStatus.put(f, newDir(f)); return true; } @@ -456,7 +473,7 @@ public class PutHDFSTest { } @Override - public boolean exists(Path f) throws IOException { + public boolean exists(Path f) { return pathToStatus.containsKey(f); }