diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java index c5ff8f327d..e10f42b851 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java @@ -21,7 +21,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.util.SFTPTransfer; import org.apache.nifi.processors.standard.util.SSHTestServer; -import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.AfterEach; @@ -33,14 +33,33 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Collections; -import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; +import java.util.Optional; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertFalse; class TestPutSFTP { private static final String FLOW_FILE_CONTENTS = TestPutSFTP.class.getSimpleName(); + private static final String LOCALHOST = "localhost"; + + private static final String LOCALHOST_ADDRESS = "127.0.0.1"; + + private static final String REMOTE_DIRECTORY = "nifi_test/"; + + private static final String FIRST_FILENAME = "1.txt"; + + private static final String TRANSFER_HOST_ATTRIBUTE = "transfer-host"; + + private static final int BATCH_SIZE = 2; + + private static final byte[] ZERO_BYTES = new byte[]{}; + + private static final String TRANSIT_URI_FORMAT = "sftp://%s"; + private SSHTestServer sshTestServer; private TestRunner runner; @@ -51,16 +70,16 @@ class TestPutSFTP { sshTestServer.startServer(); runner = TestRunners.newTestRunner(PutSFTP.class); - runner.setProperty(SFTPTransfer.HOSTNAME, "localhost"); + runner.setProperty(SFTPTransfer.HOSTNAME, LOCALHOST); runner.setProperty(SFTPTransfer.PORT, Integer.toString(sshTestServer.getSSHPort())); runner.setProperty(SFTPTransfer.USERNAME, sshTestServer.getUsername()); runner.setProperty(SFTPTransfer.PASSWORD, sshTestServer.getPassword()); - runner.setProperty(SFTPTransfer.STRICT_HOST_KEY_CHECKING, "false"); - runner.setProperty(SFTPTransfer.BATCH_SIZE, "2"); - runner.setProperty(SFTPTransfer.REMOTE_PATH, "nifi_test/"); - runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "true"); + runner.setProperty(SFTPTransfer.STRICT_HOST_KEY_CHECKING, Boolean.FALSE.toString()); + runner.setProperty(SFTPTransfer.BATCH_SIZE, Integer.toString(BATCH_SIZE)); + runner.setProperty(SFTPTransfer.REMOTE_PATH, REMOTE_DIRECTORY); + runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, Boolean.TRUE.toString()); runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REPLACE); - runner.setProperty(SFTPTransfer.CREATE_DIRECTORY, "true"); + runner.setProperty(SFTPTransfer.CREATE_DIRECTORY, Boolean.TRUE.toString()); runner.setProperty(SFTPTransfer.DATA_TIMEOUT, "30 sec"); runner.setValidateExpressionUsage(false); } @@ -74,176 +93,158 @@ class TestPutSFTP { @Test void testRunNewDirectory() { - runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "1.txt")); + runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME)); runner.run(); runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1); - //verify directory exists - Path newDirectory = Paths.get(sshTestServer.getVirtualFileSystemPath() + "nifi_test/"); - Path newFile = Paths.get(sshTestServer.getVirtualFileSystemPath() + "nifi_test/1.txt"); + Path newDirectory = Paths.get(sshTestServer.getVirtualFileSystemPath() + REMOTE_DIRECTORY); + Path newFile = Paths.get(sshTestServer.getVirtualFileSystemPath() + REMOTE_DIRECTORY + FIRST_FILENAME); assertTrue(newDirectory.toAbsolutePath().toFile().exists(), "New Directory not created"); assertTrue(newFile.toAbsolutePath().toFile().exists(), "New File not created"); runner.clearTransferState(); } @Test - void testRunZeroByteFile() { - runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "1.txt")); - runner.enqueue("", Collections.singletonMap(CoreAttributes.FILENAME.key(), "2.txt")); - + void testRunZeroByteFileRejected() { + runner.enqueue(ZERO_BYTES, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME)); runner.run(); - //Two files in batch, should have only 1 transferred to sucess, 1 to failure - runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1); runner.assertTransferCount(PutSFTP.REL_REJECT, 1); - runner.clearTransferState(); - - runner.enqueue("", Collections.singletonMap(CoreAttributes.FILENAME.key(), "1.txt")); - - runner.run(); - - //One files in batch, should have 0 transferred to output since it's zero byte - runner.assertTransferCount(PutSFTP.REL_REJECT, 1); - runner.assertTransferCount(PutSFTP.REL_SUCCESS, 0); - runner.clearTransferState(); - - //allow zero byte files - runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "false"); - - runner.enqueue("", Collections.singletonMap(CoreAttributes.FILENAME.key(), "1.txt")); - - runner.run(); - - //should have 1 transferred to sucess - runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1); - - //revert settings - runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "true"); - runner.clearTransferState(); } @Test - void testRunConflictResolution() throws IOException { - final String directoryName = "nifi_test"; - final String filename = "1"; + void testRunZeroByteFileAllowed() { + runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, Boolean.FALSE.toString()); + runner.enqueue(ZERO_BYTES, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME)); + runner.run(); - final Path directory = Paths.get(sshTestServer.getVirtualFileSystemPath() + directoryName ); - final Path subDirectory = Paths.get(directory.toString(), filename); - Files.createDirectory(directory); - Files.createFile(subDirectory); + runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1); + } - final Map flowFileAttributes = Collections.singletonMap(CoreAttributes.FILENAME.key(), filename); + @Test + void testRunConflictResolutionReplaceStrategy() throws IOException { + createRemoteFile(); - // REPLACE Strategy runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REPLACE); - - runner.enqueue(FLOW_FILE_CONTENTS, flowFileAttributes); + runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME)); runner.run(); runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1); runner.assertTransferCount(PutSFTP.REL_REJECT, 0); runner.assertTransferCount(PutSFTP.REL_FAILURE, 0); - runner.clearTransferState(); + } + + @Test + void testRunConflictResolutionRejectStrategy() throws IOException { + createRemoteFile(); - // REJECT Strategy runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REJECT); - - runner.enqueue(FLOW_FILE_CONTENTS, flowFileAttributes); + runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME)); runner.run(); runner.assertTransferCount(PutSFTP.REL_SUCCESS, 0); runner.assertTransferCount(PutSFTP.REL_REJECT, 1); runner.assertTransferCount(PutSFTP.REL_FAILURE, 0); - runner.clearTransferState(); + } + + @Test + void testRunConflictResolutionIgnoreStrategy() throws IOException { + createRemoteFile(); - // IGNORE Strategy runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_IGNORE); - runner.enqueue(FLOW_FILE_CONTENTS, flowFileAttributes); + runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME)); runner.run(); runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1); runner.assertTransferCount(PutSFTP.REL_REJECT, 0); runner.assertTransferCount(PutSFTP.REL_FAILURE, 0); + } - runner.clearTransferState(); + @Test + void testRunConflictResolutionFailStrategy() throws IOException { + createRemoteFile(); - // FAIL Strategy runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_FAIL); - runner.enqueue(FLOW_FILE_CONTENTS, flowFileAttributes); + runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME)); runner.run(); runner.assertTransferCount(PutSFTP.REL_SUCCESS, 0); runner.assertTransferCount(PutSFTP.REL_REJECT, 0); runner.assertTransferCount(PutSFTP.REL_FAILURE, 1); - - runner.clearTransferState(); } @Test void testRunBatching() { - runner.setProperty(SFTPTransfer.BATCH_SIZE, "2"); - - runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "1.txt")); - runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "2.txt")); - runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "3.txt")); - runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "4.txt")); - runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "5.txt")); + final int files = 4; + for (int fileNumber = 1; fileNumber <= files; fileNumber++) { + final String filename = Integer.toString(fileNumber); + runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), filename)); + } runner.run(); - runner.assertTransferCount(PutSFTP.REL_SUCCESS, 2); - + runner.assertTransferCount(PutSFTP.REL_SUCCESS, BATCH_SIZE); runner.clearTransferState(); runner.run(); - runner.assertTransferCount(PutSFTP.REL_SUCCESS, 2); - - runner.clearTransferState(); - - runner.run(); - runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1); - runner.clearTransferState(); + runner.assertTransferCount(PutSFTP.REL_SUCCESS, BATCH_SIZE); + runner.assertQueueEmpty(); } @Test void testRunTransitUri() { - runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "false"); - Map attributes = new HashMap<>(); - attributes.put("filename", "testfile.txt"); - attributes.put("transfer-host","localhost"); - runner.enqueue(FLOW_FILE_CONTENTS, attributes); + runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), FIRST_FILENAME)); - attributes = new HashMap<>(); - attributes.put("filename", "testfile1.txt"); - attributes.put("transfer-host","127.0.0.1"); - - runner.enqueue(FLOW_FILE_CONTENTS, attributes); runner.run(); - runner.assertTransferCount(PutSFTP.REL_SUCCESS, 2); - runner.getProvenanceEvents().forEach(k -> assertTrue(k.getTransitUri().contains("sftp://localhost"))); - //Two files in batch, should have 2 transferred to success, 0 to failure - runner.assertTransferCount(PutSFTP.REL_SUCCESS, 2); - runner.assertTransferCount(PutSFTP.REL_REJECT, 0); + runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1); - MockFlowFile flowFile1 = runner.getFlowFilesForRelationship(PutFileTransfer.REL_SUCCESS).get(0); - MockFlowFile flowFile2 = runner.getFlowFilesForRelationship(PutFileTransfer.REL_SUCCESS).get(1); - runner.clearProvenanceEvents(); - runner.clearTransferState(); + final List records = runner.getProvenanceEvents(); + assertFalse(records.isEmpty()); - //Test different destinations on flow file attributes - runner.setProperty(SFTPTransfer.HOSTNAME,"${transfer-host}"); //set to derive hostname - - runner.setThreadCount(1); - runner.enqueue(flowFile1); - runner.enqueue(flowFile2); - runner.run(); - - runner.assertTransferCount(PutSFTP.REL_SUCCESS, 2); - assertTrue(runner.getProvenanceEvents().get(0).getTransitUri().contains("sftp://localhost")); - assertTrue(runner.getProvenanceEvents().get(1).getTransitUri().contains("sftp://127.0.0.1")); - - runner.clearProvenanceEvents(); - runner.clearTransferState(); + final ProvenanceEventRecord record = records.iterator().next(); + final String firstTransitUri = String.format(TRANSIT_URI_FORMAT, LOCALHOST); + assertTrue(record.getTransitUri().startsWith(firstTransitUri), "Transit URI not found"); } -} \ No newline at end of file + + @Test + void testRunTransitUriDifferentHosts() { + runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, Boolean.FALSE.toString()); + runner.setProperty(SFTPTransfer.HOSTNAME, "${transfer-host}"); + + final Map firstAttributes = new LinkedHashMap<>(); + firstAttributes.put(CoreAttributes.FILENAME.key(), FIRST_FILENAME); + firstAttributes.put(TRANSFER_HOST_ATTRIBUTE, LOCALHOST); + runner.enqueue(FLOW_FILE_CONTENTS, firstAttributes); + + final Map secondAttributes = new LinkedHashMap<>(); + secondAttributes.put(CoreAttributes.FILENAME.key(), FIRST_FILENAME); + secondAttributes.put(TRANSFER_HOST_ATTRIBUTE, LOCALHOST_ADDRESS); + runner.enqueue(FLOW_FILE_CONTENTS, secondAttributes); + + runner.run(); + + runner.assertTransferCount(PutSFTP.REL_SUCCESS, 2); + + final List records = runner.getProvenanceEvents(); + + final String firstTransitUri = String.format(TRANSIT_URI_FORMAT, LOCALHOST); + final Optional firstRecord = records.stream() + .filter(record -> record.getTransitUri().startsWith(firstTransitUri)) + .findFirst(); + assertTrue(firstRecord.isPresent(), "First Transit URI not found"); + + final String secondTransitUri = String.format(TRANSIT_URI_FORMAT, LOCALHOST_ADDRESS); + final Optional secondRecord = records.stream() + .filter(record -> record.getTransitUri().startsWith(secondTransitUri)) + .findFirst(); + assertTrue(secondRecord.isPresent(), "Second Transit URI not found"); + } + + private void createRemoteFile() throws IOException { + final Path directory = Paths.get(sshTestServer.getVirtualFileSystemPath() + REMOTE_DIRECTORY); + final Path subDirectory = Paths.get(directory.toString(), FIRST_FILENAME); + Files.createDirectory(directory); + Files.createFile(subDirectory); + } +}