From 4b7f8cbe1f860dc99d72a86ea3c211c86b2f7aa8 Mon Sep 17 00:00:00 2001 From: exceptionfactory Date: Thu, 21 Apr 2022 13:41:08 -0500 Subject: [PATCH] NIFI-9949 This closes #5988. Corrected intermittent failures in TestPutSFTP - Changed SSH server to start and stop after each method - Replaced queued file with string FlowFile contents - Refactored TestPutSFTP using JUnit 5 Signed-off-by: Joe Witt --- .../nifi/processors/standard/TestPutSFTP.java | 319 ++++++++---------- 1 file changed, 137 insertions(+), 182 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java index ac5f163720..c5ff8f327d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java @@ -17,278 +17,233 @@ package org.apache.nifi.processors.standard; import org.apache.commons.io.FileUtils; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.util.SFTPTransfer; import org.apache.nifi.processors.standard.util.SSHTestServer; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; -import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Collections; import java.util.HashMap; import java.util.Map; -public class TestPutSFTP { - private TestRunner putSFTPRunner; - private static SSHTestServer sshTestServer; +import static org.junit.jupiter.api.Assertions.assertTrue; - private final String testFile = "src" + File.separator + "test" + File.separator + "resources" + File.separator + "hello.txt"; +class TestPutSFTP { + private static final String FLOW_FILE_CONTENTS = TestPutSFTP.class.getSimpleName(); - @BeforeClass - public static void setupSSHD() throws IOException { + private SSHTestServer sshTestServer; + + private TestRunner runner; + + @BeforeEach + void setRunner() throws IOException { sshTestServer = new SSHTestServer(); sshTestServer.startServer(); + + runner = TestRunners.newTestRunner(PutSFTP.class); + runner.setProperty(SFTPTransfer.HOSTNAME, "localhost"); + runner.setProperty(SFTPTransfer.PORT, Integer.toString(sshTestServer.getSSHPort())); + runner.setProperty(SFTPTransfer.USERNAME, sshTestServer.getUsername()); + runner.setProperty(SFTPTransfer.PASSWORD, sshTestServer.getPassword()); + runner.setProperty(SFTPTransfer.STRICT_HOST_KEY_CHECKING, "false"); + runner.setProperty(SFTPTransfer.BATCH_SIZE, "2"); + runner.setProperty(SFTPTransfer.REMOTE_PATH, "nifi_test/"); + runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "true"); + runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REPLACE); + runner.setProperty(SFTPTransfer.CREATE_DIRECTORY, "true"); + runner.setProperty(SFTPTransfer.DATA_TIMEOUT, "30 sec"); + runner.setValidateExpressionUsage(false); } - @AfterClass - public static void cleanupSSHD() throws IOException { + @AfterEach + void clearDirectory() throws IOException { sshTestServer.stopServer(); - } - - @Before - public void setup(){ - putSFTPRunner = TestRunners.newTestRunner(PutSFTP.class); - putSFTPRunner.setProperty(SFTPTransfer.HOSTNAME, "localhost"); - putSFTPRunner.setProperty(SFTPTransfer.PORT, Integer.toString(sshTestServer.getSSHPort())); - putSFTPRunner.setProperty(SFTPTransfer.USERNAME, sshTestServer.getUsername()); - putSFTPRunner.setProperty(SFTPTransfer.PASSWORD, sshTestServer.getPassword()); - putSFTPRunner.setProperty(SFTPTransfer.STRICT_HOST_KEY_CHECKING, "false"); - putSFTPRunner.setProperty(SFTPTransfer.BATCH_SIZE, "2"); - putSFTPRunner.setProperty(SFTPTransfer.REMOTE_PATH, "nifi_test/"); - putSFTPRunner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "true"); - putSFTPRunner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REPLACE); - putSFTPRunner.setProperty(SFTPTransfer.CREATE_DIRECTORY, "true"); - putSFTPRunner.setProperty(SFTPTransfer.DATA_TIMEOUT, "30 sec"); - putSFTPRunner.setValidateExpressionUsage(false); + final Path fileSystemPath = Paths.get(sshTestServer.getVirtualFileSystemPath()); + FileUtils.deleteQuietly(fileSystemPath.toFile()); } @Test - public void testPutSFTPFile() throws IOException { - emptyTestDirectory(); + void testRunNewDirectory() { + runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "1.txt")); + runner.run(); - Map attributes = new HashMap<>(); - attributes.put("filename", "testfile.txt"); - - putSFTPRunner.enqueue(Paths.get(testFile), attributes); - putSFTPRunner.run(); - - putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 1); + runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1); //verify directory exists Path newDirectory = Paths.get(sshTestServer.getVirtualFileSystemPath() + "nifi_test/"); - Path newFile = Paths.get(sshTestServer.getVirtualFileSystemPath() + "nifi_test/testfile.txt"); - Assert.assertTrue("New directory not created.", newDirectory.toAbsolutePath().toFile().exists()); - Assert.assertTrue("New File not created.", newFile.toAbsolutePath().toFile().exists()); - putSFTPRunner.clearTransferState(); + Path newFile = Paths.get(sshTestServer.getVirtualFileSystemPath() + "nifi_test/1.txt"); + assertTrue(newDirectory.toAbsolutePath().toFile().exists(), "New Directory not created"); + assertTrue(newFile.toAbsolutePath().toFile().exists(), "New File not created"); + runner.clearTransferState(); } @Test - public void testPutSFTPFileZeroByte() throws IOException { - emptyTestDirectory(); + void testRunZeroByteFile() { + runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "1.txt")); + runner.enqueue("", Collections.singletonMap(CoreAttributes.FILENAME.key(), "2.txt")); - Map attributes = new HashMap<>(); - attributes.put("filename", "testfile.txt"); - putSFTPRunner.enqueue(Paths.get(testFile), attributes); - - attributes = new HashMap<>(); - attributes.put("filename", "testfile1.txt"); - putSFTPRunner.enqueue("", attributes); - - putSFTPRunner.run(); + runner.run(); //Two files in batch, should have only 1 transferred to sucess, 1 to failure - putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 1); - putSFTPRunner.assertTransferCount(PutSFTP.REL_REJECT, 1); - putSFTPRunner.clearTransferState(); + runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1); + runner.assertTransferCount(PutSFTP.REL_REJECT, 1); + runner.clearTransferState(); - attributes = new HashMap<>(); - attributes.put("filename", "testfile1.txt"); - putSFTPRunner.enqueue("", attributes); + runner.enqueue("", Collections.singletonMap(CoreAttributes.FILENAME.key(), "1.txt")); - putSFTPRunner.run(); + runner.run(); //One files in batch, should have 0 transferred to output since it's zero byte - putSFTPRunner.assertTransferCount(PutSFTP.REL_REJECT, 1); - putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 0); - putSFTPRunner.clearTransferState(); + runner.assertTransferCount(PutSFTP.REL_REJECT, 1); + runner.assertTransferCount(PutSFTP.REL_SUCCESS, 0); + runner.clearTransferState(); //allow zero byte files - putSFTPRunner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "false"); + runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "false"); - attributes = new HashMap<>(); - attributes.put("filename", "testfile1.txt"); - putSFTPRunner.enqueue("", attributes); + runner.enqueue("", Collections.singletonMap(CoreAttributes.FILENAME.key(), "1.txt")); - putSFTPRunner.run(); + runner.run(); //should have 1 transferred to sucess - putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 1); + runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1); //revert settings - putSFTPRunner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "true"); - putSFTPRunner.clearTransferState(); + runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "true"); + runner.clearTransferState(); } @Test - public void testPutSFTPFileConflictResolution() throws IOException { - emptyTestDirectory(); + void testRunConflictResolution() throws IOException { + final String directoryName = "nifi_test"; + final String filename = "1"; - //Try transferring file with the same name as a directory, should fail in all cases - // except RESOLUTION of NONE - Path dir = Paths.get(sshTestServer.getVirtualFileSystemPath() + "nifi_test" ); - Path dir2 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "nifi_test/testfile" ); - Files.createDirectory(dir); - Files.createDirectory(dir2); + final Path directory = Paths.get(sshTestServer.getVirtualFileSystemPath() + directoryName ); + final Path subDirectory = Paths.get(directory.toString(), filename); + Files.createDirectory(directory); + Files.createFile(subDirectory); - Map attributes = new HashMap<>(); - attributes.put("filename", "testfile"); + final Map flowFileAttributes = Collections.singletonMap(CoreAttributes.FILENAME.key(), filename); - putSFTPRunner.enqueue(Paths.get(testFile), attributes); - putSFTPRunner.run(); + // REPLACE Strategy + runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REPLACE); - putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 0); - putSFTPRunner.assertTransferCount(PutSFTP.REL_FAILURE, 1); + runner.enqueue(FLOW_FILE_CONTENTS, flowFileAttributes); + runner.run(); - //Prepare by uploading test file - attributes = new HashMap<>(); - attributes.put("filename", "testfile.txt"); + runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1); + runner.assertTransferCount(PutSFTP.REL_REJECT, 0); + runner.assertTransferCount(PutSFTP.REL_FAILURE, 0); + runner.clearTransferState(); - putSFTPRunner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REPLACE); + // REJECT Strategy + runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REJECT); - putSFTPRunner.enqueue(Paths.get(testFile), attributes); - putSFTPRunner.run(); - putSFTPRunner.clearTransferState(); + runner.enqueue(FLOW_FILE_CONTENTS, flowFileAttributes); + runner.run(); - //set conflict resolution mode to REJECT - putSFTPRunner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REJECT); + runner.assertTransferCount(PutSFTP.REL_SUCCESS, 0); + runner.assertTransferCount(PutSFTP.REL_REJECT, 1); + runner.assertTransferCount(PutSFTP.REL_FAILURE, 0); + runner.clearTransferState(); - putSFTPRunner.enqueue(Paths.get(testFile), attributes); - putSFTPRunner.run(); + // IGNORE Strategy + runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_IGNORE); + runner.enqueue(FLOW_FILE_CONTENTS, flowFileAttributes); + runner.run(); - putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 0); - putSFTPRunner.assertTransferCount(PutSFTP.REL_REJECT, 1); - putSFTPRunner.clearTransferState(); + runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1); + runner.assertTransferCount(PutSFTP.REL_REJECT, 0); + runner.assertTransferCount(PutSFTP.REL_FAILURE, 0); - //set conflict resolution mode to IGNORE - putSFTPRunner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_IGNORE); - putSFTPRunner.enqueue(Paths.get(testFile), attributes); - putSFTPRunner.run(); + runner.clearTransferState(); - putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 1); - putSFTPRunner.assertTransferCount(PutSFTP.REL_REJECT, 0); + // FAIL Strategy + runner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_FAIL); + runner.enqueue(FLOW_FILE_CONTENTS, flowFileAttributes); + runner.run(); - putSFTPRunner.clearTransferState(); + runner.assertTransferCount(PutSFTP.REL_SUCCESS, 0); + runner.assertTransferCount(PutSFTP.REL_REJECT, 0); + runner.assertTransferCount(PutSFTP.REL_FAILURE, 1); - //set conflict resolution mode to FAIL - putSFTPRunner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_FAIL); - putSFTPRunner.enqueue(Paths.get(testFile), attributes); - putSFTPRunner.run(); - - putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 0); - putSFTPRunner.assertTransferCount(PutSFTP.REL_REJECT, 0); - putSFTPRunner.assertTransferCount(PutSFTP.REL_FAILURE, 1); - - putSFTPRunner.clearTransferState(); + runner.clearTransferState(); } @Test - public void testPutSFTPBatching() throws IOException { - emptyTestDirectory(); + void testRunBatching() { + runner.setProperty(SFTPTransfer.BATCH_SIZE, "2"); - Map attributes = new HashMap<>(); - attributes.put("filename", "testfile.txt"); - putSFTPRunner.enqueue(Paths.get(testFile), attributes); + runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "1.txt")); + runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "2.txt")); + runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "3.txt")); + runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "4.txt")); + runner.enqueue(FLOW_FILE_CONTENTS, Collections.singletonMap(CoreAttributes.FILENAME.key(), "5.txt")); - attributes = new HashMap<>(); - attributes.put("filename", "testfile2.txt"); - putSFTPRunner.enqueue(Paths.get(testFile), attributes); + runner.run(); + runner.assertTransferCount(PutSFTP.REL_SUCCESS, 2); - attributes = new HashMap<>(); - attributes.put("filename", "testfile3.txt"); - putSFTPRunner.enqueue(Paths.get(testFile), attributes); + runner.clearTransferState(); - attributes = new HashMap<>(); - attributes.put("filename", "testfile4.txt"); - putSFTPRunner.enqueue(Paths.get(testFile), attributes); + runner.run(); + runner.assertTransferCount(PutSFTP.REL_SUCCESS, 2); - attributes = new HashMap<>(); - attributes.put("filename", "testfile5.txt"); - putSFTPRunner.enqueue(Paths.get(testFile), attributes); + runner.clearTransferState(); - putSFTPRunner.run(); - putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 2); - - putSFTPRunner.clearTransferState(); - - putSFTPRunner.run(); - putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 2); - - putSFTPRunner.clearTransferState(); - - putSFTPRunner.run(); - putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 1); - putSFTPRunner.clearTransferState(); + runner.run(); + runner.assertTransferCount(PutSFTP.REL_SUCCESS, 1); + runner.clearTransferState(); } @Test - public void testPutSFTPProvenanceTransitUri() throws IOException { - emptyTestDirectory(); - - putSFTPRunner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "false"); + void testRunTransitUri() { + runner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "false"); Map attributes = new HashMap<>(); attributes.put("filename", "testfile.txt"); attributes.put("transfer-host","localhost"); - putSFTPRunner.enqueue(Paths.get(testFile), attributes); + runner.enqueue(FLOW_FILE_CONTENTS, attributes); attributes = new HashMap<>(); attributes.put("filename", "testfile1.txt"); attributes.put("transfer-host","127.0.0.1"); - putSFTPRunner.enqueue(Paths.get(testFile), attributes); - putSFTPRunner.run(); + runner.enqueue(FLOW_FILE_CONTENTS, attributes); + runner.run(); - putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 2); - putSFTPRunner.getProvenanceEvents().forEach(k->{ - assert(k.getTransitUri().contains("sftp://localhost")); - }); + runner.assertTransferCount(PutSFTP.REL_SUCCESS, 2); + runner.getProvenanceEvents().forEach(k -> assertTrue(k.getTransitUri().contains("sftp://localhost"))); //Two files in batch, should have 2 transferred to success, 0 to failure - putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 2); - putSFTPRunner.assertTransferCount(PutSFTP.REL_REJECT, 0); + runner.assertTransferCount(PutSFTP.REL_SUCCESS, 2); + runner.assertTransferCount(PutSFTP.REL_REJECT, 0); - MockFlowFile flowFile1 = putSFTPRunner.getFlowFilesForRelationship(PutFileTransfer.REL_SUCCESS).get(0); - MockFlowFile flowFile2 = putSFTPRunner.getFlowFilesForRelationship(PutFileTransfer.REL_SUCCESS).get(1); - putSFTPRunner.clearProvenanceEvents(); - putSFTPRunner.clearTransferState(); + MockFlowFile flowFile1 = runner.getFlowFilesForRelationship(PutFileTransfer.REL_SUCCESS).get(0); + MockFlowFile flowFile2 = runner.getFlowFilesForRelationship(PutFileTransfer.REL_SUCCESS).get(1); + runner.clearProvenanceEvents(); + runner.clearTransferState(); //Test different destinations on flow file attributes - putSFTPRunner.setProperty(SFTPTransfer.HOSTNAME,"${transfer-host}"); //set to derive hostname + runner.setProperty(SFTPTransfer.HOSTNAME,"${transfer-host}"); //set to derive hostname - putSFTPRunner.setThreadCount(1); - putSFTPRunner.enqueue(flowFile1); - putSFTPRunner.enqueue(flowFile2); - putSFTPRunner.run(); + runner.setThreadCount(1); + runner.enqueue(flowFile1); + runner.enqueue(flowFile2); + runner.run(); - putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 2); - assert(putSFTPRunner.getProvenanceEvents().get(0).getTransitUri().contains("sftp://localhost")); - assert(putSFTPRunner.getProvenanceEvents().get(1).getTransitUri().contains("sftp://127.0.0.1")); + runner.assertTransferCount(PutSFTP.REL_SUCCESS, 2); + assertTrue(runner.getProvenanceEvents().get(0).getTransitUri().contains("sftp://localhost")); + assertTrue(runner.getProvenanceEvents().get(1).getTransitUri().contains("sftp://127.0.0.1")); - putSFTPRunner.clearProvenanceEvents(); - putSFTPRunner.clearTransferState(); + runner.clearProvenanceEvents(); + runner.clearTransferState(); } - - private void emptyTestDirectory() throws IOException { - //Delete Virtual File System folder - Path dir = Paths.get(sshTestServer.getVirtualFileSystemPath()); - FileUtils.cleanDirectory(dir.toFile()); - } - } \ No newline at end of file