NIFI-7685: Add UTF8 support for FetchFTP

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #4446
This commit is contained in:
Denes Arvay 2020-07-30 10:13:20 +02:00 committed by Matthew Burgess
parent e2ccfbbacf
commit 6990f0d3a9
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
4 changed files with 137 additions and 165 deletions

View File

@ -74,6 +74,7 @@ public class FetchFTP extends FetchFileTransfer {
properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
properties.add(FTPTransfer.BUFFER_SIZE);
properties.add(FILE_NOT_FOUND_LOG_LEVEL);
properties.add(FTPTransfer.UTF8_ENCODING);
return properties;
}

View File

@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPHTTPClient;
@ -546,19 +547,8 @@ public class FTPTransfer implements FileTransfer {
}
}
private FTPClient getClient(final FlowFile flowFile) throws IOException {
if (client != null) {
String desthost = ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
if (remoteHostName.equals(desthost)) {
// destination matches so we can keep our current session
resetWorkingDirectory();
return client;
} else {
// this flowFile is going to a different destination, reset session
close();
}
}
@VisibleForTesting
protected FTPClient createFTPClient() {
final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(ctx, createComponentProxyConfigSupplier(ctx));
final Proxy.Type proxyType = proxyConfig.getProxyType();
@ -574,6 +564,24 @@ public class FTPTransfer implements FileTransfer {
client.setSocketFactory(new SocksProxySocketFactory(new Proxy(proxyType, new InetSocketAddress(proxyHost, proxyPort))));
}
}
return client;
}
private FTPClient getClient(final FlowFile flowFile) throws IOException {
if (client != null) {
String desthost = ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
if (remoteHostName.equals(desthost)) {
// destination matches so we can keep our current session
resetWorkingDirectory();
return client;
} else {
// this flowFile is going to a different destination, reset session
close();
}
}
FTPClient client = createFTPClient();
this.client = client;
client.setBufferSize(ctx.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B).intValue());
client.setDataTimeout(ctx.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());

View File

