From ca8bc17c0aa26a3ffd4ab171e964a60c1518361b Mon Sep 17 00:00:00 2001 From: exceptionfactory Date: Fri, 7 Jan 2022 11:49:33 -0600 Subject: [PATCH] NIFI-7089 Upgraded Apache SSHD from 1.7.0 to 2.8.0 - Replaced fake-sftp-server-rule with test Apache SSHD Server Signed-off-by: Pierre Villard This closes #5646. --- .../nifi-standard-processors/pom.xml | 16 +- .../nifi/processors/standard/TestGetSFTP.java | 12 +- .../processors/standard/TestListSFTP.java | 224 ++++-------------- .../nifi/processors/standard/TestPutSFTP.java | 5 - .../standard/util/SSHTestServer.java | 34 +-- nifi-nar-bundles/nifi-standard-bundle/pom.xml | 10 + 6 files changed, 85 insertions(+), 216 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 22c8362d8d..bb16d879e8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -323,12 +323,6 @@ MockFtpServer test - - org.apache.sshd - sshd-core - 1.7.0 - test - com.bazaarvoice.jolt jolt-core @@ -382,9 +376,13 @@ test - com.github.stefanbirkner - fake-sftp-server-rule - 2.0.1 + org.apache.sshd + sshd-core + test + + + org.apache.sshd + sshd-sftp test diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetSFTP.java index 5d063c75cc..79d57cd58a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetSFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetSFTP.java @@ -26,8 +26,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; @@ -36,8 +34,6 @@ import java.nio.file.Paths; public class TestGetSFTP { - private static final Logger logger = LoggerFactory.getLogger(TestGetSFTP.class); - private TestRunner getSFTPRunner; private static SSHTestServer sshTestServer; @@ -90,7 +86,7 @@ public class TestGetSFTP { //Verify files deleted for(int i=1;i<5;i++){ Path file1 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "/testFile" + i + ".txt"); - Assert.assertTrue("File not deleted.", !file1.toAbsolutePath().toFile().exists()); + Assert.assertFalse("File not deleted.", file1.toAbsolutePath().toFile().exists()); } getSFTPRunner.clearTransferState(); @@ -114,7 +110,7 @@ public class TestGetSFTP { // Verify files deleted for (int i = 1; i < 3; i++) { Path file1 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "/testFile" + i + ".txt"); - Assert.assertTrue("File not deleted.", !file1.toAbsolutePath().toFile().exists()); + Assert.assertFalse("File not deleted.", file1.toAbsolutePath().toFile().exists()); } getSFTPRunner.clearTransferState(); @@ -140,10 +136,10 @@ public class TestGetSFTP { //Verify non-dotted files were deleted and dotted files were not deleted Path file1 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "/testFile1.txt"); - Assert.assertTrue("File not deleted.", !file1.toAbsolutePath().toFile().exists()); + Assert.assertFalse("File not deleted.", file1.toAbsolutePath().toFile().exists()); file1 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "/testFile3.txt"); - Assert.assertTrue("File not deleted.", !file1.toAbsolutePath().toFile().exists()); + Assert.assertFalse("File not deleted.", file1.toAbsolutePath().toFile().exists()); file1 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "/.testFile2.txt"); Assert.assertTrue("File deleted.", file1.toAbsolutePath().toFile().exists()); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java index 11e1ff5176..18ec6375dd 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java @@ -14,193 +14,73 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.nifi.processors.standard; -import com.github.stefanbirkner.fakesftpserver.rule.FakeSftpServerRule; - +import java.io.File; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; +import java.nio.file.Files; import java.util.Collections; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.UUID; import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.ConfigVerificationResult.Outcome; -import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.VerifiableProcessor; -import org.apache.nifi.processor.util.list.AbstractListProcessor; import org.apache.nifi.processors.standard.util.FTPTransfer; -import org.apache.nifi.processors.standard.util.FileInfo; -import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.util.SFTPTransfer; +import org.apache.nifi.processors.standard.util.SSHTestServer; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.junit.Rule; -import java.security.SecureRandom; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; public class TestListSFTP { - @Rule - public final FakeSftpServerRule sftpServer = new FakeSftpServerRule(); - int port; + private static final String REMOTE_DIRECTORY = "/"; - final String username = "nifi-sftp-user"; - final String password = "Test test test chocolate"; + private static final byte[] FILE_CONTENTS = String.class.getName().getBytes(StandardCharsets.UTF_8); + + private TestRunner runner; + + private SSHTestServer sshServer; + + private String tempFileName; @Before public void setUp() throws Exception { - sftpServer.addUser(username, password); - port = sftpServer.getPort(); + sshServer = new SSHTestServer(); + sshServer.startServer(); + writeTempFile(); - sftpServer.putFile("/directory/smallfile.txt", "byte", StandardCharsets.UTF_8); + runner = TestRunners.newTestRunner(ListSFTP.class); + runner.setProperty(ListSFTP.HOSTNAME, sshServer.getHost()); + runner.setProperty(ListSFTP.USERNAME, sshServer.getUsername()); + runner.setProperty(SFTPTransfer.PASSWORD, sshServer.getPassword()); + runner.setProperty(FTPTransfer.PORT, Integer.toString(sshServer.getSSHPort())); + runner.setProperty(ListSFTP.REMOTE_PATH, REMOTE_DIRECTORY); + runner.setProperty(ListFile.TARGET_SYSTEM_TIMESTAMP_PRECISION, ListFile.PRECISION_MILLIS); - sftpServer.putFile("/directory/file.txt", "a bit more content in this file", StandardCharsets.UTF_8); - - byte[] bytes = new byte[120]; - SecureRandom.getInstanceStrong().nextBytes(bytes); - - sftpServer.putFile("/directory/file.bin", bytes); + runner.assertValid(); + assertVerificationSuccess(); } @After public void tearDown() throws Exception { - sftpServer.deleteAllFilesAndDirectories(); - } - - @Test(timeout = 5000) - public void testListingWhileConcurrentlyWritingIntoMultipleDirectories() throws Exception { - AtomicInteger fileCounter = new AtomicInteger(1); - - List createdFileNames = new ArrayList<>(); - - CountDownLatch finishScheduledRun = new CountDownLatch(1); - CountDownLatch reachScanningSubDir = new CountDownLatch(1); - CountDownLatch writeMoreFiles = new CountDownLatch(1); - - String baseDir = "/base/"; - String subDir = "/base/subdir/"; - - TestRunner runner = TestRunners.newTestRunner(new ListSFTP() { - @Override - protected FileTransfer getFileTransfer(ProcessContext context) { - return new SFTPTransfer(context, getLogger()){ - @Override - protected void getListing(String path, int depth, int maxResults, List listing, boolean applyFilters) throws IOException { - if (path.contains("subdir")) { - reachScanningSubDir.countDown(); - try { - writeMoreFiles.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - super.getListing(path, depth, maxResults, listing, applyFilters); - } - }; - } - }); - - // This test fails with BY_TIMESTAMPS -// runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_TIMESTAMPS.getValue()); - runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_TIME_WINDOW.getValue()); - runner.setProperty(ListSFTP.HOSTNAME, "localhost"); - runner.setProperty(ListSFTP.USERNAME, username); - runner.setProperty(SFTPTransfer.PASSWORD, password); - runner.setProperty(FTPTransfer.PORT, Integer.toString(port)); - runner.setProperty(ListSFTP.REMOTE_PATH, baseDir); - runner.setProperty(FileTransfer.RECURSIVE_SEARCH, "true"); - - runner.assertValid(); - - ExecutorService executorService = null; - try { - executorService = Executors.newFixedThreadPool(1); - sftpServer.createDirectory("/base"); - - uploadFile(baseDir, fileCounter.getAndIncrement(), createdFileNames); - uploadFile(subDir, "sub." + fileCounter.getAndIncrement(), createdFileNames); - - executorService.submit(() -> { - try { - runner.run(1, false); - } finally { - finishScheduledRun.countDown(); - } - }); - - reachScanningSubDir.await(); - - uploadFile(baseDir, fileCounter.getAndIncrement(), createdFileNames); - Thread.sleep(1100); // Make sure the next file has greater timestamp - uploadFile(subDir, "sub." + fileCounter.getAndIncrement(), createdFileNames); - - writeMoreFiles.countDown(); - - Thread.sleep(1100); // Need to wait for 1+ sec if the file timestamps have only sec precision. - finishScheduledRun.await(); - runner.run(); - - List successFiles = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); - - List successFileNames = successFiles.stream() - .map(MockFlowFile::getAttributes) - .map(attributes -> attributes.get("filename")) - .sorted() - .collect(Collectors.toList()); - - Collections.sort(createdFileNames); - - assertEquals(createdFileNames, successFileNames); - } finally { - if (executorService != null) { - executorService.shutdown(); - } - } - } - - private void uploadFile(String baseDir, Object fileSuffix, List createdFileNames) throws Exception { - String fileName = "file." + fileSuffix; - - sftpServer.putFile(baseDir + fileName, "unimportant", StandardCharsets.UTF_8); - - createdFileNames.add(fileName); + sshServer.stopServer(); } @Test - public void basicFileList() throws InterruptedException { - TestRunner runner = TestRunners.newTestRunner(ListSFTP.class); - runner.setProperty(ListSFTP.HOSTNAME, "localhost"); - runner.setProperty(ListSFTP.USERNAME, username); - runner.setProperty(SFTPTransfer.PASSWORD, password); - runner.setProperty(FTPTransfer.PORT, Integer.toString(port)); - runner.setProperty(ListSFTP.REMOTE_PATH, "/directory/"); - - runner.setProperty(ListFile.TARGET_SYSTEM_TIMESTAMP_PRECISION, ListFile.PRECISION_MILLIS); - runner.assertValid(); - - // Ensure wait for enough lag time. - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.MILLISECONDS) * 2); - + public void testRunFileFound() { runner.run(); - assertVerificationOutcome(runner, Outcome.SUCCESSFUL, ".* Found 3 objects. Of those, 3 match the filter."); - - runner.assertTransferCount(ListSFTP.REL_SUCCESS, 3); + runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1); runner.assertAllFlowFilesContainAttribute("sftp.remote.host"); runner.assertAllFlowFilesContainAttribute("sftp.remote.port"); runner.assertAllFlowFilesContainAttribute("sftp.listing.user"); @@ -212,46 +92,34 @@ public class TestListSFTP { runner.assertAllFlowFilesContainAttribute( "filename"); final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(ListSFTP.REL_SUCCESS).get(0); - retrievedFile.assertAttributeEquals("sftp.listing.user", username); + retrievedFile.assertAttributeEquals("sftp.listing.user", sshServer.getUsername()); + retrievedFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), tempFileName); } - @Test - public void sizeFilteredFileList() throws InterruptedException { - TestRunner runner = TestRunners.newTestRunner(ListSFTP.class); - runner.setProperty(ListSFTP.HOSTNAME, "localhost"); - runner.setProperty(ListSFTP.USERNAME, username); - runner.setProperty(SFTPTransfer.PASSWORD, password); - runner.setProperty(FTPTransfer.PORT, Integer.toString(port)); - runner.setProperty(ListSFTP.REMOTE_PATH, "/directory/"); - runner.setProperty(ListFile.MIN_SIZE, "8B"); - runner.setProperty(ListFile.MAX_SIZE, "100B"); - - - runner.setProperty(ListFile.TARGET_SYSTEM_TIMESTAMP_PRECISION, ListFile.PRECISION_MILLIS); - runner.assertValid(); - - // Ensure wait for enough lag time. - Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.MILLISECONDS) * 2); + public void testRunFileNotFoundMinSizeFiltered() { + runner.setProperty(ListFile.MIN_SIZE, "1KB"); runner.run(); - assertVerificationOutcome(runner, Outcome.SUCCESSFUL, ".* Found 3 objects. Of those, 1 matches the filter."); - runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1); - - final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(ListSFTP.REL_SUCCESS).get(0); - //the only file between the limits - retrievedFile.assertAttributeEquals("filename", "file.txt"); + runner.assertTransferCount(ListSFTP.REL_SUCCESS, 0); } - private void assertVerificationOutcome(final TestRunner runner, final Outcome expectedOutcome, final String expectedExplanationRegex) { + private void assertVerificationSuccess() { final List results = ((VerifiableProcessor) runner.getProcessor()) .verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap()); - assertEquals(1, results.size()); final ConfigVerificationResult result = results.get(0); - assertEquals(expectedOutcome, result.getOutcome()); - assertTrue(String.format("Expected verification result to match pattern [%s]. Actual explanation was: %s", expectedExplanationRegex, result.getExplanation()), - result.getExplanation().matches(expectedExplanationRegex)); + assertEquals(Outcome.SUCCESSFUL, result.getOutcome()); + } + + private void writeTempFile() { + final File file = new File(sshServer.getVirtualFileSystemPath(), String.format("%s-%s", getClass().getSimpleName(), UUID.randomUUID())); + try { + Files.write(file.toPath(), FILE_CONTENTS); + tempFileName = file.getName(); + } catch (final IOException e) { + throw new UncheckedIOException(e); + } } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java index 36aa2ede44..ac5f163720 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSFTP.java @@ -28,8 +28,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; @@ -40,9 +38,6 @@ import java.util.HashMap; import java.util.Map; public class TestPutSFTP { - - private static final Logger logger = LoggerFactory.getLogger(TestPutSFTP.class); - private TestRunner putSFTPRunner; private static SSHTestServer sshTestServer; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/SSHTestServer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/SSHTestServer.java index f3d810b34d..f9747c965e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/SSHTestServer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/SSHTestServer.java @@ -17,12 +17,10 @@ package org.apache.nifi.processors.standard.util; import org.apache.commons.io.FileUtils; -import org.apache.sshd.common.NamedFactory; import org.apache.sshd.common.file.virtualfs.VirtualFileSystemFactory; -import org.apache.sshd.server.Command; import org.apache.sshd.server.SshServer; import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider; -import org.apache.sshd.server.subsystem.sftp.SftpSubsystemFactory; +import org.apache.sshd.sftp.server.SftpSubsystemFactory; import java.io.IOException; import java.nio.file.Files; @@ -32,6 +30,16 @@ import java.util.ArrayList; import java.util.List; public class SSHTestServer { + private static SshServer sshd; + + private String virtualFileSystemPath = "target/ssh_vfs/"; + + private String host = "127.0.0.1"; + + private String username = "nifiuser"; + + private String password = "nifipassword"; + public int getSSHPort(){ return sshd.getPort(); } @@ -60,19 +68,13 @@ public class SSHTestServer { this.password = password; } - private static SshServer sshd; - private String virtualFileSystemPath = "target/ssh_vfs/"; - - private String username = "nifiuser"; - private String password = "nifipassword"; - - public void SSHTestServer(){ - + public String getHost() { + return host; } public void startServer() throws IOException { sshd = SshServer.setUpDefaultServer(); - sshd.setHost("localhost"); + sshd.setHost(host); sshd.setKeyPairProvider(new SimpleGeneratorHostKeyProvider()); @@ -88,8 +90,7 @@ public class SSHTestServer { Files.createDirectories(dir); sshd.setFileSystemFactory(new VirtualFileSystemFactory(dir.toAbsolutePath())); - //Add SFTP support - List> sftpCommandFactory = new ArrayList<>(); + List sftpCommandFactory = new ArrayList<>(); sftpCommandFactory.add(new SftpSubsystemFactory()); sshd.setSubsystemFactories(sftpCommandFactory); @@ -97,10 +98,11 @@ public class SSHTestServer { } public void stopServer() throws IOException { - if(sshd == null) return; + if (sshd == null) { + return; + } sshd.stop(true); - //Delete Virtual File System folder Path dir = Paths.get(getVirtualFileSystemPath()); FileUtils.deleteDirectory(dir.toFile()); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/pom.xml index 05d2f232c3..018cd5b2d1 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/pom.xml @@ -392,6 +392,16 @@ snakeyaml 1.29 + + org.apache.sshd + sshd-core + 2.8.0 + + + org.apache.sshd + sshd-sftp + 2.8.0 +