diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 39d7a6d53c..fe0262eeaa 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -285,6 +285,18 @@ MockFtpServer test + + org.apache.mina + mina-core + 2.0.19 + test + + + org.apache.sshd + sshd-core + 1.7.0 + test + com.bazaarvoice.jolt jolt-core diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetSFTP.java new file mode 100644 index 0000000000..a4f532ae8e --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetSFTP.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.commons.io.FileUtils; +import org.apache.nifi.processors.standard.util.SFTPTransfer; +import org.apache.nifi.processors.standard.util.SSHTestServer; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; + +public class TestGetSFTP { + + private static final Logger logger = LoggerFactory.getLogger(TestGetSFTP.class); + + private TestRunner getSFTPRunner; + private static SSHTestServer sshTestServer; + + @BeforeClass + public static void setupSSHD() throws IOException { + sshTestServer = new SSHTestServer(); + sshTestServer.startServer(); + } + + @AfterClass + public static void cleanupSSHD() throws IOException { + sshTestServer.stopServer(); + } + + @Before + public void setup(){ + getSFTPRunner = TestRunners.newTestRunner(GetSFTP.class); + getSFTPRunner.setProperty(SFTPTransfer.HOSTNAME, "localhost"); + getSFTPRunner.setProperty(SFTPTransfer.PORT, Integer.toString(sshTestServer.getSSHPort())); + getSFTPRunner.setProperty(SFTPTransfer.USERNAME, sshTestServer.getUsername()); + getSFTPRunner.setProperty(SFTPTransfer.PASSWORD, sshTestServer.getPassword()); + getSFTPRunner.setProperty(SFTPTransfer.STRICT_HOST_KEY_CHECKING, "false"); + getSFTPRunner.setProperty(SFTPTransfer.DATA_TIMEOUT, "30 sec"); + getSFTPRunner.setProperty(SFTPTransfer.REMOTE_PATH, "/"); + getSFTPRunner.removeProperty(SFTPTransfer.FILE_FILTER_REGEX); + getSFTPRunner.setProperty(SFTPTransfer.PATH_FILTER_REGEX, ""); + getSFTPRunner.setProperty(SFTPTransfer.POLLING_INTERVAL, "60 sec"); + getSFTPRunner.setProperty(SFTPTransfer.RECURSIVE_SEARCH, "false"); + getSFTPRunner.setProperty(SFTPTransfer.IGNORE_DOTTED_FILES, "true"); + getSFTPRunner.setProperty(SFTPTransfer.DELETE_ORIGINAL, "true"); + getSFTPRunner.setProperty(SFTPTransfer.MAX_SELECTS, "100"); + getSFTPRunner.setProperty(SFTPTransfer.REMOTE_POLL_BATCH_SIZE, "5000"); + + getSFTPRunner.setValidateExpressionUsage(false); + } + + @Test + public void testGetSFTPFileBasicRead() throws IOException { + emptyTestDirectory(); + + touchFile(sshTestServer.getVirtualFileSystemPath() + "testFile1.txt"); + touchFile(sshTestServer.getVirtualFileSystemPath() + "testFile2.txt"); + touchFile(sshTestServer.getVirtualFileSystemPath() + "testFile3.txt"); + touchFile(sshTestServer.getVirtualFileSystemPath() + "testFile4.txt"); + + getSFTPRunner.run(); + + getSFTPRunner.assertTransferCount(GetSFTP.REL_SUCCESS, 4); + + //Verify files deleted + for(int i=1;i<5;i++){ + Path file1 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "/testFile" + i + ".txt"); + Assert.assertTrue("File not deleted.", !file1.toAbsolutePath().toFile().exists()); + } + + getSFTPRunner.clearTransferState(); + } + + @Test + public void testGetSFTPIgnoreDottedFiles() throws IOException { + emptyTestDirectory(); + + touchFile(sshTestServer.getVirtualFileSystemPath() + "testFile1.txt"); + touchFile(sshTestServer.getVirtualFileSystemPath() + ".testFile2.txt"); + touchFile(sshTestServer.getVirtualFileSystemPath() + "testFile3.txt"); + touchFile(sshTestServer.getVirtualFileSystemPath() + ".testFile4.txt"); + + getSFTPRunner.run(); + + getSFTPRunner.assertTransferCount(GetSFTP.REL_SUCCESS, 2); + + //Verify non-dotted files were deleted and dotted files were not deleted + Path file1 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "/testFile1.txt"); + Assert.assertTrue("File not deleted.", !file1.toAbsolutePath().toFile().exists()); + + file1 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "/testFile3.txt"); + Assert.assertTrue("File not deleted.", !file1.toAbsolutePath().toFile().exists()); + + file1 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "/.testFile2.txt"); + Assert.assertTrue("File deleted.", file1.toAbsolutePath().toFile().exists()); + + file1 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "/.testFile4.txt"); + Assert.assertTrue("File deleted.", file1.toAbsolutePath().toFile().exists()); + + getSFTPRunner.clearTransferState(); + } + + private void touchFile(String file) throws IOException { + FileUtils.writeStringToFile(new File(file), "", "UTF-8"); + } + + private void emptyTestDirectory() throws IOException { + //Delete Virtual File System folder + Path dir = Paths.get(sshTestServer.getVirtualFileSystemPath()); + FileUtils.cleanDirectory(dir.toFile()); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java new file mode 100644 index 0000000000..194e485ce2 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.commons.io.FileUtils; +import org.apache.nifi.processors.standard.util.FileTransfer; +import org.apache.nifi.processors.standard.util.SFTPTransfer; +import org.apache.nifi.processors.standard.util.SSHTestServer; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; + +public class TestPutSFTP { + + private static final Logger logger = LoggerFactory.getLogger(TestPutSFTP.class); + + private TestRunner putSFTPRunner; + private static SSHTestServer sshTestServer; + + private final String testFile = "src" + File.separator + "test" + File.separator + "resources" + File.separator + "hello.txt"; + + @BeforeClass + public static void setupSSHD() throws IOException { + sshTestServer = new SSHTestServer(); + sshTestServer.startServer(); + } + + @AfterClass + public static void cleanupSSHD() throws IOException { + sshTestServer.stopServer(); + } + + @Before + public void setup(){ + putSFTPRunner = TestRunners.newTestRunner(PutSFTP.class); + putSFTPRunner.setProperty(SFTPTransfer.HOSTNAME, "localhost"); + putSFTPRunner.setProperty(SFTPTransfer.PORT, Integer.toString(sshTestServer.getSSHPort())); + putSFTPRunner.setProperty(SFTPTransfer.USERNAME, sshTestServer.getUsername()); + putSFTPRunner.setProperty(SFTPTransfer.PASSWORD, sshTestServer.getPassword()); + putSFTPRunner.setProperty(SFTPTransfer.STRICT_HOST_KEY_CHECKING, "false"); + putSFTPRunner.setProperty(SFTPTransfer.BATCH_SIZE, "2"); + putSFTPRunner.setProperty(SFTPTransfer.REMOTE_PATH, "nifi_test/"); + putSFTPRunner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "true"); + putSFTPRunner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REPLACE); + putSFTPRunner.setProperty(SFTPTransfer.CREATE_DIRECTORY, "true"); + putSFTPRunner.setProperty(SFTPTransfer.DATA_TIMEOUT, "30 sec"); + putSFTPRunner.setValidateExpressionUsage(false); + } + + @Test + public void testPutSFTPFile() throws IOException { + emptyTestDirectory(); + + Map attributes = new HashMap<>(); + attributes.put("filename", "testfile.txt"); + + putSFTPRunner.enqueue(Paths.get(testFile), attributes); + putSFTPRunner.run(); + + putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 1); + + //verify directory exists + Path newDirectory = Paths.get(sshTestServer.getVirtualFileSystemPath() + "nifi_test/"); + Path newFile = Paths.get(sshTestServer.getVirtualFileSystemPath() + "nifi_test/testfile.txt"); + Assert.assertTrue("New directory not created.", newDirectory.toAbsolutePath().toFile().exists()); + Assert.assertTrue("New File not created.", newFile.toAbsolutePath().toFile().exists()); + putSFTPRunner.clearTransferState(); + } + + @Test + public void testPutSFTPFileZeroByte() throws IOException { + emptyTestDirectory(); + + Map attributes = new HashMap<>(); + attributes.put("filename", "testfile.txt"); + putSFTPRunner.enqueue(Paths.get(testFile), attributes); + + attributes = new HashMap<>(); + attributes.put("filename", "testfile1.txt"); + putSFTPRunner.enqueue("", attributes); + + putSFTPRunner.run(); + + //Two files in batch, should have only 1 transferred to sucess, 1 to failure + putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 1); + putSFTPRunner.assertTransferCount(PutSFTP.REL_REJECT, 1); + putSFTPRunner.clearTransferState(); + + attributes = new HashMap<>(); + attributes.put("filename", "testfile1.txt"); + putSFTPRunner.enqueue("", attributes); + + putSFTPRunner.run(); + + //One files in batch, should have 0 transferred to output since it's zero byte + putSFTPRunner.assertTransferCount(PutSFTP.REL_REJECT, 1); + putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 0); + putSFTPRunner.clearTransferState(); + + //allow zero byte files + putSFTPRunner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "false"); + + attributes = new HashMap<>(); + attributes.put("filename", "testfile1.txt"); + putSFTPRunner.enqueue("", attributes); + + putSFTPRunner.run(); + + //should have 1 transferred to sucess + putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 1); + + //revert settings + putSFTPRunner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "true"); + putSFTPRunner.clearTransferState(); + } + + @Test + public void testPutSFTPFileConflictResolution() throws IOException { + emptyTestDirectory(); + + //Try transferring file with the same name as a directory, should fail in all cases + // except RESOLUTION of NONE + Path dir = Paths.get(sshTestServer.getVirtualFileSystemPath() + "nifi_test" ); + Path dir2 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "nifi_test/testfile" ); + Files.createDirectory(dir); + Files.createDirectory(dir2); + + Map attributes = new HashMap<>(); + attributes.put("filename", "testfile"); + + putSFTPRunner.enqueue(Paths.get(testFile), attributes); + putSFTPRunner.run(); + + putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 0); + putSFTPRunner.assertTransferCount(PutSFTP.REL_FAILURE, 1); + + //Prepare by uploading test file + attributes = new HashMap<>(); + attributes.put("filename", "testfile.txt"); + + putSFTPRunner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REPLACE); + + putSFTPRunner.enqueue(Paths.get(testFile), attributes); + putSFTPRunner.run(); + putSFTPRunner.clearTransferState(); + + //set conflict resolution mode to REJECT + putSFTPRunner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REJECT); + + putSFTPRunner.enqueue(Paths.get(testFile), attributes); + putSFTPRunner.run(); + + putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 0); + putSFTPRunner.assertTransferCount(PutSFTP.REL_REJECT, 1); + putSFTPRunner.clearTransferState(); + + //set conflict resolution mode to IGNORE + putSFTPRunner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_IGNORE); + putSFTPRunner.enqueue(Paths.get(testFile), attributes); + putSFTPRunner.run(); + + putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 1); + putSFTPRunner.assertTransferCount(PutSFTP.REL_REJECT, 0); + + putSFTPRunner.clearTransferState(); + + //set conflict resolution mode to FAIL + putSFTPRunner.setProperty(SFTPTransfer.CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_FAIL); + putSFTPRunner.enqueue(Paths.get(testFile), attributes); + putSFTPRunner.run(); + + putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 0); + putSFTPRunner.assertTransferCount(PutSFTP.REL_REJECT, 0); + putSFTPRunner.assertTransferCount(PutSFTP.REL_FAILURE, 1); + + putSFTPRunner.clearTransferState(); + } + + @Test + public void testPutSFTPBatching() throws IOException { + emptyTestDirectory(); + + Map attributes = new HashMap<>(); + attributes.put("filename", "testfile.txt"); + putSFTPRunner.enqueue(Paths.get(testFile), attributes); + + attributes = new HashMap<>(); + attributes.put("filename", "testfile2.txt"); + putSFTPRunner.enqueue(Paths.get(testFile), attributes); + + attributes = new HashMap<>(); + attributes.put("filename", "testfile3.txt"); + putSFTPRunner.enqueue(Paths.get(testFile), attributes); + + attributes = new HashMap<>(); + attributes.put("filename", "testfile4.txt"); + putSFTPRunner.enqueue(Paths.get(testFile), attributes); + + attributes = new HashMap<>(); + attributes.put("filename", "testfile5.txt"); + putSFTPRunner.enqueue(Paths.get(testFile), attributes); + + putSFTPRunner.run(); + putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 2); + + putSFTPRunner.clearTransferState(); + + putSFTPRunner.run(); + putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 2); + + putSFTPRunner.clearTransferState(); + + putSFTPRunner.run(); + putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 1); + putSFTPRunner.clearTransferState(); + } + + private void emptyTestDirectory() throws IOException { + //Delete Virtual File System folder + Path dir = Paths.get(sshTestServer.getVirtualFileSystemPath()); + FileUtils.cleanDirectory(dir.toFile()); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/SSHTestServer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/SSHTestServer.java new file mode 100644 index 0000000000..f3d810b34d --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/SSHTestServer.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.util; + +import org.apache.commons.io.FileUtils; +import org.apache.sshd.common.NamedFactory; +import org.apache.sshd.common.file.virtualfs.VirtualFileSystemFactory; +import org.apache.sshd.server.Command; +import org.apache.sshd.server.SshServer; +import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider; +import org.apache.sshd.server.subsystem.sftp.SftpSubsystemFactory; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; + +public class SSHTestServer { + public int getSSHPort(){ + return sshd.getPort(); + } + + public String getVirtualFileSystemPath() { + return virtualFileSystemPath; + } + + public void setVirtualFileSystemPath(String virtualFileSystemPath) { + this.virtualFileSystemPath = virtualFileSystemPath; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + private static SshServer sshd; + private String virtualFileSystemPath = "target/ssh_vfs/"; + + private String username = "nifiuser"; + private String password = "nifipassword"; + + public void SSHTestServer(){ + + } + + public void startServer() throws IOException { + sshd = SshServer.setUpDefaultServer(); + sshd.setHost("localhost"); + + sshd.setKeyPairProvider(new SimpleGeneratorHostKeyProvider()); + + //Accept all keys for authentication + sshd.setPublickeyAuthenticator((s, publicKey, serverSession) -> true); + + //Allow username/password authentication using pre-defined credentials + sshd.setPasswordAuthenticator((username, password, serverSession) -> this.username.equals(username) && this.password.equals(password)); + + //Setup Virtual File System (VFS) + //Ensure VFS folder exists + Path dir = Paths.get(getVirtualFileSystemPath()); + Files.createDirectories(dir); + sshd.setFileSystemFactory(new VirtualFileSystemFactory(dir.toAbsolutePath())); + + //Add SFTP support + List> sftpCommandFactory = new ArrayList<>(); + sftpCommandFactory.add(new SftpSubsystemFactory()); + sshd.setSubsystemFactories(sftpCommandFactory); + + sshd.start(); + } + + public void stopServer() throws IOException { + if(sshd == null) return; + sshd.stop(true); + + //Delete Virtual File System folder + Path dir = Paths.get(getVirtualFileSystemPath()); + FileUtils.deleteDirectory(dir.toFile()); + } +} \ No newline at end of file