diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java index 20bb92748c..cf6df69d1e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java @@ -74,6 +74,7 @@ public class FetchFTP extends FetchFileTransfer { properties.add(FTPTransfer.HTTP_PROXY_PASSWORD); properties.add(FTPTransfer.BUFFER_SIZE); properties.add(FILE_NOT_FOUND_LOG_LEVEL); + properties.add(FTPTransfer.UTF8_ENCODING); return properties; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java index e7d96c60fb..7d85b871d6 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java @@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.regex.Pattern; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.net.ftp.FTPClient; import org.apache.commons.net.ftp.FTPFile; import org.apache.commons.net.ftp.FTPHTTPClient; @@ -546,19 +547,8 @@ public class FTPTransfer implements FileTransfer { } } - private FTPClient getClient(final FlowFile flowFile) throws IOException { - if (client != null) { - String desthost = ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(); - if (remoteHostName.equals(desthost)) { - // destination matches so we can keep our current session - resetWorkingDirectory(); - return client; - } else { - // this flowFile is going to a different destination, reset session - close(); - } - } - + @VisibleForTesting + protected FTPClient createFTPClient() { final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(ctx, createComponentProxyConfigSupplier(ctx)); final Proxy.Type proxyType = proxyConfig.getProxyType(); @@ -574,6 +564,24 @@ public class FTPTransfer implements FileTransfer { client.setSocketFactory(new SocksProxySocketFactory(new Proxy(proxyType, new InetSocketAddress(proxyHost, proxyPort)))); } } + + return client; + } + + private FTPClient getClient(final FlowFile flowFile) throws IOException { + if (client != null) { + String desthost = ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(); + if (remoteHostName.equals(desthost)) { + // destination matches so we can keep our current session + resetWorkingDirectory(); + return client; + } else { + // this flowFile is going to a different destination, reset session + close(); + } + } + + FTPClient client = createFTPClient(); this.client = client; client.setBufferSize(ctx.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B).intValue()); client.setDataTimeout(ctx.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java index 450119f50d..1400fc4775 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java @@ -202,6 +202,33 @@ public class TestFTP { retrievedFile.assertContentEquals("Just some random test test test chocolate"); } + @Test + public void basicFileFetchWithUTF8FileName() throws IOException { + FileSystem fs = fakeFtpServer.getFileSystem(); + + FileEntry sampleFile = new FileEntry("c:\\data\\őűőű.txt"); + sampleFile.setContents("Just some random test test test chocolate"); + fs.add(sampleFile); + + TestRunner runner = TestRunners.newTestRunner(FetchFTP.class); + runner.setProperty(FetchFTP.HOSTNAME, "localhost"); + runner.setProperty(FetchFTP.USERNAME, username); + runner.setProperty(FTPTransfer.PASSWORD, password); + runner.setProperty(FTPTransfer.PORT, String.valueOf(ftpPort)); + runner.setProperty(FetchFTP.REMOTE_FILENAME, "c:\\data\\őűőű.txt"); + runner.setProperty(FetchFTP.COMPLETION_STRATEGY, FetchFTP.COMPLETION_MOVE); + runner.setProperty(FetchFTP.MOVE_DESTINATION_DIR, "data"); + runner.setProperty(FTPTransfer.UTF8_ENCODING, "true"); + + runner.enqueue(""); + + runner.run(); + + runner.assertTransferCount(FetchFTP.REL_SUCCESS, 1); + final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(FetchFTP.REL_SUCCESS).get(0); + retrievedFile.assertContentEquals("Just some random test test test chocolate"); + } + @Test public void basicFileList() throws IOException, InterruptedException { FileSystem results = fakeFtpServer.getFileSystem(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java similarity index 53% rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java rename to nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java index 68eb627833..34c26c0d3d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java @@ -20,46 +20,74 @@ package org.apache.nifi.processors.standard; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; +import org.apache.commons.net.ftp.FTPClient; +import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.OutputStreamCallback; -import org.apache.nifi.processors.standard.util.FileInfo; +import org.apache.nifi.processors.standard.util.FTPTransfer; import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.util.PermissionDeniedException; -import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.mockito.stubbing.Answer; -public class TestFetchFileTransfer { +public class TestFetchFTP { + + private TestableFetchFTP proc; + private TestRunner runner; + + @Before + public void setUp() throws Exception { + proc = new TestableFetchFTP(); + runner = TestRunners.newTestRunner(proc); + runner.setValidateExpressionUsage(false); - @Test - public void testContentFetched() { - final TestableFetchFileTransfer proc = new TestableFetchFileTransfer(); - final TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost"); runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11"); runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}"); - proc.addContent("hello.txt", "world".getBytes()); - final Map attrs = new HashMap<>(); - attrs.put("filename", "hello.txt"); - runner.enqueue(new byte[0], attrs); + MockProcessContext ctx = (MockProcessContext) runner.getProcessContext(); + setDefaultValues(ctx, FTPTransfer.BUFFER_SIZE, FTPTransfer.DATA_TIMEOUT, FTPTransfer.CONNECTION_TIMEOUT, + FTPTransfer.CONNECTION_MODE, FTPTransfer.TRANSFER_MODE); + ctx.setProperty(FTPTransfer.USERNAME, "foo"); + ctx.setProperty(FTPTransfer.PASSWORD, "bar"); + } + + private void setDefaultValues(MockProcessContext ctx, PropertyDescriptor... propertyDescriptors) { + Arrays.stream(propertyDescriptors).forEach(d -> ctx.setProperty(d, d.getDefaultValue())); + } + + private void addFileAndEnqueue(String filename) { + proc.addContent(filename, "world".getBytes()); + runner.enqueue(new byte[0], Collections.singletonMap("filename", filename)); + } + + @Test + public void testContentFetched() { + addFileAndEnqueue("hello.txt"); runner.run(1, false, false); runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1); @@ -69,18 +97,7 @@ public class TestFetchFileTransfer { @Test public void testFilenameContainsPath() { - final String filenameWithPath = "./here/is/my/path/hello.txt"; - - final TestableFetchFileTransfer proc = new TestableFetchFileTransfer(); - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost"); - runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11"); - runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}"); - - proc.addContent(filenameWithPath, "world".getBytes()); - final Map attrs = new HashMap<>(); - attrs.put("filename", filenameWithPath); - runner.enqueue(new byte[0], attrs); + addFileAndEnqueue("./here/is/my/path/hello.txt"); runner.run(1, false, false); runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1); @@ -92,16 +109,21 @@ public class TestFetchFileTransfer { } @Test - public void testContentNotFound() { - final TestableFetchFileTransfer proc = new TestableFetchFileTransfer(); - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost"); - runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11"); - runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}"); + public void testControlEncodingIsSetToUTF8() { + runner.setProperty(FTPTransfer.UTF8_ENCODING, "true"); - final Map attrs = new HashMap<>(); - attrs.put("filename", "hello.txt"); - runner.enqueue(new byte[0], attrs); + addFileAndEnqueue("őűőű.txt"); + + runner.run(1, false, false); + + ArgumentCaptor argument = ArgumentCaptor.forClass(String.class); + verify(proc.mockFtpClient).setControlEncoding(argument.capture()); + assertEquals("utf-8", argument.getValue().toLowerCase()); + } + + @Test + public void testContentNotFound() { + runner.enqueue(new byte[0], Collections.singletonMap("filename", "hello.txt")); runner.run(1, false, false); runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_NOT_FOUND, 1); @@ -109,17 +131,8 @@ public class TestFetchFileTransfer { @Test public void testInsufficientPermissions() { - final TestableFetchFileTransfer proc = new TestableFetchFileTransfer(); - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost"); - runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11"); - runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}"); - - proc.addContent("hello.txt", "world".getBytes()); + addFileAndEnqueue("hello.txt"); proc.allowAccess = false; - final Map attrs = new HashMap<>(); - attrs.put("filename", "hello.txt"); - runner.enqueue(new byte[0], attrs); runner.run(1, false, false); runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 1); @@ -128,19 +141,11 @@ public class TestFetchFileTransfer { @Test public void testMoveFileWithNoTrailingSlashDirName() { - final TestableFetchFileTransfer proc = new TestableFetchFileTransfer(); - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost"); - runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11"); - runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}"); runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue()); runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved"); runner.setProperty(FetchFileTransfer.MOVE_CREATE_DIRECTORY, "true"); - proc.addContent("hello.txt", "world".getBytes()); - final Map attrs = new HashMap<>(); - attrs.put("filename", "hello.txt"); - runner.enqueue(new byte[0], attrs); + addFileAndEnqueue("hello.txt"); runner.run(1, false, false); runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1); @@ -151,18 +156,10 @@ public class TestFetchFileTransfer { @Test public void testMoveFileWithTrailingSlashDirName() { - final TestableFetchFileTransfer proc = new TestableFetchFileTransfer(); - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost"); - runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11"); - runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}"); runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue()); runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved/"); - proc.addContent("hello.txt", "world".getBytes()); - final Map attrs = new HashMap<>(); - attrs.put("filename", "hello.txt"); - runner.enqueue(new byte[0], attrs); + addFileAndEnqueue("hello.txt"); runner.run(1, false, false); runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1); @@ -173,17 +170,9 @@ public class TestFetchFileTransfer { @Test public void testDeleteFile() { - final TestableFetchFileTransfer proc = new TestableFetchFileTransfer(); - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost"); - runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11"); - runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}"); runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_DELETE.getValue()); - proc.addContent("hello.txt", "world".getBytes()); - final Map attrs = new HashMap<>(); - attrs.put("filename", "hello.txt"); - runner.enqueue(new byte[0], attrs); + addFileAndEnqueue("hello.txt"); runner.run(1, false, false); runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1); @@ -192,19 +181,11 @@ public class TestFetchFileTransfer { @Test public void testDeleteFails() { - final TestableFetchFileTransfer proc = new TestableFetchFileTransfer(); - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost"); - runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11"); - runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}"); runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_DELETE.getValue()); - - proc.addContent("hello.txt", "world".getBytes()); - final Map attrs = new HashMap<>(); - attrs.put("filename", "hello.txt"); - runner.enqueue(new byte[0], attrs); proc.allowDelete = false; + addFileAndEnqueue("hello.txt"); + runner.run(1, false, false); runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1); assertFalse(proc.fileContents.isEmpty()); @@ -212,21 +193,13 @@ public class TestFetchFileTransfer { @Test public void testRenameFails() { - final TestableFetchFileTransfer proc = new TestableFetchFileTransfer(); - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost"); - runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11"); - runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}"); runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue()); runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved/"); - - proc.addContent("hello.txt", "world".getBytes()); - final Map attrs = new HashMap<>(); - attrs.put("filename", "hello.txt"); - runner.enqueue(new byte[0], attrs); proc.allowDelete = false; proc.allowRename = false; + addFileAndEnqueue("hello.txt"); + runner.run(1, false, false); runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1); assertEquals(1, proc.fileContents.size()); @@ -236,19 +209,12 @@ public class TestFetchFileTransfer { @Test public void testCreateDirFails() { - final TestableFetchFileTransfer proc = new TestableFetchFileTransfer(); - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost"); - runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11"); - runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}"); runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue()); runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved/"); runner.setProperty(FetchFileTransfer.MOVE_CREATE_DIRECTORY, "true"); - proc.addContent("hello.txt", "world".getBytes()); - final Map attrs = new HashMap<>(); - attrs.put("filename", "hello.txt"); - runner.enqueue(new byte[0], attrs); + addFileAndEnqueue("hello.txt"); + proc.allowCreateDir = false; runner.run(1, false, false); @@ -259,13 +225,28 @@ public class TestFetchFileTransfer { } - private static class TestableFetchFileTransfer extends FetchFileTransfer { + private static class TestableFetchFTP extends FetchFTP { private boolean allowAccess = true; private boolean allowDelete = true; private boolean allowCreateDir = true; private boolean allowRename = true; private boolean closed = false; private final Map fileContents = new HashMap<>(); + private final FTPClient mockFtpClient = Mockito.mock(FTPClient.class); + + private TestableFetchFTP() throws IOException { + when(mockFtpClient.retrieveFileStream(anyString())) + .then((Answer) invocationOnMock -> { + byte[] content = fileContents.get(invocationOnMock.getArgument(0)); + if (content == null) { + throw new FileNotFoundException(); + } + return new ByteArrayInputStream(content); + }); + when(mockFtpClient.login(anyString(), anyString())).thenReturn(true); + when(mockFtpClient.setFileType(anyInt())).thenReturn(true); + + } public void addContent(final String filename, final byte[] content) { this.fileContents.put(filename, content); @@ -273,20 +254,11 @@ public class TestFetchFileTransfer { @Override protected FileTransfer createFileTransfer(final ProcessContext context) { - return new FileTransfer() { - @Override - public void close() throws IOException { - closed = true; - } + return new FTPTransfer(context, getLogger()) { @Override - public String getHomeDirectory(FlowFile flowFile) throws IOException { - return null; - } - - @Override - public List getListing() throws IOException { - return null; + protected FTPClient createFTPClient() { + return mockFtpClient; } @Override @@ -295,28 +267,7 @@ public class TestFetchFileTransfer { throw new PermissionDeniedException("test permission denied"); } - final byte[] content = fileContents.get(remoteFileName); - if (content == null) { - throw new FileNotFoundException(); - } - final InputStream in = new ByteArrayInputStream(content); - flowFile = session.write(flowFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - StreamUtils.copy(in, out); - } - }); - return flowFile; - } - - @Override - public FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException { - return null; - } - - @Override - public String put(FlowFile flowFile, String path, String filename, InputStream content) throws IOException { - return null; + return super.getRemoteFile(remoteFileName, flowFile, session); } @Override @@ -346,21 +297,6 @@ public class TestFetchFileTransfer { fileContents.put(target, content); } - @Override - public void deleteDirectory(FlowFile flowFile, String remoteDirectoryName) throws IOException { - - } - - @Override - public boolean isClosed() { - return false; - } - - @Override - public String getProtocolName() { - return "test"; - } - @Override public void ensureDirectoryExists(FlowFile flowFile, File remoteDirectory) throws IOException { if (!allowCreateDir) {