NIFI-7222 Cleaned up the API for FTP/SFTP remote file retrieval and ensured that remote file resources are closed, for SFTP pulls in particular

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4115.
This commit is contained in:
Joe Witt 2020-03-04 23:41:45 -05:00 committed by Pierre Villard
parent b82fec41d9
commit 040c8a0af9
No known key found for this signature in database
GPG Key ID: BEE1599F0726E9CD
7 changed files with 91 additions and 120 deletions

View File

@ -31,19 +31,15 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.PermissionDeniedException; import org.apache.nifi.processors.standard.util.PermissionDeniedException;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.Tuple; import org.apache.nifi.util.Tuple;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -261,20 +257,8 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
boolean closeConnection = false; boolean closeConnection = false;
try { try {
// Pull data from remote system. // Pull data from remote system.
final InputStream in;
try { try {
in = transfer.getInputStream(filename, flowFile); flowFile = transfer.getRemoteFile(filename, flowFile, session);
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
StreamUtils.copy(in, out);
}
});
if (!transfer.flush(flowFile)) {
throw new IOException("completePendingCommand returned false, file transfer failed");
}
} catch (final FileNotFoundException e) { } catch (final FileNotFoundException e) {
closeConnection = false; closeConnection = false;

View File

@ -18,7 +18,6 @@ package org.apache.nifi.processors.standard;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path; import java.nio.file.Path;
import java.text.DateFormat; import java.text.DateFormat;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
@ -189,12 +188,9 @@ public abstract class GetFileTransfer extends AbstractProcessor {
try { try {
FlowFile flowFile = session.create(); FlowFile flowFile = session.create();
final StopWatch stopWatch = new StopWatch(false); final StopWatch stopWatch = new StopWatch(false);
try (final InputStream in = transfer.getInputStream(file.getFullPathFileName())) { stopWatch.start();
stopWatch.start(); flowFile = transfer.getRemoteFile(file.getFullPathFileName(), flowFile, session);
flowFile = session.importFrom(in, flowFile); stopWatch.stop();
stopWatch.stop();
}
transfer.flush();
final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS); final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
final String dataRate = stopWatch.calculateDataRate(flowFile.getSize()); final String dataRate = stopWatch.calculateDataRate(flowFile.getSize());
flowFile = session.putAttribute(flowFile, this.getClass().getSimpleName().toLowerCase() + ".remote.source", hostname); flowFile = session.putAttribute(flowFile, this.getClass().getSimpleName().toLowerCase() + ".remote.source", hostname);

View File

@ -20,6 +20,7 @@ import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Proxy; import java.net.Proxy;
@ -50,10 +51,13 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration; import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxySpec; import org.apache.nifi.proxy.ProxySpec;
import org.apache.nifi.stream.io.StreamUtils;
public class FTPTransfer implements FileTransfer { public class FTPTransfer implements FileTransfer {
@ -314,35 +318,39 @@ public class FTPTransfer implements FileTransfer {
} }
@Override @Override
public InputStream getInputStream(String remoteFileName) throws IOException { public FlowFile getRemoteFile(final String remoteFileName, final FlowFile origFlowFile, final ProcessSession session) throws ProcessException, IOException {
return getInputStream(remoteFileName, null); final FTPClient client = getClient(origFlowFile);
} InputStream in = null;
FlowFile resultFlowFile = null;
@Override try {
public InputStream getInputStream(final String remoteFileName, final FlowFile flowFile) throws IOException { in = client.retrieveFileStream(remoteFileName);
final FTPClient client = getClient(flowFile); if (in == null) {
InputStream in = client.retrieveFileStream(remoteFileName); final String response = client.getReplyString();
if (in == null) { // FTPClient doesn't throw exception if file not found.
final String response = client.getReplyString(); // Instead, response string will contain: "550 Can't open <absolute_path>: No such file or directory"
// FTPClient doesn't throw exception if file not found. if (response != null && response.trim().endsWith("No such file or directory")) {
// Instead, response string will contain: "550 Can't open <absolute_path>: No such file or directory" throw new FileNotFoundException(response);
if (response != null && response.trim().endsWith("No such file or directory")){ }
throw new FileNotFoundException(response); throw new IOException(response);
}
final InputStream remoteIn = in;
resultFlowFile = session.write(origFlowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
StreamUtils.copy(remoteIn, out);
}
});
client.completePendingCommand();
return resultFlowFile;
} finally {
if(in != null){
try{
in.close();
}catch(final IOException ioe){
//do nothing
}
} }
throw new IOException(response);
} }
return in;
}
@Override
public void flush() throws IOException {
final FTPClient client = getClient(null);
client.completePendingCommand();
}
@Override
public boolean flush(final FlowFile flowFile) throws IOException {
return getClient(flowFile).completePendingCommand();
} }
@Override @Override

View File

@ -26,6 +26,8 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator; import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
public interface FileTransfer extends Closeable { public interface FileTransfer extends Closeable {
@ -34,13 +36,7 @@ public interface FileTransfer extends Closeable {
List<FileInfo> getListing() throws IOException; List<FileInfo> getListing() throws IOException;
InputStream getInputStream(String remoteFileName) throws IOException; FlowFile getRemoteFile(String remoteFileName, FlowFile flowFile, ProcessSession session) throws ProcessException, IOException;
InputStream getInputStream(String remoteFileName, FlowFile flowFile) throws IOException;
void flush() throws IOException;
boolean flush(FlowFile flowFile) throws IOException;
FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException; FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException;

View File

@ -45,18 +45,23 @@ import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext; import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration; import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxySpec; import org.apache.nifi.proxy.ProxySpec;
import org.apache.nifi.stream.io.StreamUtils;
import javax.net.SocketFactory; import javax.net.SocketFactory;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.Proxy; import java.net.Proxy;
import java.net.Socket; import java.net.Socket;
@ -346,19 +351,22 @@ public class SFTPTransfer implements FileTransfer {
} }
@Override @Override
public InputStream getInputStream(final String remoteFileName) throws IOException { public FlowFile getRemoteFile(final String remoteFileName, final FlowFile origFlowFile, final ProcessSession session) throws ProcessException, IOException {
return getInputStream(remoteFileName, null); final SFTPClient sftpClient = getSFTPClient(origFlowFile);
} RemoteFile rf = null;
RemoteFile.ReadAheadRemoteFileInputStream rfis = null;
@Override FlowFile resultFlowFile = null;
public InputStream getInputStream(final String remoteFileName, final FlowFile flowFile) throws IOException {
final SFTPClient sftpClient = getSFTPClient(flowFile);
try { try {
// The client has 'get' methods for downloading a file, but they don't offer a way to get access to an InputStream so rf = sftpClient.open(remoteFileName);
// this code is what the SFTPTransfer Downloader does to get a stream for the remote file contents rfis = rf.new ReadAheadRemoteFileInputStream(16);
final RemoteFile rf = sftpClient.open(remoteFileName); final InputStream in = rfis;
final RemoteFile.ReadAheadRemoteFileInputStream rfis = rf.new ReadAheadRemoteFileInputStream(16); resultFlowFile = session.write(origFlowFile, new OutputStreamCallback() {
return rfis; @Override
public void process(final OutputStream out) throws IOException {
StreamUtils.copy(in, out);
}
});
return resultFlowFile;
} catch (final SFTPException e) { } catch (final SFTPException e) {
switch (e.getStatusCode()) { switch (e.getStatusCode()) {
case NO_SUCH_FILE: case NO_SUCH_FILE:
@ -368,19 +376,24 @@ public class SFTPTransfer implements FileTransfer {
default: default:
throw new IOException("Failed to obtain file content for " + remoteFileName, e); throw new IOException("Failed to obtain file content for " + remoteFileName, e);
} }
} finally {
if(rf != null){
try{
rf.close();
}catch(final IOException ioe){
//do nothing
}
}
if(rfis != null){
try{
rfis.close();
}catch(final IOException ioe){
//do nothing
}
}
} }
} }
@Override
public void flush() throws IOException {
// nothing needed here
}
@Override
public boolean flush(final FlowFile flowFile) throws IOException {
return true;
}
@Override @Override
public void deleteFile(final FlowFile flowFile, final String path, final String remoteFileName) throws IOException { public void deleteFile(final FlowFile flowFile, final String path, final String remoteFileName) throws IOException {
final SFTPClient sftpClient = getSFTPClient(flowFile); final SFTPClient sftpClient = getSFTPClient(flowFile);

View File

@ -26,6 +26,7 @@ import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -33,9 +34,13 @@ import java.util.Map;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processors.standard.util.FileInfo; import org.apache.nifi.processors.standard.util.FileInfo;
import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.PermissionDeniedException; import org.apache.nifi.processors.standard.util.PermissionDeniedException;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
@ -285,12 +290,7 @@ public class TestFetchFileTransfer {
} }
@Override @Override
public InputStream getInputStream(final String remoteFileName) throws IOException { public FlowFile getRemoteFile(String remoteFileName, FlowFile flowFile, ProcessSession session) throws ProcessException, IOException {
return getInputStream(remoteFileName, null);
}
@Override
public InputStream getInputStream(String remoteFileName, FlowFile flowFile) throws IOException {
if (!allowAccess) { if (!allowAccess) {
throw new PermissionDeniedException("test permission denied"); throw new PermissionDeniedException("test permission denied");
} }
@ -299,17 +299,14 @@ public class TestFetchFileTransfer {
if (content == null) { if (content == null) {
throw new FileNotFoundException(); throw new FileNotFoundException();
} }
final InputStream in = new ByteArrayInputStream(content);
return new ByteArrayInputStream(content); flowFile = session.write(flowFile, new OutputStreamCallback() {
} @Override
public void process(final OutputStream out) throws IOException {
@Override StreamUtils.copy(in, out);
public void flush() throws IOException { }
} });
return flowFile;
@Override
public boolean flush(FlowFile flowFile) throws IOException {
return true;
} }
@Override @Override

View File

@ -310,29 +310,6 @@ public class ITestSFTPTransferWithSSHTestServer {
} }
} }
@Test
public void testGetInputStream() throws IOException {
final String filename = "./" + DIR_2 + "/" + FILE_1;
final Map<PropertyDescriptor, String> properties = createBaseProperties();
try(final SFTPTransfer transfer = createSFTPTransfer(properties);
final InputStream in = transfer.getInputStream(filename)) {
final String content = IOUtils.toString(in, StandardCharsets.UTF_8);
assertEquals("dir2 file1", content);
}
}
@Test(expected = FileNotFoundException.class)
public void testGetInputStreamWhenFileDoesNotExist() throws IOException {
final String filename = "./" + DIR_2 + "/DOES-NOT-EXIST";
final Map<PropertyDescriptor, String> properties = createBaseProperties();
try(final SFTPTransfer transfer = createSFTPTransfer(properties);
final InputStream in = transfer.getInputStream(filename)) {
IOUtils.toString(in, StandardCharsets.UTF_8);
}
}
@Test @Test
public void testDeleteFileWithoutPath() throws IOException { public void testDeleteFileWithoutPath() throws IOException {
final Map<PropertyDescriptor, String> properties = createBaseProperties(); final Map<PropertyDescriptor, String> properties = createBaseProperties();