NIFI-1416: If FetchSFTP's Remote File has a directory name in it, do not include that as part of the 'filename' attribute but instead add a 'path' attribute

Signed-off-by: Aldrin Piri <aldrin@apache.org>
This commit is contained in:
Mark Payne 2016-01-20 12:22:56 -05:00 committed by Aldrin Piri
parent b25db650fd
commit 92e6961b50
3 changed files with 36 additions and 1 deletions

View File

@ -274,7 +274,15 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
attributes.put(protocolName + ".remote.host", host); attributes.put(protocolName + ".remote.host", host);
attributes.put(protocolName + ".remote.port", String.valueOf(port)); attributes.put(protocolName + ".remote.port", String.valueOf(port));
attributes.put(protocolName + ".remote.filename", filename); attributes.put(protocolName + ".remote.filename", filename);
if (filename.contains("/")) {
final String path = StringUtils.substringBeforeLast(filename, "/");
final String filenameOnly = StringUtils.substringAfterLast(filename, "/");
attributes.put(CoreAttributes.PATH.key(), path);
attributes.put(CoreAttributes.FILENAME.key(), filenameOnly);
} else {
attributes.put(CoreAttributes.FILENAME.key(), filename); attributes.put(CoreAttributes.FILENAME.key(), filename);
}
flowFile = session.putAllAttributes(flowFile, attributes); flowFile = session.putAllAttributes(flowFile, attributes);
// emit provenance event and transfer FlowFile // emit provenance event and transfer FlowFile

View File

@ -46,6 +46,7 @@ import org.apache.nifi.processors.standard.util.SFTPTransfer;
@WritesAttribute(attribute = "sftp.remote.port", description = "The port that was used to communicate with the remote SFTP server"), @WritesAttribute(attribute = "sftp.remote.port", description = "The port that was used to communicate with the remote SFTP server"),
@WritesAttribute(attribute = "sftp.remote.filename", description = "The name of the remote file that was pulled"), @WritesAttribute(attribute = "sftp.remote.filename", description = "The name of the remote file that was pulled"),
@WritesAttribute(attribute = "filename", description = "The filename is updated to point to the filename fo the remote file"), @WritesAttribute(attribute = "filename", description = "The filename is updated to point to the filename fo the remote file"),
@WritesAttribute(attribute = "path", description = "If the Remote File contains a directory name, that directory name will be added to the FlowFile using the 'path' attribute")
}) })
public class FetchSFTP extends FetchFileTransfer { public class FetchSFTP extends FetchFileTransfer {

View File

@ -31,10 +31,12 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.standard.util.FileInfo; import org.apache.nifi.processors.standard.util.FileInfo;
import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.PermissionDeniedException; import org.apache.nifi.processors.standard.util.PermissionDeniedException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.junit.Test; import org.junit.Test;
@ -60,6 +62,30 @@ public class TestFetchFileTransfer {
runner.getFlowFilesForRelationship(FetchFileTransfer.REL_SUCCESS).get(0).assertContentEquals("world"); runner.getFlowFilesForRelationship(FetchFileTransfer.REL_SUCCESS).get(0).assertContentEquals("world");
} }
@Test
public void testFilenameContainsPath() {
final String filenameWithPath = "./here/is/my/path/hello.txt";
final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
proc.addContent(filenameWithPath, "world".getBytes());
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", filenameWithPath);
runner.enqueue(new byte[0], attrs);
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
assertFalse(proc.closed);
MockFlowFile transferredFlowFile = runner.getFlowFilesForRelationship(FetchFileTransfer.REL_SUCCESS).get(0);
transferredFlowFile.assertContentEquals("world");
transferredFlowFile.assertAttributeExists(CoreAttributes.PATH.key());
transferredFlowFile.assertAttributeEquals(CoreAttributes.PATH.key(), "./here/is/my/path");
}
@Test @Test
public void testContentNotFound() { public void testContentNotFound() {
final TestableFetchFileTransfer proc = new TestableFetchFileTransfer(); final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();