diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java index b7e8f61a76..b975548538 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java @@ -31,19 +31,15 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.util.PermissionDeniedException; -import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.Tuple; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -261,20 +257,8 @@ public abstract class FetchFileTransfer extends AbstractProcessor { boolean closeConnection = false; try { // Pull data from remote system. - final InputStream in; try { - in = transfer.getInputStream(filename, flowFile); - - flowFile = session.write(flowFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - StreamUtils.copy(in, out); - } - }); - - if (!transfer.flush(flowFile)) { - throw new IOException("completePendingCommand returned false, file transfer failed"); - } + flowFile = transfer.getRemoteFile(filename, flowFile, session); } catch (final FileNotFoundException e) { closeConnection = false; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java index 4ce31de485..8ac5a27eec 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java @@ -18,7 +18,6 @@ package org.apache.nifi.processors.standard; import java.io.File; import java.io.IOException; -import java.io.InputStream; import java.nio.file.Path; import java.text.DateFormat; import java.text.SimpleDateFormat; @@ -189,12 +188,9 @@ public abstract class GetFileTransfer extends AbstractProcessor { try { FlowFile flowFile = session.create(); final StopWatch stopWatch = new StopWatch(false); - try (final InputStream in = transfer.getInputStream(file.getFullPathFileName())) { - stopWatch.start(); - flowFile = session.importFrom(in, flowFile); - stopWatch.stop(); - } - transfer.flush(); + stopWatch.start(); + flowFile = transfer.getRemoteFile(file.getFullPathFileName(), flowFile, session); + stopWatch.stop(); final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS); final String dataRate = stopWatch.calculateDataRate(flowFile.getSize()); flowFile = session.putAttribute(flowFile, this.getClass().getSimpleName().toLowerCase() + ".remote.source", hostname); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java index 6a2b2db84d..e7d96c60fb 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Proxy; @@ -50,10 +51,13 @@ import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.proxy.ProxyConfiguration; import org.apache.nifi.proxy.ProxySpec; +import org.apache.nifi.stream.io.StreamUtils; public class FTPTransfer implements FileTransfer { @@ -314,35 +318,39 @@ public class FTPTransfer implements FileTransfer { } @Override - public InputStream getInputStream(String remoteFileName) throws IOException { - return getInputStream(remoteFileName, null); - } - - @Override - public InputStream getInputStream(final String remoteFileName, final FlowFile flowFile) throws IOException { - final FTPClient client = getClient(flowFile); - InputStream in = client.retrieveFileStream(remoteFileName); - if (in == null) { - final String response = client.getReplyString(); - // FTPClient doesn't throw exception if file not found. - // Instead, response string will contain: "550 Can't open : No such file or directory" - if (response != null && response.trim().endsWith("No such file or directory")){ - throw new FileNotFoundException(response); + public FlowFile getRemoteFile(final String remoteFileName, final FlowFile origFlowFile, final ProcessSession session) throws ProcessException, IOException { + final FTPClient client = getClient(origFlowFile); + InputStream in = null; + FlowFile resultFlowFile = null; + try { + in = client.retrieveFileStream(remoteFileName); + if (in == null) { + final String response = client.getReplyString(); + // FTPClient doesn't throw exception if file not found. + // Instead, response string will contain: "550 Can't open : No such file or directory" + if (response != null && response.trim().endsWith("No such file or directory")) { + throw new FileNotFoundException(response); + } + throw new IOException(response); + } + final InputStream remoteIn = in; + resultFlowFile = session.write(origFlowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + StreamUtils.copy(remoteIn, out); + } + }); + client.completePendingCommand(); + return resultFlowFile; + } finally { + if(in != null){ + try{ + in.close(); + }catch(final IOException ioe){ + //do nothing + } } - throw new IOException(response); } - return in; - } - - @Override - public void flush() throws IOException { - final FTPClient client = getClient(null); - client.completePendingCommand(); - } - - @Override - public boolean flush(final FlowFile flowFile) throws IOException { - return getClient(flowFile).completePendingCommand(); } @Override diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java index 56dd22d0e4..64bb1306fe 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java @@ -26,6 +26,8 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.Validator; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; public interface FileTransfer extends Closeable { @@ -34,13 +36,7 @@ public interface FileTransfer extends Closeable { List getListing() throws IOException; - InputStream getInputStream(String remoteFileName) throws IOException; - - InputStream getInputStream(String remoteFileName, FlowFile flowFile) throws IOException; - - void flush() throws IOException; - - boolean flush(FlowFile flowFile) throws IOException; + FlowFile getRemoteFile(String remoteFileName, FlowFile flowFile, ProcessSession session) throws ProcessException, IOException; FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java index 465bdde191..3a341bc10c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java @@ -45,18 +45,23 @@ import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.proxy.ProxyConfiguration; import org.apache.nifi.proxy.ProxySpec; +import org.apache.nifi.stream.io.StreamUtils; import javax.net.SocketFactory; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.net.InetAddress; import java.net.Proxy; import java.net.Socket; @@ -346,19 +351,22 @@ public class SFTPTransfer implements FileTransfer { } @Override - public InputStream getInputStream(final String remoteFileName) throws IOException { - return getInputStream(remoteFileName, null); - } - - @Override - public InputStream getInputStream(final String remoteFileName, final FlowFile flowFile) throws IOException { - final SFTPClient sftpClient = getSFTPClient(flowFile); + public FlowFile getRemoteFile(final String remoteFileName, final FlowFile origFlowFile, final ProcessSession session) throws ProcessException, IOException { + final SFTPClient sftpClient = getSFTPClient(origFlowFile); + RemoteFile rf = null; + RemoteFile.ReadAheadRemoteFileInputStream rfis = null; + FlowFile resultFlowFile = null; try { - // The client has 'get' methods for downloading a file, but they don't offer a way to get access to an InputStream so - // this code is what the SFTPTransfer Downloader does to get a stream for the remote file contents - final RemoteFile rf = sftpClient.open(remoteFileName); - final RemoteFile.ReadAheadRemoteFileInputStream rfis = rf.new ReadAheadRemoteFileInputStream(16); - return rfis; + rf = sftpClient.open(remoteFileName); + rfis = rf.new ReadAheadRemoteFileInputStream(16); + final InputStream in = rfis; + resultFlowFile = session.write(origFlowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + StreamUtils.copy(in, out); + } + }); + return resultFlowFile; } catch (final SFTPException e) { switch (e.getStatusCode()) { case NO_SUCH_FILE: @@ -368,19 +376,24 @@ public class SFTPTransfer implements FileTransfer { default: throw new IOException("Failed to obtain file content for " + remoteFileName, e); } + } finally { + if(rf != null){ + try{ + rf.close(); + }catch(final IOException ioe){ + //do nothing + } + } + if(rfis != null){ + try{ + rfis.close(); + }catch(final IOException ioe){ + //do nothing + } + } } } - @Override - public void flush() throws IOException { - // nothing needed here - } - - @Override - public boolean flush(final FlowFile flowFile) throws IOException { - return true; - } - @Override public void deleteFile(final FlowFile flowFile, final String path, final String remoteFileName) throws IOException { final SFTPClient sftpClient = getSFTPClient(flowFile); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java index de76b07fe9..68eb627833 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java @@ -26,6 +26,7 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -33,9 +34,13 @@ import java.util.Map; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processors.standard.util.FileInfo; import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.util.PermissionDeniedException; +import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -285,12 +290,7 @@ public class TestFetchFileTransfer { } @Override - public InputStream getInputStream(final String remoteFileName) throws IOException { - return getInputStream(remoteFileName, null); - } - - @Override - public InputStream getInputStream(String remoteFileName, FlowFile flowFile) throws IOException { + public FlowFile getRemoteFile(String remoteFileName, FlowFile flowFile, ProcessSession session) throws ProcessException, IOException { if (!allowAccess) { throw new PermissionDeniedException("test permission denied"); } @@ -299,17 +299,14 @@ public class TestFetchFileTransfer { if (content == null) { throw new FileNotFoundException(); } - - return new ByteArrayInputStream(content); - } - - @Override - public void flush() throws IOException { - } - - @Override - public boolean flush(FlowFile flowFile) throws IOException { - return true; + final InputStream in = new ByteArrayInputStream(content); + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + StreamUtils.copy(in, out); + } + }); + return flowFile; } @Override diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/ITestSFTPTransferWithSSHTestServer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/ITestSFTPTransferWithSSHTestServer.java index 17a3cda035..de8c8b6b07 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/ITestSFTPTransferWithSSHTestServer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/ITestSFTPTransferWithSSHTestServer.java @@ -310,29 +310,6 @@ public class ITestSFTPTransferWithSSHTestServer { } } - @Test - public void testGetInputStream() throws IOException { - final String filename = "./" + DIR_2 + "/" + FILE_1; - final Map properties = createBaseProperties(); - - try(final SFTPTransfer transfer = createSFTPTransfer(properties); - final InputStream in = transfer.getInputStream(filename)) { - final String content = IOUtils.toString(in, StandardCharsets.UTF_8); - assertEquals("dir2 file1", content); - } - } - - @Test(expected = FileNotFoundException.class) - public void testGetInputStreamWhenFileDoesNotExist() throws IOException { - final String filename = "./" + DIR_2 + "/DOES-NOT-EXIST"; - final Map properties = createBaseProperties(); - - try(final SFTPTransfer transfer = createSFTPTransfer(properties); - final InputStream in = transfer.getInputStream(filename)) { - IOUtils.toString(in, StandardCharsets.UTF_8); - } - } - @Test public void testDeleteFileWithoutPath() throws IOException { final Map properties = createBaseProperties();