diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java index 6182b0aa84..0023c4b2f5 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java @@ -239,9 +239,13 @@ public abstract class FetchFileTransfer extends AbstractProcessor { @Override public void process(final OutputStream out) throws IOException { StreamUtils.copy(in, out); - transfer.flush(); } }); + + if(!transfer.flush(flowFile)) { + throw new IOException("completePendingCommand returned false, file transfer failed"); + } + transferQueue.offer(new FileTransferIdleWrapper(transfer, System.nanoTime())); } catch (final FileNotFoundException e) { getLogger().error("Failed to fetch content for {} from filename {} on remote host {} because the file could not be found on the remote system; routing to {}", @@ -297,7 +301,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor { final String completionStrategy = context.getProperty(COMPLETION_STRATEGY).getValue(); if (COMPLETION_DELETE.getValue().equalsIgnoreCase(completionStrategy)) { try { - transfer.deleteFile(null, filename); + transfer.deleteFile(flowFile, null, filename); } catch (final FileNotFoundException e) { // file doesn't exist -- effectively the same as removing it. Move on. } catch (final IOException ioe) { @@ -313,7 +317,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor { final String target = targetDir + simpleFilename; try { - transfer.rename(filename, target); + transfer.rename(flowFile, filename, target); } catch (final IOException ioe) { getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to rename the remote file due to {}", new Object[] {flowFile, host, port, filename, ioe}, ioe); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java index 00dfccf048..4ce31de485 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java @@ -208,7 +208,7 @@ public abstract class GetFileTransfer extends AbstractProcessor { if (deleteOriginal) { try { - transfer.deleteFile(null, file.getFullPathFileName()); + transfer.deleteFile(flowFile, null, file.getFullPathFileName()); } catch (final IOException e) { logger.error("Failed to remove remote file {} due to {}; deleting local copy", new Object[]{file.getFullPathFileName(), e}); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java index 58e443a6f5..3f35c4e399 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java @@ -94,7 +94,7 @@ public abstract class ListFileTransfer extends AbstractListProcessor { @Override protected String getPath(final ProcessContext context) { - return context.getProperty(REMOTE_PATH).getValue(); + return context.getProperty(REMOTE_PATH).evaluateAttributeExpressions().getValue(); } @Override diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java index 054d1d871b..cbaa9ecaab 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java @@ -230,7 +230,7 @@ public abstract class PutFileTransfer extends AbstractPr logger.warn("Resolving conflict by rejecting {} due to conflicting filename with a directory or file already on remote server", new Object[]{flowFile}); break; case FileTransfer.CONFLICT_RESOLUTION_REPLACE: - transfer.deleteFile(path, fileName); + transfer.deleteFile(flowFile, path, fileName); destinationRelationship = REL_SUCCESS; transferFile = true; penalizeFile = false; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java index b64a6f88d1..aeec6c3fef 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java @@ -289,7 +289,7 @@ public class FTPTransfer implements FileTransfer { @Override public InputStream getInputStream(final String remoteFileName, final FlowFile flowFile) throws IOException { - final FTPClient client = getClient(null); + final FTPClient client = getClient(flowFile); InputStream in = client.retrieveFileStream(remoteFileName); if (in == null) { throw new IOException(client.getReplyString()); @@ -303,6 +303,11 @@ public class FTPTransfer implements FileTransfer { client.completePendingCommand(); } + @Override + public boolean flush(final FlowFile flowFile) throws IOException { + return getClient(flowFile).completePendingCommand(); + } + @Override public FileInfo getRemoteFileInfo(final FlowFile flowFile, String path, String remoteFileName) throws IOException { final FTPClient client = getClient(flowFile); @@ -444,8 +449,8 @@ public class FTPTransfer implements FileTransfer { @Override - public void rename(final String source, final String target) throws IOException { - final FTPClient client = getClient(null); + public void rename(final FlowFile flowFile, final String source, final String target) throws IOException { + final FTPClient client = getClient(flowFile); final boolean renameSuccessful = client.rename(source, target); if (!renameSuccessful) { throw new IOException("Failed to rename temporary file " + source + " to " + target + " due to: " + client.getReplyString()); @@ -453,8 +458,8 @@ public class FTPTransfer implements FileTransfer { } @Override - public void deleteFile(final String path, final String remoteFileName) throws IOException { - final FTPClient client = getClient(null); + public void deleteFile(final FlowFile flowFile, final String path, final String remoteFileName) throws IOException { + final FTPClient client = getClient(flowFile); if (path != null) { setWorkingDirectory(path); } @@ -464,8 +469,8 @@ public class FTPTransfer implements FileTransfer { } @Override - public void deleteDirectory(final String remoteDirectoryName) throws IOException { - final FTPClient client = getClient(null); + public void deleteDirectory(final FlowFile flowFile, final String remoteDirectoryName) throws IOException { + final FTPClient client = getClient(flowFile); final boolean success = client.removeDirectory(remoteDirectoryName); if (!success) { throw new IOException("Failed to remove directory " + remoteDirectoryName + " due to " + client.getReplyString()); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java index 22d9ec52ea..ac7f728ea0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java @@ -39,15 +39,17 @@ public interface FileTransfer extends Closeable { void flush() throws IOException; + boolean flush(FlowFile flowFile) throws IOException; + FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException; String put(FlowFile flowFile, String path, String filename, InputStream content) throws IOException; - void rename(String source, String target) throws IOException; + void rename(FlowFile flowFile, String source, String target) throws IOException; - void deleteFile(String path, String remoteFileName) throws IOException; + void deleteFile(FlowFile flowFile, String path, String remoteFileName) throws IOException; - void deleteDirectory(String remoteDirectoryName) throws IOException; + void deleteDirectory(FlowFile flowFile, String remoteDirectoryName) throws IOException; boolean isClosed(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java index a6a9e4b70b..bc31ba967c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java @@ -309,7 +309,12 @@ public class SFTPTransfer implements FileTransfer { } @Override - public void deleteFile(final String path, final String remoteFileName) throws IOException { + public boolean flush(final FlowFile flowFile) throws IOException { + return true; + } + + @Override + public void deleteFile(final FlowFile flowFile, final String path, final String remoteFileName) throws IOException { final String fullPath = (path == null) ? remoteFileName : (path.endsWith("/")) ? path + remoteFileName : path + "/" + remoteFileName; try { sftp.rm(fullPath); @@ -326,7 +331,7 @@ public class SFTPTransfer implements FileTransfer { } @Override - public void deleteDirectory(final String remoteDirectoryName) throws IOException { + public void deleteDirectory(final FlowFile flowFile, final String remoteDirectoryName) throws IOException { try { sftp.rm(remoteDirectoryName); } catch (final SftpException e) { @@ -613,8 +618,8 @@ public class SFTPTransfer implements FileTransfer { } @Override - public void rename(final String source, final String target) throws IOException { - final ChannelSftp sftp = getChannel(null); + public void rename(final FlowFile flowFile, final String source, final String target) throws IOException { + final ChannelSftp sftp = getChannel(flowFile); try { sftp.rename(source, target); } catch (final SftpException e) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java index 102931f28c..d8797dc8a9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java @@ -36,7 +36,6 @@ import org.mockftpserver.fake.filesystem.FileSystem; import org.mockftpserver.fake.filesystem.WindowsFakeFileSystem; import java.io.FileInputStream; - import java.io.IOException; import java.util.Collection; import java.util.HashMap; @@ -167,4 +166,71 @@ public class TestFTP { final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(GetFTP.REL_SUCCESS).get(0); retrievedFile.assertContentEquals("Just some random test test test chocolate"); } + + @Test + public void basicFileFetch() throws IOException { + FileSystem results = fakeFtpServer.getFileSystem(); + + FileEntry sampleFile = new FileEntry("c:\\data\\randombytes-2"); + sampleFile.setContents("Just some random test test test chocolate"); + results.add(sampleFile); + + // Check file exists + Assert.assertTrue(results.exists("c:\\data\\randombytes-2")); + + TestRunner runner = TestRunners.newTestRunner(FetchFTP.class); + runner.setProperty(FetchFTP.HOSTNAME, "${host}"); + runner.setProperty(FetchFTP.USERNAME, "${username}"); + runner.setProperty(FTPTransfer.PASSWORD, password); + runner.setProperty(FTPTransfer.PORT, "${port}"); + runner.setProperty(FetchFTP.REMOTE_FILENAME, "c:\\data\\randombytes-2"); + runner.setProperty(FetchFTP.COMPLETION_STRATEGY, FetchFTP.COMPLETION_MOVE); + runner.setProperty(FetchFTP.MOVE_DESTINATION_DIR, "data"); + + + Map attrs = new HashMap(); + attrs.put("host", "localhost"); + attrs.put("username", username); + attrs.put("port", Integer.toString(ftpPort)); + runner.enqueue("", attrs); + + runner.run(); + + final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(FetchFTP.REL_SUCCESS).get(0); + retrievedFile.assertContentEquals("Just some random test test test chocolate"); + } + + @Test + public void basicFileList() throws IOException { + FileSystem results = fakeFtpServer.getFileSystem(); + + FileEntry sampleFile = new FileEntry("c:\\data\\randombytes-2"); + sampleFile.setContents("Just some random test test test chocolate"); + results.add(sampleFile); + + // Check file exists + Assert.assertTrue(results.exists("c:\\data\\randombytes-2")); + + TestRunner runner = TestRunners.newTestRunner(ListFTP.class); + runner.setProperty(ListFTP.HOSTNAME, "localhost"); + runner.setProperty(ListFTP.USERNAME, username); + runner.setProperty(FTPTransfer.PASSWORD, password); + runner.setProperty(FTPTransfer.PORT, Integer.toString(ftpPort)); + runner.setProperty(ListFTP.REMOTE_PATH, "/"); + runner.assertValid(); + + runner.run(); + + final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(FetchFTP.REL_SUCCESS).get(0); + runner.assertAllFlowFilesContainAttribute("ftp.remote.host"); + runner.assertAllFlowFilesContainAttribute("ftp.remote.port"); + runner.assertAllFlowFilesContainAttribute("ftp.listing.user"); + runner.assertAllFlowFilesContainAttribute(ListFile.FILE_OWNER_ATTRIBUTE); + runner.assertAllFlowFilesContainAttribute(ListFile.FILE_GROUP_ATTRIBUTE); + runner.assertAllFlowFilesContainAttribute(ListFile.FILE_PERMISSIONS_ATTRIBUTE); + runner.assertAllFlowFilesContainAttribute(ListFile.FILE_SIZE_ATTRIBUTE); + runner.assertAllFlowFilesContainAttribute(ListFile.FILE_LAST_MODIFY_TIME_ATTRIBUTE); + retrievedFile.assertAttributeEquals("ftp.listing.user", username); + retrievedFile.assertAttributeEquals("filename", "randombytes-2"); + } } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java index 2b78a4b09f..4965893e36 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java @@ -281,6 +281,11 @@ public class TestFetchFileTransfer { public void flush() throws IOException { } + @Override + public boolean flush(FlowFile flowFile) throws IOException { + return true; + } + @Override public FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException { return null; @@ -292,7 +297,7 @@ public class TestFetchFileTransfer { } @Override - public void deleteFile(String path, String remoteFileName) throws IOException { + public void deleteFile(FlowFile flowFile, String path, String remoteFileName) throws IOException { if (!allowDelete) { throw new PermissionDeniedException("test permission denied"); } @@ -305,7 +310,7 @@ public class TestFetchFileTransfer { } @Override - public void rename(String source, String target) throws IOException { + public void rename(FlowFile flowFile, String source, String target) throws IOException { if (!allowRename) { throw new PermissionDeniedException("test permission denied"); } @@ -319,7 +324,7 @@ public class TestFetchFileTransfer { } @Override - public void deleteDirectory(String remoteDirectoryName) throws IOException { + public void deleteDirectory(FlowFile flowFile, String remoteDirectoryName) throws IOException { }