NIFI-7089 Upgraded Apache SSHD from 1.7.0 to 2.8.0

- Replaced fake-sftp-server-rule with test Apache SSHD Server

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #5646.
This commit is contained in:
exceptionfactory 2022-01-07 11:49:33 -06:00 committed by Pierre Villard
parent 42626adab8
commit ca8bc17c0a
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
6 changed files with 85 additions and 216 deletions

View File

@ -323,12 +323,6 @@
<artifactId>MockFtpServer</artifactId> <artifactId>MockFtpServer</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.sshd</groupId>
<artifactId>sshd-core</artifactId>
<version>1.7.0</version>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>com.bazaarvoice.jolt</groupId> <groupId>com.bazaarvoice.jolt</groupId>
<artifactId>jolt-core</artifactId> <artifactId>jolt-core</artifactId>
@ -382,9 +376,13 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.github.stefanbirkner</groupId> <groupId>org.apache.sshd</groupId>
<artifactId>fake-sftp-server-rule</artifactId> <artifactId>sshd-core</artifactId>
<version>2.0.1</version> <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.sshd</groupId>
<artifactId>sshd-sftp</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency> <dependency>

View File

@ -26,8 +26,6 @@ import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -36,8 +34,6 @@ import java.nio.file.Paths;
public class TestGetSFTP { public class TestGetSFTP {
private static final Logger logger = LoggerFactory.getLogger(TestGetSFTP.class);
private TestRunner getSFTPRunner; private TestRunner getSFTPRunner;
private static SSHTestServer sshTestServer; private static SSHTestServer sshTestServer;
@ -90,7 +86,7 @@ public class TestGetSFTP {
//Verify files deleted //Verify files deleted
for(int i=1;i<5;i++){ for(int i=1;i<5;i++){
Path file1 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "/testFile" + i + ".txt"); Path file1 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "/testFile" + i + ".txt");
Assert.assertTrue("File not deleted.", !file1.toAbsolutePath().toFile().exists()); Assert.assertFalse("File not deleted.", file1.toAbsolutePath().toFile().exists());
} }
getSFTPRunner.clearTransferState(); getSFTPRunner.clearTransferState();
@ -114,7 +110,7 @@ public class TestGetSFTP {
// Verify files deleted // Verify files deleted
for (int i = 1; i < 3; i++) { for (int i = 1; i < 3; i++) {
Path file1 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "/testFile" + i + ".txt"); Path file1 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "/testFile" + i + ".txt");
Assert.assertTrue("File not deleted.", !file1.toAbsolutePath().toFile().exists()); Assert.assertFalse("File not deleted.", file1.toAbsolutePath().toFile().exists());
} }
getSFTPRunner.clearTransferState(); getSFTPRunner.clearTransferState();
@ -140,10 +136,10 @@ public class TestGetSFTP {
//Verify non-dotted files were deleted and dotted files were not deleted //Verify non-dotted files were deleted and dotted files were not deleted
Path file1 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "/testFile1.txt"); Path file1 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "/testFile1.txt");
Assert.assertTrue("File not deleted.", !file1.toAbsolutePath().toFile().exists()); Assert.assertFalse("File not deleted.", file1.toAbsolutePath().toFile().exists());
file1 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "/testFile3.txt"); file1 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "/testFile3.txt");
Assert.assertTrue("File not deleted.", !file1.toAbsolutePath().toFile().exists()); Assert.assertFalse("File not deleted.", file1.toAbsolutePath().toFile().exists());
file1 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "/.testFile2.txt"); file1 = Paths.get(sshTestServer.getVirtualFileSystemPath() + "/.testFile2.txt");
Assert.assertTrue("File deleted.", file1.toAbsolutePath().toFile().exists()); Assert.assertTrue("File deleted.", file1.toAbsolutePath().toFile().exists());

View File

