NIFI-4374: Added buffer size to FTP connections

This closes #2141.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Gino Lisignoli 2017-09-11 10:49:47 +12:00 committed by Koji Kawamura
parent 1aeb517c27
commit 897b8ab601
5 changed files with 12 additions and 0 deletions

View File

@ -72,6 +72,7 @@ public class FetchFTP extends FetchFileTransfer {
properties.add(FTPTransfer.PROXY_PORT); properties.add(FTPTransfer.PROXY_PORT);
properties.add(FTPTransfer.HTTP_PROXY_USERNAME); properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
properties.add(FTPTransfer.HTTP_PROXY_PASSWORD); properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
properties.add(FTPTransfer.BUFFER_SIZE);
return properties; return properties;
} }

View File

@ -81,6 +81,7 @@ public class GetFTP extends GetFileTransfer {
properties.add(FTPTransfer.PROXY_PORT); properties.add(FTPTransfer.PROXY_PORT);
properties.add(FTPTransfer.HTTP_PROXY_USERNAME); properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
properties.add(FTPTransfer.HTTP_PROXY_PASSWORD); properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
properties.add(FTPTransfer.BUFFER_SIZE);
properties.add(FTPTransfer.UTF8_ENCODING); properties.add(FTPTransfer.UTF8_ENCODING);
this.properties = Collections.unmodifiableList(properties); this.properties = Collections.unmodifiableList(properties);
} }

View File

@ -84,6 +84,7 @@ public class ListFTP extends ListFileTransfer {
properties.add(FTPTransfer.PROXY_PORT); properties.add(FTPTransfer.PROXY_PORT);
properties.add(FTPTransfer.HTTP_PROXY_USERNAME); properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
properties.add(FTPTransfer.HTTP_PROXY_PASSWORD); properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
properties.add(FTPTransfer.BUFFER_SIZE);
properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION); properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION);
return properties; return properties;
} }

View File

@ -94,6 +94,7 @@ public class PutFTP extends PutFileTransfer<FTPTransfer> {
properties.add(FTPTransfer.PROXY_PORT); properties.add(FTPTransfer.PROXY_PORT);
properties.add(FTPTransfer.HTTP_PROXY_USERNAME); properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
properties.add(FTPTransfer.HTTP_PROXY_PASSWORD); properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
properties.add(FTPTransfer.BUFFER_SIZE);
properties.add(FTPTransfer.UTF8_ENCODING); properties.add(FTPTransfer.UTF8_ENCODING);
this.properties = Collections.unmodifiableList(properties); this.properties = Collections.unmodifiableList(properties);

View File

@ -41,6 +41,7 @@ import org.apache.commons.net.ftp.FTPReply;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
@ -105,6 +106,12 @@ public class FTPTransfer implements FileTransfer {
.required(false) .required(false)
.sensitive(true) .sensitive(true)
.build(); .build();
public static final PropertyDescriptor BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("Internal Buffer Size")
.description("Set the internal buffer size for buffered data streams")
.defaultValue("16KB")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build();
public static final PropertyDescriptor UTF8_ENCODING = new PropertyDescriptor.Builder() public static final PropertyDescriptor UTF8_ENCODING = new PropertyDescriptor.Builder()
.name("ftp-use-utf8") .name("ftp-use-utf8")
.displayName("Use UTF-8 Encoding") .displayName("Use UTF-8 Encoding")
@ -527,6 +534,7 @@ public class FTPTransfer implements FileTransfer {
} }
} }
this.client = client; this.client = client;
client.setBufferSize(ctx.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B).intValue());
client.setDataTimeout(ctx.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); client.setDataTimeout(ctx.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
client.setDefaultTimeout(ctx.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); client.setDefaultTimeout(ctx.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
client.setRemoteVerificationEnabled(false); client.setRemoteVerificationEnabled(false);