From d7e41e2005724fb5aed0e5dbab5a3036c45a85bd Mon Sep 17 00:00:00 2001 From: Lehel Date: Mon, 25 Oct 2021 22:29:32 +0200 Subject: [PATCH] NIFI-9328: Transfer cleanup and reuse added to FetchFileTransfer in case of FileNotFound and PermissionDenied exceptions. - Corrects connection handling in FetchFTP and FetchSFTP This closes #5478 Signed-off-by: David Handermann --- .../standard/FetchFileTransfer.java | 22 +++---- .../processors/standard/TestFetchFTP.java | 59 +++++++++++++++++-- 2 files changed, 63 insertions(+), 18 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java index e7ea4918be..2d38116f5f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java @@ -254,31 +254,30 @@ public abstract class FetchFileTransfer extends AbstractProcessor { transfer = transferWrapper.getFileTransfer(); } - boolean closeConnection = false; try { // Pull data from remote system. try { flowFile = transfer.getRemoteFile(filename, flowFile, session); } catch (final FileNotFoundException e) { - closeConnection = false; getLogger().log(levelFileNotFound, "Failed to fetch content for {} from filename {} on remote host {} because the file could not be found on the remote system; routing to {}", - new Object[]{flowFile, filename, host, REL_NOT_FOUND.getName()}); + flowFile, filename, host, REL_NOT_FOUND.getName()); session.transfer(session.penalize(flowFile), REL_NOT_FOUND); session.getProvenanceReporter().route(flowFile, REL_NOT_FOUND); + cleanupTransfer(transfer, false, transferQueue, host, port); return; } catch (final PermissionDeniedException e) { - closeConnection = false; getLogger().error("Failed to fetch content for {} from filename {} on remote host {} due to insufficient permissions; routing to {}", - new Object[]{flowFile, filename, host, REL_PERMISSION_DENIED.getName()}); + flowFile, filename, host, REL_PERMISSION_DENIED.getName()); session.transfer(session.penalize(flowFile), REL_PERMISSION_DENIED); session.getProvenanceReporter().route(flowFile, REL_PERMISSION_DENIED); + cleanupTransfer(transfer, false, transferQueue, host, port); return; } catch (final ProcessException | IOException e) { - closeConnection = true; getLogger().error("Failed to fetch content for {} from filename {} on remote host {}:{} due to {}; routing to comms.failure", new Object[]{flowFile, filename, host, port, e.toString()}, e); session.transfer(session.penalize(flowFile), REL_COMMS_FAILURE); + cleanupTransfer(transfer, true, transferQueue, host, port); return; } @@ -306,22 +305,17 @@ public abstract class FetchFileTransfer extends AbstractProcessor { // it is critical that we commit the session before moving/deleting the remote file. Otherwise, we could have a situation where // we ingest the data, delete/move the remote file, and then NiFi dies/is shut down before the session is committed. This would // result in data loss! If we commit the session first, we are safe. - final boolean close = closeConnection; final BlockingQueue queue = transferQueue; - final Runnable cleanupTask = () -> cleanupTransfer(transfer, close, queue, host, port); + final Runnable cleanupTask = () -> cleanupTransfer(transfer, false, queue, host, port); final FlowFile flowFileReceived = flowFile; session.commitAsync(() -> { performCompletionStrategy(transfer, context, flowFileReceived, filename, host, port); cleanupTask.run(); - }, t -> { - cleanupTask.run(); - }); + }, t -> cleanupTask.run()); } catch (final Throwable t) { getLogger().error("Failed to fetch file", t); - if (transfer != null) { - cleanupTransfer(transfer, closeConnection, transferQueue, host, port); - } + cleanupTransfer(transfer, true, transferQueue, host, port); } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java index 34c26c0d3d..1ee337de03 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java @@ -91,7 +91,7 @@ public class TestFetchFTP { runner.run(1, false, false); runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1); - assertFalse(proc.closed); + assertFalse(proc.isClosed); runner.getFlowFilesForRelationship(FetchFileTransfer.REL_SUCCESS).get(0).assertContentEquals("world"); } @@ -101,7 +101,7 @@ public class TestFetchFTP { runner.run(1, false, false); runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1); - assertFalse(proc.closed); + assertFalse(proc.isClosed); MockFlowFile transferredFlowFile = runner.getFlowFilesForRelationship(FetchFileTransfer.REL_SUCCESS).get(0); transferredFlowFile.assertContentEquals("world"); transferredFlowFile.assertAttributeExists(CoreAttributes.PATH.key()); @@ -138,6 +138,42 @@ public class TestFetchFTP { runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 1); } + @Test + public void testInsufficientPermissionsDoesNotCloseConnection() { + addFileAndEnqueue("hello1.txt"); + addFileAndEnqueue("hello2.txt"); + proc.allowAccess = false; + + runner.run(2, false, false); + runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 2); + + assertEquals(1, proc.numberOfFileTransfers); + assertFalse(proc.isClosed); + } + + @Test + public void testFileNotFoundDoesNotCloseConnection() { + addFileAndEnqueue("hello1.txt"); + addFileAndEnqueue("hello2.txt"); + proc.isFileNotFound = true; + + runner.run(2, false, false); + runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_NOT_FOUND, 2); + + assertEquals(1, proc.numberOfFileTransfers); + assertFalse(proc.isClosed); + } + + @Test + public void testCommunicationFailureClosesConnection() { + addFileAndEnqueue("hello.txt"); + proc.isCommFailure = true; + + runner.run(1, false, false); + runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_COMMS_FAILURE, 1); + + assertTrue(proc.isClosed); + } @Test public void testMoveFileWithNoTrailingSlashDirName() { @@ -230,7 +266,10 @@ public class TestFetchFTP { private boolean allowDelete = true; private boolean allowCreateDir = true; private boolean allowRename = true; - private boolean closed = false; + private boolean isClosed = false; + private boolean isFileNotFound = false; + private boolean isCommFailure = false; + private int numberOfFileTransfers = 0; private final Map fileContents = new HashMap<>(); private final FTPClient mockFtpClient = Mockito.mock(FTPClient.class); @@ -254,6 +293,7 @@ public class TestFetchFTP { @Override protected FileTransfer createFileTransfer(final ProcessContext context) { + numberOfFileTransfers++; return new FTPTransfer(context, getLogger()) { @Override @@ -266,7 +306,12 @@ public class TestFetchFTP { if (!allowAccess) { throw new PermissionDeniedException("test permission denied"); } - + if (isFileNotFound) { + throw new FileNotFoundException("test file not found"); + } + if (isCommFailure) { + throw new IOException("test communication failure"); + } return super.getRemoteFile(remoteFileName, flowFile, session); } @@ -303,6 +348,12 @@ public class TestFetchFTP { throw new PermissionDeniedException("test permission denied"); } } + + @Override + public void close() throws IOException { + super.close(); + isClosed = true; + } }; } }