@ -14,193 +14,73 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import com.github.stefanbirkner.fakesftpserver.rule.FakeSftpServerRule; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.nio.file.Files;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome; import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.VerifiableProcessor; import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processor.util.list.AbstractListProcessor;
import org.apache.nifi.processors.standard.util.FTPTransfer; import org.apache.nifi.processors.standard.util.FTPTransfer;
import org.apache.nifi.processors.standard.util.FileInfo;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer; import org.apache.nifi.processors.standard.util.SFTPTransfer;
import org.apache.nifi.processors.standard.util.SSHTestServer;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.Rule;
import java.security.SecureRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestListSFTP { public class TestListSFTP {
@Rule private static final String REMOTE_DIRECTORY = "/";
public final FakeSftpServerRule sftpServer = new FakeSftpServerRule();
int port;
final String username = "nifi-sftp-user"; private static final byte[] FILE_CONTENTS = String.class.getName().getBytes(StandardCharsets.UTF_8);
final String password = "Test test test chocolate";
private TestRunner runner;
private SSHTestServer sshServer;
private String tempFileName;
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
sftpServer.addUser(username, password); sshServer = new SSHTestServer();
port = sftpServer.getPort(); sshServer.startServer();
writeTempFile();
sftpServer.putFile("/directory/smallfile.txt", "byte", StandardCharsets.UTF_8); runner = TestRunners.newTestRunner(ListSFTP.class);
runner.setProperty(ListSFTP.HOSTNAME, sshServer.getHost());
runner.setProperty(ListSFTP.USERNAME, sshServer.getUsername());
runner.setProperty(SFTPTransfer.PASSWORD, sshServer.getPassword());
runner.setProperty(FTPTransfer.PORT, Integer.toString(sshServer.getSSHPort()));
runner.setProperty(ListSFTP.REMOTE_PATH, REMOTE_DIRECTORY);
runner.setProperty(ListFile.TARGET_SYSTEM_TIMESTAMP_PRECISION, ListFile.PRECISION_MILLIS);
sftpServer.putFile("/directory/file.txt", "a bit more content in this file", StandardCharsets.UTF_8); runner.assertValid();
assertVerificationSuccess();
byte[] bytes = new byte[120];
SecureRandom.getInstanceStrong().nextBytes(bytes);
sftpServer.putFile("/directory/file.bin", bytes);
} }
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
sftpServer.deleteAllFilesAndDirectories(); sshServer.stopServer();
}
@Test(timeout = 5000)
public void testListingWhileConcurrentlyWritingIntoMultipleDirectories() throws Exception {
AtomicInteger fileCounter = new AtomicInteger(1);
List<String> createdFileNames = new ArrayList<>();
CountDownLatch finishScheduledRun = new CountDownLatch(1);
CountDownLatch reachScanningSubDir = new CountDownLatch(1);
CountDownLatch writeMoreFiles = new CountDownLatch(1);
String baseDir = "/base/";
String subDir = "/base/subdir/";
TestRunner runner = TestRunners.newTestRunner(new ListSFTP() {
@Override
protected FileTransfer getFileTransfer(ProcessContext context) {
return new SFTPTransfer(context, getLogger()){
@Override
protected void getListing(String path, int depth, int maxResults, List<FileInfo> listing, boolean applyFilters) throws IOException {
if (path.contains("subdir")) {
reachScanningSubDir.countDown();
try {
writeMoreFiles.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
super.getListing(path, depth, maxResults, listing, applyFilters);
}
};
}
});
// This test fails with BY_TIMESTAMPS
// runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_TIMESTAMPS.getValue());
runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_TIME_WINDOW.getValue());
runner.setProperty(ListSFTP.HOSTNAME, "localhost");
runner.setProperty(ListSFTP.USERNAME, username);
runner.setProperty(SFTPTransfer.PASSWORD, password);
runner.setProperty(FTPTransfer.PORT, Integer.toString(port));
runner.setProperty(ListSFTP.REMOTE_PATH, baseDir);
runner.setProperty(FileTransfer.RECURSIVE_SEARCH, "true");
runner.assertValid();
ExecutorService executorService = null;
try {
executorService = Executors.newFixedThreadPool(1);
sftpServer.createDirectory("/base");
uploadFile(baseDir, fileCounter.getAndIncrement(), createdFileNames);
uploadFile(subDir, "sub." + fileCounter.getAndIncrement(), createdFileNames);
executorService.submit(() -> {
try {
runner.run(1, false);
} finally {
finishScheduledRun.countDown();
}
});
reachScanningSubDir.await();
uploadFile(baseDir, fileCounter.getAndIncrement(), createdFileNames);
Thread.sleep(1100); // Make sure the next file has greater timestamp
uploadFile(subDir, "sub." + fileCounter.getAndIncrement(), createdFileNames);
writeMoreFiles.countDown();
Thread.sleep(1100); // Need to wait for 1+ sec if the file timestamps have only sec precision.
finishScheduledRun.await();
runner.run();
List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
List<String> successFileNames = successFiles.stream()
.map(MockFlowFile::getAttributes)
.map(attributes -> attributes.get("filename"))
.sorted()
.collect(Collectors.toList());
Collections.sort(createdFileNames);
assertEquals(createdFileNames, successFileNames);
} finally {
if (executorService != null) {
executorService.shutdown();
}
}
}
private void uploadFile(String baseDir, Object fileSuffix, List<String> createdFileNames) throws Exception {
String fileName = "file." + fileSuffix;
sftpServer.putFile(baseDir + fileName, "unimportant", StandardCharsets.UTF_8);
createdFileNames.add(fileName);
} }
@Test @Test
public void basicFileList() throws InterruptedException { public void testRunFileFound() {
TestRunner runner = TestRunners.newTestRunner(ListSFTP.class);
runner.setProperty(ListSFTP.HOSTNAME, "localhost");
runner.setProperty(ListSFTP.USERNAME, username);
runner.setProperty(SFTPTransfer.PASSWORD, password);
runner.setProperty(FTPTransfer.PORT, Integer.toString(port));
runner.setProperty(ListSFTP.REMOTE_PATH, "/directory/");
runner.setProperty(ListFile.TARGET_SYSTEM_TIMESTAMP_PRECISION, ListFile.PRECISION_MILLIS);
runner.assertValid();
// Ensure wait for enough lag time.
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.MILLISECONDS) * 2);
runner.run(); runner.run();
assertVerificationOutcome(runner, Outcome.SUCCESSFUL, ".* Found 3 objects. Of those, 3 match the filter.");
runner.assertTransferCount(ListSFTP.REL_SUCCESS, 3);
runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1);
runner.assertAllFlowFilesContainAttribute("sftp.remote.host"); runner.assertAllFlowFilesContainAttribute("sftp.remote.host");
runner.assertAllFlowFilesContainAttribute("sftp.remote.port"); runner.assertAllFlowFilesContainAttribute("sftp.remote.port");
runner.assertAllFlowFilesContainAttribute("sftp.listing.user"); runner.assertAllFlowFilesContainAttribute("sftp.listing.user");
@ -212,46 +92,34 @@ public class TestListSFTP {
runner.assertAllFlowFilesContainAttribute( "filename"); runner.assertAllFlowFilesContainAttribute( "filename");
final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(ListSFTP.REL_SUCCESS).get(0); final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(ListSFTP.REL_SUCCESS).get(0);
retrievedFile.assertAttributeEquals("sftp.listing.user", username); retrievedFile.assertAttributeEquals("sftp.listing.user", sshServer.getUsername());
retrievedFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), tempFileName);
} }
@Test @Test
public void sizeFilteredFileList() throws InterruptedException { public void testRunFileNotFoundMinSizeFiltered() {
TestRunner runner = TestRunners.newTestRunner(ListSFTP.class); runner.setProperty(ListFile.MIN_SIZE, "1KB");
runner.setProperty(ListSFTP.HOSTNAME, "localhost");
runner.setProperty(ListSFTP.USERNAME, username);
runner.setProperty(SFTPTransfer.PASSWORD, password);
runner.setProperty(FTPTransfer.PORT, Integer.toString(port));
runner.setProperty(ListSFTP.REMOTE_PATH, "/directory/");
runner.setProperty(ListFile.MIN_SIZE, "8B");
runner.setProperty(ListFile.MAX_SIZE, "100B");
runner.setProperty(ListFile.TARGET_SYSTEM_TIMESTAMP_PRECISION, ListFile.PRECISION_MILLIS);
runner.assertValid();
// Ensure wait for enough lag time.
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.MILLISECONDS) * 2);
runner.run(); runner.run();
assertVerificationOutcome(runner, Outcome.SUCCESSFUL, ".* Found 3 objects. Of those, 1 matches the filter."); runner.assertTransferCount(ListSFTP.REL_SUCCESS, 0);
runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1);
final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(ListSFTP.REL_SUCCESS).get(0);
//the only file between the limits
retrievedFile.assertAttributeEquals("filename", "file.txt");
} }
private void assertVerificationOutcome(final TestRunner runner, final Outcome expectedOutcome, final String expectedExplanationRegex) { private void assertVerificationSuccess() {
final List<ConfigVerificationResult> results = ((VerifiableProcessor) runner.getProcessor()) final List<ConfigVerificationResult> results = ((VerifiableProcessor) runner.getProcessor())
.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap()); .verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
assertEquals(1, results.size()); assertEquals(1, results.size());
final ConfigVerificationResult result = results.get(0); final ConfigVerificationResult result = results.get(0);
assertEquals(expectedOutcome, result.getOutcome()); assertEquals(Outcome.SUCCESSFUL, result.getOutcome());
assertTrue(String.format("Expected verification result to match pattern [%s]. Actual explanation was: %s", expectedExplanationRegex, result.getExplanation()), }
result.getExplanation().matches(expectedExplanationRegex));
private void writeTempFile() {
final File file = new File(sshServer.getVirtualFileSystemPath(), String.format("%s-%s", getClass().getSimpleName(), UUID.randomUUID()));
try {
Files.write(file.toPath(), FILE_CONTENTS);
tempFileName = file.getName();
} catch (final IOException e) {
throw new UncheckedIOException(e);
}
} }
} }

