NIFI-9990 This closes #6030. Improved FTP 550 file unavailable handling

- Improved File Not Found reply detection
- Added Permission Denied reply handling

Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
exceptionfactory 2022-05-10 10:08:14 -05:00 committed by Joe Witt
parent 313d70520d
commit 30f7c1ba1e
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
2 changed files with 66 additions and 18 deletions

View File

@ -135,6 +135,10 @@ public class FTPTransfer implements FileTransfer {
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
private static final int REPLY_CODE_FILE_UNAVAILABLE = 550;
private static final Pattern NOT_FOUND_MESSAGE_PATTERN = Pattern.compile("(no such)|(not exist)|(not found)", Pattern.CASE_INSENSITIVE);
private static final FTPClientProvider FTP_CLIENT_PROVIDER = new StandardFTPClientProvider();
private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH, ProxySpec.SOCKS_AUTH};
@ -321,13 +325,22 @@ public class FTPTransfer implements FileTransfer {
FlowFile resultFlowFile;
try (InputStream in = client.retrieveFileStream(remoteFileName)) {
if (in == null) {
final String response = client.getReplyString();
// FTPClient doesn't throw exception if file not found.
// Instead, response string will contain: "550 Can't open <absolute_path>: No such file or directory"
if (response != null && response.trim().endsWith("No such file or directory")) {
throw new FileNotFoundException(response);
final String reply = client.getReplyString();
if (reply == null) {
throw new IOException("Retrieve File Failed: FTP server response not found");
}
throw new IOException(response);
// Get reply code after checking for reply string
final int replyCode = client.getReplyCode();
if (REPLY_CODE_FILE_UNAVAILABLE == replyCode) {
if (NOT_FOUND_MESSAGE_PATTERN.matcher(reply).find()) {
throw new FileNotFoundException(reply);
} else {
throw new PermissionDeniedException(reply);
}
}
throw new IOException(reply);
}
resultFlowFile = session.write(origFlowFile, out -> StreamUtils.copy(in, out));
client.completePendingCommand();
@ -606,5 +619,4 @@ public class FTPTransfer implements FileTransfer {
return componentProxyConfig;
};
}
}

View File

@ -16,7 +16,7 @@
*/
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.FileInputStream;
import java.io.IOException;
@ -45,6 +45,7 @@ import org.mockftpserver.fake.UserAccount;
import org.mockftpserver.fake.filesystem.DirectoryEntry;
import org.mockftpserver.fake.filesystem.FileEntry;
import org.mockftpserver.fake.filesystem.FileSystem;
import org.mockftpserver.fake.filesystem.Permissions;
import org.mockftpserver.fake.filesystem.WindowsFakeFileSystem;
public class TestFTP {
@ -80,7 +81,6 @@ public class TestFTP {
ProcessContext pc;
/* Set the basic required values */
results = new HashSet<>();
runner.setProperty(FTPTransfer.USERNAME, "${el-username}");
runner.setProperty(FTPTransfer.HOSTNAME, "static-hostname");
runner.setProperty(FTPTransfer.PORT, "${el-portNumber}");
@ -132,8 +132,8 @@ public class TestFTP {
runner.setProperty(FTPTransfer.USERNAME, username);
runner.setProperty(FTPTransfer.PASSWORD, password);
runner.setProperty(FTPTransfer.PORT, Integer.toString(ftpPort));
try (FileInputStream fis = new FileInputStream("src/test/resources/randombytes-1");) {
Map<String, String> attributes = new HashMap<String, String>();
try (FileInputStream fis = new FileInputStream("src/test/resources/randombytes-1")) {
Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
runner.enqueue(fis, attributes);
runner.run();
@ -157,14 +157,14 @@ public class TestFTP {
// Get two flowfiles to test by running data
try (FileInputStream fis = new FileInputStream("src/test/resources/randombytes-1")) {
Map<String, String> attributes = new HashMap<String, String>();
Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
attributes.put("transfer-host", "localhost");
runner.enqueue(fis, attributes);
runner.run();
}
try (FileInputStream fis = new FileInputStream("src/test/resources/hello.txt")) {
Map<String, String> attributes = new HashMap<String, String>();
Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), "hello.txt");
attributes.put("transfer-host", "127.0.0.1");
runner.enqueue(fis, attributes);
@ -198,7 +198,7 @@ public class TestFTP {
}
@Test
public void basicFileGet() throws IOException {
public void basicFileGet() {
FileSystem results = fakeFtpServer.getFileSystem();
FileEntry sampleFile = new FileEntry("c:\\data\\randombytes-2");
@ -222,7 +222,7 @@ public class TestFTP {
}
@Test
public void basicFileFetch() throws IOException {
public void basicFileFetch() {
FileSystem results = fakeFtpServer.getFileSystem();
FileEntry sampleFile = new FileEntry("c:\\data\\randombytes-2");
@ -242,7 +242,7 @@ public class TestFTP {
runner.setProperty(FetchFTP.MOVE_DESTINATION_DIR, "data");
Map<String, String> attrs = new HashMap<String, String>();
Map<String, String> attrs = new HashMap<>();
attrs.put("host", "localhost");
attrs.put("username", username);
attrs.put("port", Integer.toString(ftpPort));
@ -254,10 +254,46 @@ public class TestFTP {
retrievedFile.assertContentEquals("Just some random test test test chocolate");
}
@Test
public void testFetchFileNotFound() {
final TestRunner runner = TestRunners.newTestRunner(FetchFTP.class);
runner.setProperty(FetchFTP.HOSTNAME, "127.0.0.1");
runner.setProperty(FetchFTP.USERNAME, username);
runner.setProperty(FTPTransfer.PASSWORD, password);
runner.setProperty(FTPTransfer.PORT, Integer.toString(ftpPort));
runner.setProperty(FetchFTP.REMOTE_FILENAME, "remote-file-not-found");
runner.enqueue(new byte[0]);
runner.run();
runner.assertAllFlowFilesTransferred(FetchFTP.REL_NOT_FOUND);
}
@Test
public void testFetchFilePermissionDenied() {
final FileSystem fs = fakeFtpServer.getFileSystem();
final FileEntry restrictedFileEntry = new FileEntry("c:\\data\\restricted");
restrictedFileEntry.setPermissions(Permissions.NONE);
fs.add(restrictedFileEntry);
final TestRunner runner = TestRunners.newTestRunner(FetchFTP.class);
runner.setProperty(FetchFTP.HOSTNAME, "127.0.0.1");
runner.setProperty(FetchFTP.USERNAME, username);
runner.setProperty(FTPTransfer.PASSWORD, password);
runner.setProperty(FTPTransfer.PORT, Integer.toString(ftpPort));
runner.setProperty(FetchFTP.REMOTE_FILENAME, "restricted");
runner.enqueue(new byte[0]);
runner.run();
runner.assertAllFlowFilesTransferred(FetchFTP.REL_PERMISSION_DENIED);
}
@Test
@EnabledIfSystemProperty(named = "file.encoding", matches = "UTF-8",
disabledReason = "org.mockftpserver does not support specification of charset")
public void basicFileFetchWithUTF8FileName() throws IOException {
public void basicFileFetchWithUTF8FileName() {
FileSystem fs = fakeFtpServer.getFileSystem();
FileEntry sampleFile = new FileEntry("c:\\data\\őűőű.txt");
@ -284,7 +320,7 @@ public class TestFTP {
}
@Test
public void basicFileList() throws IOException, InterruptedException {
public void basicFileList() throws InterruptedException {
FileSystem results = fakeFtpServer.getFileSystem();
FileEntry sampleFile = new FileEntry("c:\\data\\randombytes-2");