diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java index 3b9af1ad71..a57ea225e4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java @@ -95,12 +95,16 @@ public abstract class PutFileTransfer extends AbstractPr } final ComponentLog logger = getLogger(); - final String hostname = context.getProperty(FileTransfer.HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(); + String hostname = context.getProperty(FileTransfer.HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(); final int maxNumberOfFiles = context.getProperty(FileTransfer.BATCH_SIZE).asInteger(); int fileCount = 0; try (final T transfer = getFileTransfer(context)) { do { + //check if hostname is regular expression requiring evaluation + if(context.getProperty(FileTransfer.HOSTNAME).isExpressionLanguagePresent()) { + hostname = context.getProperty(FileTransfer.HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(); + } final String rootPath = context.getProperty(FileTransfer.REMOTE_PATH).evaluateAttributeExpressions(flowFile).getValue(); final String workingDirPath; if (StringUtils.isBlank(rootPath)) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java index 1400fc4775..5809ea4359 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java @@ -145,6 +145,57 @@ public class TestFTP { Assert.assertTrue(results.exists("c:\\data\\randombytes-1")); } + @Test + public void basicProvenanceEventTest() throws IOException { + TestRunner runner = TestRunners.newTestRunner(PutFTP.class); + + runner.setProperty(FTPTransfer.HOSTNAME, "localhost"); + runner.setProperty(FTPTransfer.USERNAME, username); + runner.setProperty(FTPTransfer.PASSWORD, password); + runner.setProperty(FTPTransfer.PORT, Integer.toString(ftpPort)); + + // Get two flowfiles to test by running data + try (FileInputStream fis = new FileInputStream("src/test/resources/randombytes-1")) { + Map attributes = new HashMap(); + attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1"); + attributes.put("transfer-host", "localhost"); + runner.enqueue(fis, attributes); + runner.run(); + } + try (FileInputStream fis = new FileInputStream("src/test/resources/hello.txt")) { + Map attributes = new HashMap(); + attributes.put(CoreAttributes.FILENAME.key(), "hello.txt"); + attributes.put("transfer-host", "127.0.0.1"); + runner.enqueue(fis, attributes); + runner.run(); + } + runner.assertQueueEmpty(); + runner.assertTransferCount(PutFTP.REL_SUCCESS, 2); + + MockFlowFile flowFile1 = runner.getFlowFilesForRelationship(PutFileTransfer.REL_SUCCESS).get(0); + MockFlowFile flowFile2 = runner.getFlowFilesForRelationship(PutFileTransfer.REL_SUCCESS).get(1); + + runner.clearProvenanceEvents(); + runner.clearTransferState(); + HashMap map1 = new HashMap<>(); + HashMap map2 = new HashMap<>(); + map1.put(CoreAttributes.FILENAME.key(), "randombytes-xx"); + map2.put(CoreAttributes.FILENAME.key(), "randombytes-yy"); + + flowFile1.putAttributes(map1); + flowFile2.putAttributes(map2); + //set to derive hostname + runner.setProperty(FTPTransfer.HOSTNAME, "${transfer-host}"); + runner.setThreadCount(1); + runner.enqueue(flowFile1); + runner.enqueue(flowFile2); + runner.run(); + + runner.assertTransferCount(PutFTP.REL_SUCCESS, 2); + assert(runner.getProvenanceEvents().get(0).getTransitUri().contains("ftp://localhost")); + assert(runner.getProvenanceEvents().get(1).getTransitUri().contains("ftp://127.0.0.1")); + } + @Test public void basicFileGet() throws IOException { FileSystem results = fakeFtpServer.getFileSystem(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java index 194e485ce2..36aa2ede44 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java @@ -20,6 +20,7 @@ import org.apache.commons.io.FileUtils; import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.util.SFTPTransfer; import org.apache.nifi.processors.standard.util.SSHTestServer; +import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.AfterClass; @@ -243,9 +244,56 @@ public class TestPutSFTP { putSFTPRunner.clearTransferState(); } + @Test + public void testPutSFTPProvenanceTransitUri() throws IOException { + emptyTestDirectory(); + + putSFTPRunner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "false"); + Map attributes = new HashMap<>(); + attributes.put("filename", "testfile.txt"); + attributes.put("transfer-host","localhost"); + putSFTPRunner.enqueue(Paths.get(testFile), attributes); + + attributes = new HashMap<>(); + attributes.put("filename", "testfile1.txt"); + attributes.put("transfer-host","127.0.0.1"); + + putSFTPRunner.enqueue(Paths.get(testFile), attributes); + putSFTPRunner.run(); + + putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 2); + putSFTPRunner.getProvenanceEvents().forEach(k->{ + assert(k.getTransitUri().contains("sftp://localhost")); + }); + //Two files in batch, should have 2 transferred to success, 0 to failure + putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 2); + putSFTPRunner.assertTransferCount(PutSFTP.REL_REJECT, 0); + + MockFlowFile flowFile1 = putSFTPRunner.getFlowFilesForRelationship(PutFileTransfer.REL_SUCCESS).get(0); + MockFlowFile flowFile2 = putSFTPRunner.getFlowFilesForRelationship(PutFileTransfer.REL_SUCCESS).get(1); + putSFTPRunner.clearProvenanceEvents(); + putSFTPRunner.clearTransferState(); + + //Test different destinations on flow file attributes + putSFTPRunner.setProperty(SFTPTransfer.HOSTNAME,"${transfer-host}"); //set to derive hostname + + putSFTPRunner.setThreadCount(1); + putSFTPRunner.enqueue(flowFile1); + putSFTPRunner.enqueue(flowFile2); + putSFTPRunner.run(); + + putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 2); + assert(putSFTPRunner.getProvenanceEvents().get(0).getTransitUri().contains("sftp://localhost")); + assert(putSFTPRunner.getProvenanceEvents().get(1).getTransitUri().contains("sftp://127.0.0.1")); + + putSFTPRunner.clearProvenanceEvents(); + putSFTPRunner.clearTransferState(); + } + private void emptyTestDirectory() throws IOException { //Delete Virtual File System folder Path dir = Paths.get(sshTestServer.getVirtualFileSystemPath()); FileUtils.cleanDirectory(dir.toFile()); } + } \ No newline at end of file