View File

@ -28,8 +28,6 @@ import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -40,9 +38,6 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
public class TestPutSFTP { public class TestPutSFTP {
private static final Logger logger = LoggerFactory.getLogger(TestPutSFTP.class);
private TestRunner putSFTPRunner; private TestRunner putSFTPRunner;
private static SSHTestServer sshTestServer; private static SSHTestServer sshTestServer;

View File

@ -17,12 +17,10 @@
package org.apache.nifi.processors.standard.util; package org.apache.nifi.processors.standard.util;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.sshd.common.NamedFactory;
import org.apache.sshd.common.file.virtualfs.VirtualFileSystemFactory; import org.apache.sshd.common.file.virtualfs.VirtualFileSystemFactory;
import org.apache.sshd.server.Command;
import org.apache.sshd.server.SshServer; import org.apache.sshd.server.SshServer;
import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider; import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
import org.apache.sshd.server.subsystem.sftp.SftpSubsystemFactory; import org.apache.sshd.sftp.server.SftpSubsystemFactory;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
@ -32,6 +30,16 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
public class SSHTestServer { public class SSHTestServer {
private static SshServer sshd;
private String virtualFileSystemPath = "target/ssh_vfs/";
private String host = "127.0.0.1";
private String username = "nifiuser";
private String password = "nifipassword";
public int getSSHPort(){ public int getSSHPort(){
return sshd.getPort(); return sshd.getPort();
} }
@ -60,19 +68,13 @@ public class SSHTestServer {
this.password = password; this.password = password;
} }
private static SshServer sshd; public String getHost() {
private String virtualFileSystemPath = "target/ssh_vfs/"; return host;
private String username = "nifiuser";
private String password = "nifipassword";
public void SSHTestServer(){
} }
public void startServer() throws IOException { public void startServer() throws IOException {
sshd = SshServer.setUpDefaultServer(); sshd = SshServer.setUpDefaultServer();
sshd.setHost("localhost"); sshd.setHost(host);
sshd.setKeyPairProvider(new SimpleGeneratorHostKeyProvider()); sshd.setKeyPairProvider(new SimpleGeneratorHostKeyProvider());
@ -88,8 +90,7 @@ public class SSHTestServer {
Files.createDirectories(dir); Files.createDirectories(dir);
sshd.setFileSystemFactory(new VirtualFileSystemFactory(dir.toAbsolutePath())); sshd.setFileSystemFactory(new VirtualFileSystemFactory(dir.toAbsolutePath()));
//Add SFTP support List<SftpSubsystemFactory> sftpCommandFactory = new ArrayList<>();
List<NamedFactory<Command>> sftpCommandFactory = new ArrayList<>();
sftpCommandFactory.add(new SftpSubsystemFactory()); sftpCommandFactory.add(new SftpSubsystemFactory());
sshd.setSubsystemFactories(sftpCommandFactory); sshd.setSubsystemFactories(sftpCommandFactory);
@ -97,10 +98,11 @@ public class SSHTestServer {
} }
public void stopServer() throws IOException { public void stopServer() throws IOException {
if(sshd == null) return; if (sshd == null) {
return;
}
sshd.stop(true); sshd.stop(true);
//Delete Virtual File System folder
Path dir = Paths.get(getVirtualFileSystemPath()); Path dir = Paths.get(getVirtualFileSystemPath());
FileUtils.deleteDirectory(dir.toFile()); FileUtils.deleteDirectory(dir.toFile());
} }

View File

@ -392,6 +392,16 @@
<artifactId>snakeyaml</artifactId> <artifactId>snakeyaml</artifactId>
<version>1.29</version> <version>1.29</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.sshd</groupId>
<artifactId>sshd-core</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.sshd</groupId>
<artifactId>sshd-sftp</artifactId>
<version>2.8.0</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>
</project> </project>