NIFI-6242 PutFileTransfer generating incorrect provenance event

This commit is contained in:
tlsmith 2020-10-23 18:10:40 +00:00 committed by markap14
parent 954c09d0c3
commit 7214dc0f85
3 changed files with 104 additions and 1 deletions

View File

@ -95,12 +95,16 @@ public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractPr
} }
final ComponentLog logger = getLogger(); final ComponentLog logger = getLogger();
final String hostname = context.getProperty(FileTransfer.HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(); String hostname = context.getProperty(FileTransfer.HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
final int maxNumberOfFiles = context.getProperty(FileTransfer.BATCH_SIZE).asInteger(); final int maxNumberOfFiles = context.getProperty(FileTransfer.BATCH_SIZE).asInteger();
int fileCount = 0; int fileCount = 0;
try (final T transfer = getFileTransfer(context)) { try (final T transfer = getFileTransfer(context)) {
do { do {
//check if hostname is regular expression requiring evaluation
if(context.getProperty(FileTransfer.HOSTNAME).isExpressionLanguagePresent()) {
hostname = context.getProperty(FileTransfer.HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
}
final String rootPath = context.getProperty(FileTransfer.REMOTE_PATH).evaluateAttributeExpressions(flowFile).getValue(); final String rootPath = context.getProperty(FileTransfer.REMOTE_PATH).evaluateAttributeExpressions(flowFile).getValue();
final String workingDirPath; final String workingDirPath;
if (StringUtils.isBlank(rootPath)) { if (StringUtils.isBlank(rootPath)) {

View File

@ -145,6 +145,57 @@ public class TestFTP {
Assert.assertTrue(results.exists("c:\\data\\randombytes-1")); Assert.assertTrue(results.exists("c:\\data\\randombytes-1"));
} }
@Test
public void basicProvenanceEventTest() throws IOException {
TestRunner runner = TestRunners.newTestRunner(PutFTP.class);
runner.setProperty(FTPTransfer.HOSTNAME, "localhost");
runner.setProperty(FTPTransfer.USERNAME, username);
runner.setProperty(FTPTransfer.PASSWORD, password);
runner.setProperty(FTPTransfer.PORT, Integer.toString(ftpPort));
// Get two flowfiles to test by running data
try (FileInputStream fis = new FileInputStream("src/test/resources/randombytes-1")) {
Map<String, String> attributes = new HashMap<String, String>();
attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
attributes.put("transfer-host", "localhost");
runner.enqueue(fis, attributes);
runner.run();
}
try (FileInputStream fis = new FileInputStream("src/test/resources/hello.txt")) {
Map<String, String> attributes = new HashMap<String, String>();
attributes.put(CoreAttributes.FILENAME.key(), "hello.txt");
attributes.put("transfer-host", "127.0.0.1");
runner.enqueue(fis, attributes);
runner.run();
}
runner.assertQueueEmpty();
runner.assertTransferCount(PutFTP.REL_SUCCESS, 2);
MockFlowFile flowFile1 = runner.getFlowFilesForRelationship(PutFileTransfer.REL_SUCCESS).get(0);
MockFlowFile flowFile2 = runner.getFlowFilesForRelationship(PutFileTransfer.REL_SUCCESS).get(1);
runner.clearProvenanceEvents();
runner.clearTransferState();
HashMap<String, String> map1 = new HashMap<>();
HashMap<String, String> map2 = new HashMap<>();
map1.put(CoreAttributes.FILENAME.key(), "randombytes-xx");
map2.put(CoreAttributes.FILENAME.key(), "randombytes-yy");
flowFile1.putAttributes(map1);
flowFile2.putAttributes(map2);
//set to derive hostname
runner.setProperty(FTPTransfer.HOSTNAME, "${transfer-host}");
runner.setThreadCount(1);
runner.enqueue(flowFile1);
runner.enqueue(flowFile2);
runner.run();
runner.assertTransferCount(PutFTP.REL_SUCCESS, 2);
assert(runner.getProvenanceEvents().get(0).getTransitUri().contains("ftp://localhost"));
assert(runner.getProvenanceEvents().get(1).getTransitUri().contains("ftp://127.0.0.1"));
}
@Test @Test
public void basicFileGet() throws IOException { public void basicFileGet() throws IOException {
FileSystem results = fakeFtpServer.getFileSystem(); FileSystem results = fakeFtpServer.getFileSystem();

View File

@ -20,6 +20,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer; import org.apache.nifi.processors.standard.util.SFTPTransfer;
import org.apache.nifi.processors.standard.util.SSHTestServer; import org.apache.nifi.processors.standard.util.SSHTestServer;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.junit.AfterClass; import org.junit.AfterClass;
@ -243,9 +244,56 @@ public class TestPutSFTP {
putSFTPRunner.clearTransferState(); putSFTPRunner.clearTransferState();
} }
@Test
public void testPutSFTPProvenanceTransitUri() throws IOException {
emptyTestDirectory();
putSFTPRunner.setProperty(SFTPTransfer.REJECT_ZERO_BYTE, "false");
Map<String,String> attributes = new HashMap<>();
attributes.put("filename", "testfile.txt");
attributes.put("transfer-host","localhost");
putSFTPRunner.enqueue(Paths.get(testFile), attributes);
attributes = new HashMap<>();
attributes.put("filename", "testfile1.txt");
attributes.put("transfer-host","127.0.0.1");
putSFTPRunner.enqueue(Paths.get(testFile), attributes);
putSFTPRunner.run();
putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 2);
putSFTPRunner.getProvenanceEvents().forEach(k->{
assert(k.getTransitUri().contains("sftp://localhost"));
});
//Two files in batch, should have 2 transferred to success, 0 to failure
putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 2);
putSFTPRunner.assertTransferCount(PutSFTP.REL_REJECT, 0);
MockFlowFile flowFile1 = putSFTPRunner.getFlowFilesForRelationship(PutFileTransfer.REL_SUCCESS).get(0);
MockFlowFile flowFile2 = putSFTPRunner.getFlowFilesForRelationship(PutFileTransfer.REL_SUCCESS).get(1);
putSFTPRunner.clearProvenanceEvents();
putSFTPRunner.clearTransferState();
//Test different destinations on flow file attributes
putSFTPRunner.setProperty(SFTPTransfer.HOSTNAME,"${transfer-host}"); //set to derive hostname
putSFTPRunner.setThreadCount(1);
putSFTPRunner.enqueue(flowFile1);
putSFTPRunner.enqueue(flowFile2);
putSFTPRunner.run();
putSFTPRunner.assertTransferCount(PutSFTP.REL_SUCCESS, 2);
assert(putSFTPRunner.getProvenanceEvents().get(0).getTransitUri().contains("sftp://localhost"));
assert(putSFTPRunner.getProvenanceEvents().get(1).getTransitUri().contains("sftp://127.0.0.1"));
putSFTPRunner.clearProvenanceEvents();
putSFTPRunner.clearTransferState();
}
private void emptyTestDirectory() throws IOException { private void emptyTestDirectory() throws IOException {
//Delete Virtual File System folder //Delete Virtual File System folder
Path dir = Paths.get(sshTestServer.getVirtualFileSystemPath()); Path dir = Paths.get(sshTestServer.getVirtualFileSystemPath());
FileUtils.cleanDirectory(dir.toFile()); FileUtils.cleanDirectory(dir.toFile());
} }
} }