NIFI-9328: Transfer cleanup and reuse added to FetchFileTransfer in case of FileNotFound and PermissionDenied exceptions.

- Corrects connection handling in FetchFTP and FetchSFTP

This closes #5478

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Lehel 2021-10-25 22:29:32 +02:00 committed by exceptionfactory
parent 1191e511a5
commit d7e41e2005
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
2 changed files with 63 additions and 18 deletions

View File

@ -254,31 +254,30 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
transfer = transferWrapper.getFileTransfer();
}
boolean closeConnection = false;
try {
// Pull data from remote system.
try {
flowFile = transfer.getRemoteFile(filename, flowFile, session);
} catch (final FileNotFoundException e) {
closeConnection = false;
getLogger().log(levelFileNotFound, "Failed to fetch content for {} from filename {} on remote host {} because the file could not be found on the remote system; routing to {}",
new Object[]{flowFile, filename, host, REL_NOT_FOUND.getName()});
flowFile, filename, host, REL_NOT_FOUND.getName());
session.transfer(session.penalize(flowFile), REL_NOT_FOUND);
session.getProvenanceReporter().route(flowFile, REL_NOT_FOUND);
cleanupTransfer(transfer, false, transferQueue, host, port);
return;
} catch (final PermissionDeniedException e) {
closeConnection = false;
getLogger().error("Failed to fetch content for {} from filename {} on remote host {} due to insufficient permissions; routing to {}",
new Object[]{flowFile, filename, host, REL_PERMISSION_DENIED.getName()});
flowFile, filename, host, REL_PERMISSION_DENIED.getName());
session.transfer(session.penalize(flowFile), REL_PERMISSION_DENIED);
session.getProvenanceReporter().route(flowFile, REL_PERMISSION_DENIED);
cleanupTransfer(transfer, false, transferQueue, host, port);
return;
} catch (final ProcessException | IOException e) {
closeConnection = true;
getLogger().error("Failed to fetch content for {} from filename {} on remote host {}:{} due to {}; routing to comms.failure",
new Object[]{flowFile, filename, host, port, e.toString()}, e);
session.transfer(session.penalize(flowFile), REL_COMMS_FAILURE);
cleanupTransfer(transfer, true, transferQueue, host, port);
return;
}
@ -306,22 +305,17 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
// it is critical that we commit the session before moving/deleting the remote file. Otherwise, we could have a situation where
// we ingest the data, delete/move the remote file, and then NiFi dies/is shut down before the session is committed. This would
// result in data loss! If we commit the session first, we are safe.
final boolean close = closeConnection;
final BlockingQueue<FileTransferIdleWrapper> queue = transferQueue;
final Runnable cleanupTask = () -> cleanupTransfer(transfer, close, queue, host, port);
final Runnable cleanupTask = () -> cleanupTransfer(transfer, false, queue, host, port);
final FlowFile flowFileReceived = flowFile;
session.commitAsync(() -> {
performCompletionStrategy(transfer, context, flowFileReceived, filename, host, port);
cleanupTask.run();
}, t -> {
cleanupTask.run();
});
}, t -> cleanupTask.run());
} catch (final Throwable t) {
getLogger().error("Failed to fetch file", t);
if (transfer != null) {
cleanupTransfer(transfer, closeConnection, transferQueue, host, port);
}
cleanupTransfer(transfer, true, transferQueue, host, port);
}
}

View File

@ -91,7 +91,7 @@ public class TestFetchFTP {
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
assertFalse(proc.closed);
assertFalse(proc.isClosed);
runner.getFlowFilesForRelationship(FetchFileTransfer.REL_SUCCESS).get(0).assertContentEquals("world");
}
@ -101,7 +101,7 @@ public class TestFetchFTP {
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
assertFalse(proc.closed);
assertFalse(proc.isClosed);
MockFlowFile transferredFlowFile = runner.getFlowFilesForRelationship(FetchFileTransfer.REL_SUCCESS).get(0);
transferredFlowFile.assertContentEquals("world");
transferredFlowFile.assertAttributeExists(CoreAttributes.PATH.key());
@ -138,6 +138,42 @@ public class TestFetchFTP {
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 1);
}
@Test
public void testInsufficientPermissionsDoesNotCloseConnection() {
addFileAndEnqueue("hello1.txt");
addFileAndEnqueue("hello2.txt");
proc.allowAccess = false;
runner.run(2, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 2);
assertEquals(1, proc.numberOfFileTransfers);
assertFalse(proc.isClosed);
}
@Test
public void testFileNotFoundDoesNotCloseConnection() {
addFileAndEnqueue("hello1.txt");
addFileAndEnqueue("hello2.txt");
proc.isFileNotFound = true;
runner.run(2, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_NOT_FOUND, 2);
assertEquals(1, proc.numberOfFileTransfers);
assertFalse(proc.isClosed);
}
@Test
public void testCommunicationFailureClosesConnection() {
addFileAndEnqueue("hello.txt");
proc.isCommFailure = true;
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_COMMS_FAILURE, 1);
assertTrue(proc.isClosed);
}
@Test
public void testMoveFileWithNoTrailingSlashDirName() {
@ -230,7 +266,10 @@ public class TestFetchFTP {
private boolean allowDelete = true;
private boolean allowCreateDir = true;
private boolean allowRename = true;
private boolean closed = false;
private boolean isClosed = false;
private boolean isFileNotFound = false;
private boolean isCommFailure = false;
private int numberOfFileTransfers = 0;
private final Map<String, byte[]> fileContents = new HashMap<>();
private final FTPClient mockFtpClient = Mockito.mock(FTPClient.class);
@ -254,6 +293,7 @@ public class TestFetchFTP {
@Override
protected FileTransfer createFileTransfer(final ProcessContext context) {
numberOfFileTransfers++;
return new FTPTransfer(context, getLogger()) {
@Override
@ -266,7 +306,12 @@ public class TestFetchFTP {
if (!allowAccess) {
throw new PermissionDeniedException("test permission denied");
}
if (isFileNotFound) {
throw new FileNotFoundException("test file not found");
}
if (isCommFailure) {
throw new IOException("test communication failure");
}
return super.getRemoteFile(remoteFileName, flowFile, session);
}
@ -303,6 +348,12 @@ public class TestFetchFTP {
throw new PermissionDeniedException("test permission denied");
}
}
@Override
public void close() throws IOException {
super.close();
isClosed = true;
}
};
}
}