NIFI-3281 - fix for (S)FTP processors when using EL against FFs

NIFI-3281 - Review - handle completePendingCommand return and added a unit test for ListFTP

NIFI-3281 - Review - Added flow file for EL evaluation in other methods and added unit test for NIFI-3590

This closes #1974.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Pierre Villard 2017-07-04 10:48:41 +02:00 committed by Koji Kawamura
parent 50f22162b0
commit a1706d12f5
9 changed files with 111 additions and 24 deletions

View File

@ -239,9 +239,13 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
@Override @Override
public void process(final OutputStream out) throws IOException { public void process(final OutputStream out) throws IOException {
StreamUtils.copy(in, out); StreamUtils.copy(in, out);
transfer.flush();
} }
}); });
if(!transfer.flush(flowFile)) {
throw new IOException("completePendingCommand returned false, file transfer failed");
}
transferQueue.offer(new FileTransferIdleWrapper(transfer, System.nanoTime())); transferQueue.offer(new FileTransferIdleWrapper(transfer, System.nanoTime()));
} catch (final FileNotFoundException e) { } catch (final FileNotFoundException e) {
getLogger().error("Failed to fetch content for {} from filename {} on remote host {} because the file could not be found on the remote system; routing to {}", getLogger().error("Failed to fetch content for {} from filename {} on remote host {} because the file could not be found on the remote system; routing to {}",
@ -297,7 +301,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
final String completionStrategy = context.getProperty(COMPLETION_STRATEGY).getValue(); final String completionStrategy = context.getProperty(COMPLETION_STRATEGY).getValue();
if (COMPLETION_DELETE.getValue().equalsIgnoreCase(completionStrategy)) { if (COMPLETION_DELETE.getValue().equalsIgnoreCase(completionStrategy)) {
try { try {
transfer.deleteFile(null, filename); transfer.deleteFile(flowFile, null, filename);
} catch (final FileNotFoundException e) { } catch (final FileNotFoundException e) {
// file doesn't exist -- effectively the same as removing it. Move on. // file doesn't exist -- effectively the same as removing it. Move on.
} catch (final IOException ioe) { } catch (final IOException ioe) {
@ -313,7 +317,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
final String target = targetDir + simpleFilename; final String target = targetDir + simpleFilename;
try { try {
transfer.rename(filename, target); transfer.rename(flowFile, filename, target);
} catch (final IOException ioe) { } catch (final IOException ioe) {
getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to rename the remote file due to {}", getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to rename the remote file due to {}",
new Object[] {flowFile, host, port, filename, ioe}, ioe); new Object[] {flowFile, host, port, filename, ioe}, ioe);

View File

@ -208,7 +208,7 @@ public abstract class GetFileTransfer extends AbstractProcessor {
if (deleteOriginal) { if (deleteOriginal) {
try { try {
transfer.deleteFile(null, file.getFullPathFileName()); transfer.deleteFile(flowFile, null, file.getFullPathFileName());
} catch (final IOException e) { } catch (final IOException e) {
logger.error("Failed to remove remote file {} due to {}; deleting local copy", logger.error("Failed to remove remote file {} due to {}; deleting local copy",
new Object[]{file.getFullPathFileName(), e}); new Object[]{file.getFullPathFileName(), e});

View File

@ -94,7 +94,7 @@ public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> {
@Override @Override
protected String getPath(final ProcessContext context) { protected String getPath(final ProcessContext context) {
return context.getProperty(REMOTE_PATH).getValue(); return context.getProperty(REMOTE_PATH).evaluateAttributeExpressions().getValue();
} }
@Override @Override

View File

@ -230,7 +230,7 @@ public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractPr
logger.warn("Resolving conflict by rejecting {} due to conflicting filename with a directory or file already on remote server", new Object[]{flowFile}); logger.warn("Resolving conflict by rejecting {} due to conflicting filename with a directory or file already on remote server", new Object[]{flowFile});
break; break;
case FileTransfer.CONFLICT_RESOLUTION_REPLACE: case FileTransfer.CONFLICT_RESOLUTION_REPLACE:
transfer.deleteFile(path, fileName); transfer.deleteFile(flowFile, path, fileName);
destinationRelationship = REL_SUCCESS; destinationRelationship = REL_SUCCESS;
transferFile = true; transferFile = true;
penalizeFile = false; penalizeFile = false;

View File

@ -289,7 +289,7 @@ public class FTPTransfer implements FileTransfer {
@Override @Override
public InputStream getInputStream(final String remoteFileName, final FlowFile flowFile) throws IOException { public InputStream getInputStream(final String remoteFileName, final FlowFile flowFile) throws IOException {
final FTPClient client = getClient(null); final FTPClient client = getClient(flowFile);
InputStream in = client.retrieveFileStream(remoteFileName); InputStream in = client.retrieveFileStream(remoteFileName);
if (in == null) { if (in == null) {
throw new IOException(client.getReplyString()); throw new IOException(client.getReplyString());
@ -303,6 +303,11 @@ public class FTPTransfer implements FileTransfer {
client.completePendingCommand(); client.completePendingCommand();
} }
@Override
public boolean flush(final FlowFile flowFile) throws IOException {
return getClient(flowFile).completePendingCommand();
}
@Override @Override
public FileInfo getRemoteFileInfo(final FlowFile flowFile, String path, String remoteFileName) throws IOException { public FileInfo getRemoteFileInfo(final FlowFile flowFile, String path, String remoteFileName) throws IOException {
final FTPClient client = getClient(flowFile); final FTPClient client = getClient(flowFile);
@ -444,8 +449,8 @@ public class FTPTransfer implements FileTransfer {
@Override @Override
public void rename(final String source, final String target) throws IOException { public void rename(final FlowFile flowFile, final String source, final String target) throws IOException {
final FTPClient client = getClient(null); final FTPClient client = getClient(flowFile);
final boolean renameSuccessful = client.rename(source, target); final boolean renameSuccessful = client.rename(source, target);
if (!renameSuccessful) { if (!renameSuccessful) {
throw new IOException("Failed to rename temporary file " + source + " to " + target + " due to: " + client.getReplyString()); throw new IOException("Failed to rename temporary file " + source + " to " + target + " due to: " + client.getReplyString());
@ -453,8 +458,8 @@ public class FTPTransfer implements FileTransfer {
} }
@Override @Override
public void deleteFile(final String path, final String remoteFileName) throws IOException { public void deleteFile(final FlowFile flowFile, final String path, final String remoteFileName) throws IOException {
final FTPClient client = getClient(null); final FTPClient client = getClient(flowFile);
if (path != null) { if (path != null) {
setWorkingDirectory(path); setWorkingDirectory(path);
} }
@ -464,8 +469,8 @@ public class FTPTransfer implements FileTransfer {
} }
@Override @Override
public void deleteDirectory(final String remoteDirectoryName) throws IOException { public void deleteDirectory(final FlowFile flowFile, final String remoteDirectoryName) throws IOException {
final FTPClient client = getClient(null); final FTPClient client = getClient(flowFile);
final boolean success = client.removeDirectory(remoteDirectoryName); final boolean success = client.removeDirectory(remoteDirectoryName);
if (!success) { if (!success) {
throw new IOException("Failed to remove directory " + remoteDirectoryName + " due to " + client.getReplyString()); throw new IOException("Failed to remove directory " + remoteDirectoryName + " due to " + client.getReplyString());

View File

@ -39,15 +39,17 @@ public interface FileTransfer extends Closeable {
void flush() throws IOException; void flush() throws IOException;
boolean flush(FlowFile flowFile) throws IOException;
FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException; FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException;
String put(FlowFile flowFile, String path, String filename, InputStream content) throws IOException; String put(FlowFile flowFile, String path, String filename, InputStream content) throws IOException;
void rename(String source, String target) throws IOException; void rename(FlowFile flowFile, String source, String target) throws IOException;
void deleteFile(String path, String remoteFileName) throws IOException; void deleteFile(FlowFile flowFile, String path, String remoteFileName) throws IOException;
void deleteDirectory(String remoteDirectoryName) throws IOException; void deleteDirectory(FlowFile flowFile, String remoteDirectoryName) throws IOException;
boolean isClosed(); boolean isClosed();

View File

@ -309,7 +309,12 @@ public class SFTPTransfer implements FileTransfer {
} }
@Override @Override
public void deleteFile(final String path, final String remoteFileName) throws IOException { public boolean flush(final FlowFile flowFile) throws IOException {
return true;
}
@Override
public void deleteFile(final FlowFile flowFile, final String path, final String remoteFileName) throws IOException {
final String fullPath = (path == null) ? remoteFileName : (path.endsWith("/")) ? path + remoteFileName : path + "/" + remoteFileName; final String fullPath = (path == null) ? remoteFileName : (path.endsWith("/")) ? path + remoteFileName : path + "/" + remoteFileName;
try { try {
sftp.rm(fullPath); sftp.rm(fullPath);
@ -326,7 +331,7 @@ public class SFTPTransfer implements FileTransfer {
} }
@Override @Override
public void deleteDirectory(final String remoteDirectoryName) throws IOException { public void deleteDirectory(final FlowFile flowFile, final String remoteDirectoryName) throws IOException {
try { try {
sftp.rm(remoteDirectoryName); sftp.rm(remoteDirectoryName);
} catch (final SftpException e) { } catch (final SftpException e) {
@ -613,8 +618,8 @@ public class SFTPTransfer implements FileTransfer {
} }
@Override @Override
public void rename(final String source, final String target) throws IOException { public void rename(final FlowFile flowFile, final String source, final String target) throws IOException {
final ChannelSftp sftp = getChannel(null); final ChannelSftp sftp = getChannel(flowFile);
try { try {
sftp.rename(source, target); sftp.rename(source, target);
} catch (final SftpException e) { } catch (final SftpException e) {

View File

@ -36,7 +36,6 @@ import org.mockftpserver.fake.filesystem.FileSystem;
import org.mockftpserver.fake.filesystem.WindowsFakeFileSystem; import org.mockftpserver.fake.filesystem.WindowsFakeFileSystem;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
@ -167,4 +166,71 @@ public class TestFTP {
final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(GetFTP.REL_SUCCESS).get(0); final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(GetFTP.REL_SUCCESS).get(0);
retrievedFile.assertContentEquals("Just some random test test test chocolate"); retrievedFile.assertContentEquals("Just some random test test test chocolate");
} }
@Test
public void basicFileFetch() throws IOException {
FileSystem results = fakeFtpServer.getFileSystem();
FileEntry sampleFile = new FileEntry("c:\\data\\randombytes-2");
sampleFile.setContents("Just some random test test test chocolate");
results.add(sampleFile);
// Check file exists
Assert.assertTrue(results.exists("c:\\data\\randombytes-2"));
TestRunner runner = TestRunners.newTestRunner(FetchFTP.class);
runner.setProperty(FetchFTP.HOSTNAME, "${host}");
runner.setProperty(FetchFTP.USERNAME, "${username}");
runner.setProperty(FTPTransfer.PASSWORD, password);
runner.setProperty(FTPTransfer.PORT, "${port}");
runner.setProperty(FetchFTP.REMOTE_FILENAME, "c:\\data\\randombytes-2");
runner.setProperty(FetchFTP.COMPLETION_STRATEGY, FetchFTP.COMPLETION_MOVE);
runner.setProperty(FetchFTP.MOVE_DESTINATION_DIR, "data");
Map<String, String> attrs = new HashMap<String, String>();
attrs.put("host", "localhost");
attrs.put("username", username);
attrs.put("port", Integer.toString(ftpPort));
runner.enqueue("", attrs);
runner.run();
final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(FetchFTP.REL_SUCCESS).get(0);
retrievedFile.assertContentEquals("Just some random test test test chocolate");
}
@Test
public void basicFileList() throws IOException {
FileSystem results = fakeFtpServer.getFileSystem();
FileEntry sampleFile = new FileEntry("c:\\data\\randombytes-2");
sampleFile.setContents("Just some random test test test chocolate");
results.add(sampleFile);
// Check file exists
Assert.assertTrue(results.exists("c:\\data\\randombytes-2"));
TestRunner runner = TestRunners.newTestRunner(ListFTP.class);
runner.setProperty(ListFTP.HOSTNAME, "localhost");
runner.setProperty(ListFTP.USERNAME, username);
runner.setProperty(FTPTransfer.PASSWORD, password);
runner.setProperty(FTPTransfer.PORT, Integer.toString(ftpPort));
runner.setProperty(ListFTP.REMOTE_PATH, "/");
runner.assertValid();
runner.run();
final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(FetchFTP.REL_SUCCESS).get(0);
runner.assertAllFlowFilesContainAttribute("ftp.remote.host");
runner.assertAllFlowFilesContainAttribute("ftp.remote.port");
runner.assertAllFlowFilesContainAttribute("ftp.listing.user");
runner.assertAllFlowFilesContainAttribute(ListFile.FILE_OWNER_ATTRIBUTE);
runner.assertAllFlowFilesContainAttribute(ListFile.FILE_GROUP_ATTRIBUTE);
runner.assertAllFlowFilesContainAttribute(ListFile.FILE_PERMISSIONS_ATTRIBUTE);
runner.assertAllFlowFilesContainAttribute(ListFile.FILE_SIZE_ATTRIBUTE);
runner.assertAllFlowFilesContainAttribute(ListFile.FILE_LAST_MODIFY_TIME_ATTRIBUTE);
retrievedFile.assertAttributeEquals("ftp.listing.user", username);
retrievedFile.assertAttributeEquals("filename", "randombytes-2");
}
} }

View File

@ -281,6 +281,11 @@ public class TestFetchFileTransfer {
public void flush() throws IOException { public void flush() throws IOException {
} }
@Override
public boolean flush(FlowFile flowFile) throws IOException {
return true;
}
@Override @Override
public FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException { public FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException {
return null; return null;
@ -292,7 +297,7 @@ public class TestFetchFileTransfer {
} }
@Override @Override
public void deleteFile(String path, String remoteFileName) throws IOException { public void deleteFile(FlowFile flowFile, String path, String remoteFileName) throws IOException {
if (!allowDelete) { if (!allowDelete) {
throw new PermissionDeniedException("test permission denied"); throw new PermissionDeniedException("test permission denied");
} }
@ -305,7 +310,7 @@ public class TestFetchFileTransfer {
} }
@Override @Override
public void rename(String source, String target) throws IOException { public void rename(FlowFile flowFile, String source, String target) throws IOException {
if (!allowRename) { if (!allowRename) {
throw new PermissionDeniedException("test permission denied"); throw new PermissionDeniedException("test permission denied");
} }
@ -319,7 +324,7 @@ public class TestFetchFileTransfer {
} }
@Override @Override
public void deleteDirectory(String remoteDirectoryName) throws IOException { public void deleteDirectory(FlowFile flowFile, String remoteDirectoryName) throws IOException {
} }