@ -202,6 +202,33 @@ public class TestFTP {
retrievedFile.assertContentEquals("Just some random test test test chocolate");
}
@Test
public void basicFileFetchWithUTF8FileName() throws IOException {
FileSystem fs = fakeFtpServer.getFileSystem();
FileEntry sampleFile = new FileEntry("c:\\data\\őűőű.txt");
sampleFile.setContents("Just some random test test test chocolate");
fs.add(sampleFile);
TestRunner runner = TestRunners.newTestRunner(FetchFTP.class);
runner.setProperty(FetchFTP.HOSTNAME, "localhost");
runner.setProperty(FetchFTP.USERNAME, username);
runner.setProperty(FTPTransfer.PASSWORD, password);
runner.setProperty(FTPTransfer.PORT, String.valueOf(ftpPort));
runner.setProperty(FetchFTP.REMOTE_FILENAME, "c:\\data\\őűőű.txt");
runner.setProperty(FetchFTP.COMPLETION_STRATEGY, FetchFTP.COMPLETION_MOVE);
runner.setProperty(FetchFTP.MOVE_DESTINATION_DIR, "data");
runner.setProperty(FTPTransfer.UTF8_ENCODING, "true");
runner.enqueue("");
runner.run();
runner.assertTransferCount(FetchFTP.REL_SUCCESS, 1);
final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(FetchFTP.REL_SUCCESS).get(0);
retrievedFile.assertContentEquals("Just some random test test test chocolate");
}
@Test
public void basicFileList() throws IOException, InterruptedException {
FileSystem results = fakeFtpServer.getFileSystem();

View File

@ -20,46 +20,74 @@ package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processors.standard.util.FileInfo;
import org.apache.nifi.processors.standard.util.FTPTransfer;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.PermissionDeniedException;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
public class TestFetchFileTransfer {
public class TestFetchFTP {
private TestableFetchFTP proc;
private TestRunner runner;
@Before
public void setUp() throws Exception {
proc = new TestableFetchFTP();
runner = TestRunners.newTestRunner(proc);
runner.setValidateExpressionUsage(false);
@Test
public void testContentFetched() {
final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
proc.addContent("hello.txt", "world".getBytes());
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "hello.txt");
runner.enqueue(new byte[0], attrs);
MockProcessContext ctx = (MockProcessContext) runner.getProcessContext();
setDefaultValues(ctx, FTPTransfer.BUFFER_SIZE, FTPTransfer.DATA_TIMEOUT, FTPTransfer.CONNECTION_TIMEOUT,
FTPTransfer.CONNECTION_MODE, FTPTransfer.TRANSFER_MODE);
ctx.setProperty(FTPTransfer.USERNAME, "foo");
ctx.setProperty(FTPTransfer.PASSWORD, "bar");
}
private void setDefaultValues(MockProcessContext ctx, PropertyDescriptor... propertyDescriptors) {
Arrays.stream(propertyDescriptors).forEach(d -> ctx.setProperty(d, d.getDefaultValue()));
}
private void addFileAndEnqueue(String filename) {
proc.addContent(filename, "world".getBytes());
runner.enqueue(new byte[0], Collections.singletonMap("filename", filename));
}
@Test
public void testContentFetched() {
addFileAndEnqueue("hello.txt");
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
@ -69,18 +97,7 @@ public class TestFetchFileTransfer {
@Test
public void testFilenameContainsPath() {
final String filenameWithPath = "./here/is/my/path/hello.txt";
final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
proc.addContent(filenameWithPath, "world".getBytes());
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", filenameWithPath);
runner.enqueue(new byte[0], attrs);
addFileAndEnqueue("./here/is/my/path/hello.txt");
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
@ -92,16 +109,21 @@ public class TestFetchFileTransfer {
}
@Test
public void testContentNotFound() {
final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
public void testControlEncodingIsSetToUTF8() {
runner.setProperty(FTPTransfer.UTF8_ENCODING, "true");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "hello.txt");
runner.enqueue(new byte[0], attrs);
addFileAndEnqueue("őűőű.txt");
runner.run(1, false, false);
ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
verify(proc.mockFtpClient).setControlEncoding(argument.capture());
assertEquals("utf-8", argument.getValue().toLowerCase());
}
@Test
public void testContentNotFound() {
runner.enqueue(new byte[0], Collections.singletonMap("filename", "hello.txt"));
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_NOT_FOUND, 1);
@ -109,17 +131,8 @@ public class TestFetchFileTransfer {
@Test
public void testInsufficientPermissions() {
final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
proc.addContent("hello.txt", "world".getBytes());
addFileAndEnqueue("hello.txt");
proc.allowAccess = false;
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "hello.txt");
runner.enqueue(new byte[0], attrs);
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 1);
@ -128,19 +141,11 @@ public class TestFetchFileTransfer {
@Test
public void testMoveFileWithNoTrailingSlashDirName() {
final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved");
runner.setProperty(FetchFileTransfer.MOVE_CREATE_DIRECTORY, "true");
proc.addContent("hello.txt", "world".getBytes());
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "hello.txt");
runner.enqueue(new byte[0], attrs);
addFileAndEnqueue("hello.txt");
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
@ -151,18 +156,10 @@ public class TestFetchFileTransfer {
@Test
public void testMoveFileWithTrailingSlashDirName() {
final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved/");
proc.addContent("hello.txt", "world".getBytes());
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "hello.txt");
runner.enqueue(new byte[0], attrs);
addFileAndEnqueue("hello.txt");
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
@ -173,17 +170,9 @@ public class TestFetchFileTransfer {
@Test
public void testDeleteFile() {
final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_DELETE.getValue());
proc.addContent("hello.txt", "world".getBytes());
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "hello.txt");
runner.enqueue(new byte[0], attrs);
addFileAndEnqueue("hello.txt");
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
@ -192,19 +181,11 @@ public class TestFetchFileTransfer {
@Test
public void testDeleteFails() {
final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_DELETE.getValue());
proc.addContent("hello.txt", "world".getBytes());
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "hello.txt");
runner.enqueue(new byte[0], attrs);
proc.allowDelete = false;
addFileAndEnqueue("hello.txt");
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
assertFalse(proc.fileContents.isEmpty());
@ -212,21 +193,13 @@ public class TestFetchFileTransfer {
@Test
public void testRenameFails() {
final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved/");
proc.addContent("hello.txt", "world".getBytes());
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "hello.txt");
runner.enqueue(new byte[0], attrs);
proc.allowDelete = false;
proc.allowRename = false;
addFileAndEnqueue("hello.txt");
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
assertEquals(1, proc.fileContents.size());
@ -236,19 +209,12 @@ public class TestFetchFileTransfer {
@Test
public void testCreateDirFails() {
final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved/");
runner.setProperty(FetchFileTransfer.MOVE_CREATE_DIRECTORY, "true");
proc.addContent("hello.txt", "world".getBytes());
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "hello.txt");
runner.enqueue(new byte[0], attrs);
addFileAndEnqueue("hello.txt");
proc.allowCreateDir = false;
runner.run(1, false, false);
@ -259,13 +225,28 @@ public class TestFetchFileTransfer {
}
private static class TestableFetchFileTransfer extends FetchFileTransfer {
private static class TestableFetchFTP extends FetchFTP {
private boolean allowAccess = true;
private boolean allowDelete = true;
private boolean allowCreateDir = true;
private boolean allowRename = true;
private boolean closed = false;
private final Map<String, byte[]> fileContents = new HashMap<>();
private final FTPClient mockFtpClient = Mockito.mock(FTPClient.class);
private TestableFetchFTP() throws IOException {
when(mockFtpClient.retrieveFileStream(anyString()))
.then((Answer) invocationOnMock -> {
byte[] content = fileContents.get(invocationOnMock.getArgument(0));
if (content == null) {
throw new FileNotFoundException();
}
return new ByteArrayInputStream(content);
});
when(mockFtpClient.login(anyString(), anyString())).thenReturn(true);
when(mockFtpClient.setFileType(anyInt())).thenReturn(true);
}
public void addContent(final String filename, final byte[] content) {
this.fileContents.put(filename, content);
@ -273,20 +254,11 @@ public class TestFetchFileTransfer {
@Override
protected FileTransfer createFileTransfer(final ProcessContext context) {
return new FileTransfer() {
@Override
public void close() throws IOException {
closed = true;
}
return new FTPTransfer(context, getLogger()) {
@Override
public String getHomeDirectory(FlowFile flowFile) throws IOException {
return null;
}
@Override
public List<FileInfo> getListing() throws IOException {
return null;
protected FTPClient createFTPClient() {
return mockFtpClient;
}
@Override
@ -295,28 +267,7 @@ public class TestFetchFileTransfer {
throw new PermissionDeniedException("test permission denied");
}
final byte[] content = fileContents.get(remoteFileName);
if (content == null) {
throw new FileNotFoundException();
}
final InputStream in = new ByteArrayInputStream(content);
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
StreamUtils.copy(in, out);
}
});
return flowFile;
}
@Override
public FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException {
return null;
}
@Override
public String put(FlowFile flowFile, String path, String filename, InputStream content) throws IOException {
return null;
return super.getRemoteFile(remoteFileName, flowFile, session);
}
@Override
@ -346,21 +297,6 @@ public class TestFetchFileTransfer {
fileContents.put(target, content);
}
@Override
public void deleteDirectory(FlowFile flowFile, String remoteDirectoryName) throws IOException {
}
@Override
public boolean isClosed() {
return false;
}
@Override
public String getProtocolName() {
return "test";
}
@Override
public void ensureDirectoryExists(FlowFile flowFile, File remoteDirectory) throws IOException {
if (!allowCreateDir) {