NIFI-673: Added Completion Strategy to FetchSFTP

This commit is contained in:
Mark Payne 2015-10-05 16:11:40 -04:00
parent d1d57931bf
commit b0322d9ffe
10 changed files with 265 additions and 47 deletions

View File

@ -70,23 +70,25 @@ import org.codehaus.jackson.map.ObjectMapper;
* *
* <p> * <p>
* In order to make use of this abstract class, the entities listed must meet the following criteria: * In order to make use of this abstract class, the entities listed must meet the following criteria:
* <ul>
* <li>
* Entity must have a timestamp associated with it. This timestamp is used to determine if entities are "new" or not. Any entity that is
* returned by the listing will be considered "new" if the timestamp is later than the latest timestamp pulled.
* </li>
* <li>
* Entity must have a unique identifier. This is used in conjunction with the timestamp in order to determine whether or not the entity is
* new. If the timestamp of an entity is before the latest timestamp pulled, then the entity is not considered new. If the timestamp is later
* than the last timestamp pulled, then the entity is considered new. If the timestamp is equal to the latest timestamp pulled, then the entity's
* identifier is compared to all of the entity identifiers that have that same timestamp in order to determine whether or not the entity has been
* seen already.
* </li>
* <li>
* Entity must have a user-readable name that can be used for logging purposes.
* </li>
* </p> * </p>
* *
* <ul>
* <li>
* Entity must have a timestamp associated with it. This timestamp is used to determine if entities are "new" or not. Any entity that is
* returned by the listing will be considered "new" if the timestamp is later than the latest timestamp pulled.
* </li>
* <li>
* Entity must have a unique identifier. This is used in conjunction with the timestamp in order to determine whether or not the entity is
* new. If the timestamp of an entity is before the latest timestamp pulled, then the entity is not considered new. If the timestamp is later
* than the last timestamp pulled, then the entity is considered new. If the timestamp is equal to the latest timestamp pulled, then the entity's
* identifier is compared to all of the entity identifiers that have that same timestamp in order to determine whether or not the entity has been
* seen already.
* </li>
* <li>
* Entity must have a user-readable name that can be used for logging purposes.
* </li>
* </ul>
*
* <p> * <p>
* This class persists state across restarts so that even if NiFi is restarted, duplicates will not be pulled from the remote system. This is performed using * This class persists state across restarts so that even if NiFi is restarted, duplicates will not be pulled from the remote system. This is performed using
* two different mechanisms. First, state is stored locally. This allows the system to be restarted and begin processing where it left off. The state that is * two different mechanisms. First, state is stored locally. This allows the system to be restarted and begin processing where it left off. The state that is
@ -111,6 +113,7 @@ import org.codehaus.jackson.map.ObjectMapper;
* *
* <p> * <p>
* Subclasses are responsible for the following: * Subclasses are responsible for the following:
* </p>
* *
* <ul> * <ul>
* <li> * <li>
@ -134,7 +137,6 @@ import org.codehaus.jackson.map.ObjectMapper;
* a boolean indicating whether or not a change in the value of the provided property should trigger the timestamp and identifier information to be cleared. * a boolean indicating whether or not a change in the value of the provided property should trigger the timestamp and identifier information to be cleared.
* </li> * </li>
* </ul> * </ul>
* </p>
*/ */
@TriggerSerially @TriggerSerially
public abstract class AbstractListProcessor<T extends ListableEntity> extends AbstractProcessor { public abstract class AbstractListProcessor<T extends ListableEntity> extends AbstractProcessor {
@ -372,8 +374,8 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
int listCount = 0; int listCount = 0;
Long latestListingTimestamp = null; Long latestListingTimestamp = null;
for (final T entity : entityList) { for (final T entity : entityList) {
final boolean list = (minTimestamp == null || entity.getTimestamp() > minTimestamp || final boolean list = (minTimestamp == null || entity.getTimestamp() > minTimestamp
(entity.getTimestamp() == minTimestamp && !latestIdentifiersListed.contains(entity.getIdentifier()))); || (entity.getTimestamp() == minTimestamp && !latestIdentifiersListed.contains(entity.getIdentifier())));
// Create the FlowFile for this path. // Create the FlowFile for this path.
if (list) { if (list) {

View File

@ -31,7 +31,9 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
@ -49,9 +51,18 @@ import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.Tuple; import org.apache.nifi.util.Tuple;
/** /**
* A base class for FetchSFTP, FetchFTP processors * A base class for FetchSFTP, FetchFTP processors.
*
* Note that implementations of this class should never use the @SupportsBatching annotation! Doing so
* could result in data loss!
*/ */
public abstract class FetchFileTransfer extends AbstractProcessor { public abstract class FetchFileTransfer extends AbstractProcessor {
static final AllowableValue COMPLETION_NONE = new AllowableValue("None", "None", "Leave the file as-is");
static final AllowableValue COMPLETION_MOVE = new AllowableValue("Move File", "Move File", "Move the file to the directory specified by the <Move Destination Directory> property");
static final AllowableValue COMPLETION_DELETE = new AllowableValue("Delete File", "Delete File", "Deletes the original file from the remote system");
static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
.name("Hostname") .name("Hostname")
.description("The fully-qualified hostname or IP address of the host to fetch the data from") .description("The fully-qualified hostname or IP address of the host to fetch the data from")
@ -73,13 +84,25 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
.build(); .build();
public static final PropertyDescriptor DELETE_ORIGINAL = new PropertyDescriptor.Builder() static final PropertyDescriptor COMPLETION_STRATEGY = new PropertyDescriptor.Builder()
.name("Delete Original") .name("Completion Strategy")
.description("Determines whether or not the file is deleted from the remote system after it has been successfully transferred") .description("Specifies what to do with the original file on the server once it has been pulled into NiFi. If the Completion Strategy fails, a warning will be "
.defaultValue("true") + "logged but the data will still be transferred.")
.allowableValues("true", "false") .expressionLanguageSupported(false)
.allowableValues(COMPLETION_NONE, COMPLETION_MOVE, COMPLETION_DELETE)
.defaultValue(COMPLETION_NONE.getValue())
.required(true) .required(true)
.build(); .build();
static final PropertyDescriptor MOVE_DESTINATION_DIR = new PropertyDescriptor.Builder()
.name("Move Destination Directory")
.description("The directory on the remote server to the move the original file to once it has been ingested into NiFi. "
+ "This property is ignored unless the Completion Strategy is set to \"Move File\". The specified directory must already exist on"
+ "the remote system, or the rename will fail.")
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder() public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success") .name("success")
@ -156,7 +179,8 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
properties.add(HOSTNAME); properties.add(HOSTNAME);
properties.add(UNDEFAULTED_PORT); properties.add(UNDEFAULTED_PORT);
properties.add(REMOTE_FILENAME); properties.add(REMOTE_FILENAME);
properties.add(DELETE_ORIGINAL); properties.add(COMPLETION_STRATEGY);
properties.add(MOVE_DESTINATION_DIR);
return properties; return properties;
} }
@ -203,6 +227,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
final InputStream in; final InputStream in;
try { try {
in = transfer.getInputStream(filename, flowFile); in = transfer.getInputStream(filename, flowFile);
flowFile = session.write(flowFile, new OutputStreamCallback() { flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override @Override
public void process(final OutputStream out) throws IOException { public void process(final OutputStream out) throws IOException {
@ -250,15 +275,34 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
stopWatch.getElapsed(TimeUnit.MILLISECONDS)); stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
// delete remote file is necessary // it is critical that we commit the session before moving/deleting the remote file. Otherwise, we could have a situation where
final boolean deleteOriginal = context.getProperty(DELETE_ORIGINAL).asBoolean(); // we ingest the data, delete/move the remote file, and then NiFi dies/is shut down before the session is committed. This would
if (deleteOriginal) { // result in data loss! If we commit the session first, we are safe.
session.commit();
final String completionStrategy = context.getProperty(COMPLETION_STRATEGY).getValue();
if (COMPLETION_DELETE.getValue().equalsIgnoreCase(completionStrategy)) {
try { try {
transfer.deleteFile(null, filename); transfer.deleteFile(null, filename);
} catch (final FileNotFoundException e) { } catch (final FileNotFoundException e) {
// file doesn't exist -- effectively the same as removing it. Move on. // file doesn't exist -- effectively the same as removing it. Move on.
} catch (final IOException ioe) { } catch (final IOException ioe) {
getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to remove the remote file due to {}", new Object[] {flowFile, host, port, filename, ioe}, ioe); getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to remove the remote file due to {}",
new Object[] {flowFile, host, port, filename, ioe}, ioe);
}
} else if (COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) {
String targetDir = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue();
if (!targetDir.endsWith("/")) {
targetDir = targetDir + "/";
}
final String simpleFilename = StringUtils.substringAfterLast(filename, "/");
final String target = targetDir + simpleFilename;
try {
transfer.rename(filename, target);
} catch (final IOException ioe) {
getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to rename the remote file due to {}",
new Object[] {flowFile, host, port, filename, ioe}, ioe);
} }
} }
} }

View File

@ -22,7 +22,6 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -35,8 +34,7 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer; import org.apache.nifi.processors.standard.util.SFTPTransfer;
// Note that we do not use @SupportsBatching annotation. This processor cannot support batching because it must ensure that session commits happen before remote files are deleted.
@SupportsBatching
@Tags({"sftp", "get", "retrieve", "files", "fetch", "remote", "ingest", "source", "input"}) @Tags({"sftp", "get", "retrieve", "files", "fetch", "remote", "ingest", "source", "input"})
@CapabilityDescription("Fetches the content of a file from a remote SFTP server and overwrites the contents of an incoming FlowFile with the content of the remote file.") @CapabilityDescription("Fetches the content of a file from a remote SFTP server and overwrites the contents of an incoming FlowFile with the content of the remote file.")
@SeeAlso({GetSFTP.class, PutSFTP.class, GetFTP.class, PutFTP.class}) @SeeAlso({GetSFTP.class, PutSFTP.class, GetFTP.class, PutFTP.class})
@ -50,15 +48,18 @@ public class FetchSFTP extends FetchFileTransfer {
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final PropertyDescriptor port = new PropertyDescriptor.Builder().fromPropertyDescriptor(UNDEFAULTED_PORT).defaultValue("22").build();
final List<PropertyDescriptor> properties = new ArrayList<>(); final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(FetchFileTransfer.HOSTNAME); properties.add(HOSTNAME);
properties.add(SFTPTransfer.PORT); properties.add(port);
properties.add(SFTPTransfer.USERNAME); properties.add(SFTPTransfer.USERNAME);
properties.add(SFTPTransfer.PASSWORD); properties.add(SFTPTransfer.PASSWORD);
properties.add(SFTPTransfer.PRIVATE_KEY_PATH); properties.add(SFTPTransfer.PRIVATE_KEY_PATH);
properties.add(SFTPTransfer.PRIVATE_KEY_PASSPHRASE); properties.add(SFTPTransfer.PRIVATE_KEY_PASSPHRASE);
properties.add(FetchFileTransfer.REMOTE_FILENAME); properties.add(REMOTE_FILENAME);
properties.add(SFTPTransfer.DELETE_ORIGINAL); properties.add(COMPLETION_STRATEGY);
properties.add(MOVE_DESTINATION_DIR);
properties.add(SFTPTransfer.CONNECTION_TIMEOUT); properties.add(SFTPTransfer.CONNECTION_TIMEOUT);
properties.add(SFTPTransfer.DATA_TIMEOUT); properties.add(SFTPTransfer.DATA_TIMEOUT);
properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT); properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT);

View File

@ -38,6 +38,13 @@ public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> {
.required(true) .required(true)
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
.build(); .build();
static final PropertyDescriptor UNDEFAULTED_PORT = new PropertyDescriptor.Builder()
.name("Port")
.description("The port to connect to on the remote host to fetch the data from")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.required(true)
.build();
public static final PropertyDescriptor REMOTE_PATH = new PropertyDescriptor.Builder() public static final PropertyDescriptor REMOTE_PATH = new PropertyDescriptor.Builder()
.name("Remote Path") .name("Remote Path")
.description("The path on the remote system from which to pull or push files") .description("The path on the remote system from which to pull or push files")
@ -52,6 +59,7 @@ public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> {
protected Map<String, String> createAttributes(final FileInfo fileInfo, final ProcessContext context) { protected Map<String, String> createAttributes(final FileInfo fileInfo, final ProcessContext context) {
final Map<String, String> attributes = new HashMap<>(); final Map<String, String> attributes = new HashMap<>();
attributes.put(getProtocolName() + ".remote.host", context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue()); attributes.put(getProtocolName() + ".remote.host", context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue());
attributes.put(getProtocolName() + ".remote.port", context.getProperty(UNDEFAULTED_PORT).evaluateAttributeExpressions().getValue());
attributes.put("file.owner", fileInfo.getOwner()); attributes.put("file.owner", fileInfo.getOwner());
attributes.put("file.group", fileInfo.getGroup()); attributes.put("file.group", fileInfo.getGroup());
attributes.put("file.permissions", fileInfo.getPermissions()); attributes.put("file.permissions", fileInfo.getPermissions());

View File

@ -38,6 +38,7 @@ import org.apache.nifi.processors.standard.util.SFTPTransfer;
@SeeAlso({FetchSFTP.class, GetSFTP.class, PutSFTP.class}) @SeeAlso({FetchSFTP.class, GetSFTP.class, PutSFTP.class})
@WritesAttributes({ @WritesAttributes({
@WritesAttribute(attribute = "sftp.remote.host", description = "The hostname of the SFTP Server"), @WritesAttribute(attribute = "sftp.remote.host", description = "The hostname of the SFTP Server"),
@WritesAttribute(attribute = "sftp.remote.port", description = "The port that was connected to on the SFTP Server"),
@WritesAttribute(attribute = "file.owner", description = "The numeric owner id of the source file"), @WritesAttribute(attribute = "file.owner", description = "The numeric owner id of the source file"),
@WritesAttribute(attribute = "file.group", description = "The numeric group id of the source file"), @WritesAttribute(attribute = "file.group", description = "The numeric group id of the source file"),
@WritesAttribute(attribute = "file.permissions", description = "The read/write/execute permissions of the source file"), @WritesAttribute(attribute = "file.permissions", description = "The read/write/execute permissions of the source file"),
@ -48,9 +49,11 @@ public class ListSFTP extends ListFileTransfer {
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final PropertyDescriptor port = new PropertyDescriptor.Builder().fromPropertyDescriptor(UNDEFAULTED_PORT).defaultValue("22").build();
final List<PropertyDescriptor> properties = new ArrayList<>(); final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(SFTPTransfer.HOSTNAME); properties.add(HOSTNAME);
properties.add(SFTPTransfer.PORT); properties.add(port);
properties.add(SFTPTransfer.USERNAME); properties.add(SFTPTransfer.USERNAME);
properties.add(SFTPTransfer.PASSWORD); properties.add(SFTPTransfer.PASSWORD);
properties.add(SFTPTransfer.PRIVATE_KEY_PATH); properties.add(SFTPTransfer.PRIVATE_KEY_PATH);

View File

@ -61,7 +61,7 @@ public class EntityListing {
/** /**
* Sets the Collection containing the identifiers of all entities in the listing whose Timestamp was * Sets the Collection containing the identifiers of all entities in the listing whose Timestamp was
* equal to {@link #getLatestTimestamp()} * equal to {@link #getLatestTimestamp()}
* *
* @param matchingIdentifiers the identifiers that have last modified date matching the latest timestamp * @param matchingIdentifiers the identifiers that have last modified date matching the latest timestamp
*/ */
public void setMatchingIdentifiers(Collection<String> matchingIdentifiers) { public void setMatchingIdentifiers(Collection<String> matchingIdentifiers) {

View File

@ -135,7 +135,7 @@ public class FTPTransfer implements FileTransfer {
client.disconnect(); client.disconnect();
} }
} catch (final Exception ex) { } catch (final Exception ex) {
logger.warn("Failed to close FTPClient due to {}", new Object[] { ex.toString() }, ex); logger.warn("Failed to close FTPClient due to {}", new Object[] {ex.toString()}, ex);
} }
client = null; client = null;
} }
@ -334,9 +334,9 @@ public class FTPTransfer implements FileTransfer {
final boolean cdSuccessful = setWorkingDirectory(remoteDirectory); final boolean cdSuccessful = setWorkingDirectory(remoteDirectory);
if (!cdSuccessful) { if (!cdSuccessful) {
logger.debug("Remote Directory {} does not exist; creating it", new Object[] { remoteDirectory }); logger.debug("Remote Directory {} does not exist; creating it", new Object[] {remoteDirectory});
if (client.makeDirectory(remoteDirectory)) { if (client.makeDirectory(remoteDirectory)) {
logger.debug("Created {}", new Object[] { remoteDirectory }); logger.debug("Created {}", new Object[] {remoteDirectory});
} else { } else {
throw new IOException("Failed to create remote directory " + remoteDirectory); throw new IOException("Failed to create remote directory " + remoteDirectory);
} }
@ -392,10 +392,10 @@ public class FTPTransfer implements FileTransfer {
final String time = outformat.format(fileModifyTime); final String time = outformat.format(fileModifyTime);
if (!client.setModificationTime(tempFilename, time)) { if (!client.setModificationTime(tempFilename, time)) {
// FTP server probably doesn't support MFMT command // FTP server probably doesn't support MFMT command
logger.warn("Could not set lastModifiedTime on {} to {}", new Object[] { flowFile, lastModifiedTime }); logger.warn("Could not set lastModifiedTime on {} to {}", new Object[] {flowFile, lastModifiedTime});
} }
} catch (final Exception e) { } catch (final Exception e) {
logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[] { flowFile, lastModifiedTime, e }); logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[] {flowFile, lastModifiedTime, e});
} }
} }
final String permissions = ctx.getProperty(PERMISSIONS).evaluateAttributeExpressions(flowFile).getValue(); final String permissions = ctx.getProperty(PERMISSIONS).evaluateAttributeExpressions(flowFile).getValue();
@ -404,17 +404,17 @@ public class FTPTransfer implements FileTransfer {
int perms = numberPermissions(permissions); int perms = numberPermissions(permissions);
if (perms >= 0) { if (perms >= 0) {
if (!client.sendSiteCommand("chmod " + Integer.toOctalString(perms) + " " + tempFilename)) { if (!client.sendSiteCommand("chmod " + Integer.toOctalString(perms) + " " + tempFilename)) {
logger.warn("Could not set permission on {} to {}", new Object[] { flowFile, permissions }); logger.warn("Could not set permission on {} to {}", new Object[] {flowFile, permissions});
} }
} }
} catch (final Exception e) { } catch (final Exception e) {
logger.error("Failed to set permission on {} to {} due to {}", new Object[] { flowFile, permissions, e }); logger.error("Failed to set permission on {} to {} due to {}", new Object[] {flowFile, permissions, e});
} }
} }
if (!filename.equals(tempFilename)) { if (!filename.equals(tempFilename)) {
try { try {
logger.debug("Renaming remote path from {} to {} for {}", new Object[] { tempFilename, filename, flowFile }); logger.debug("Renaming remote path from {} to {} for {}", new Object[] {tempFilename, filename, flowFile});
final boolean renameSuccessful = client.rename(tempFilename, filename); final boolean renameSuccessful = client.rename(tempFilename, filename);
if (!renameSuccessful) { if (!renameSuccessful) {
throw new IOException("Failed to rename temporary file " + tempFilename + " to " + fullPath + " due to: " + client.getReplyString()); throw new IOException("Failed to rename temporary file " + tempFilename + " to " + fullPath + " due to: " + client.getReplyString());
@ -432,6 +432,16 @@ public class FTPTransfer implements FileTransfer {
return fullPath; return fullPath;
} }
@Override
public void rename(final String source, final String target) throws IOException {
final FTPClient client = getClient(null);
final boolean renameSuccessful = client.rename(source, target);
if (!renameSuccessful) {
throw new IOException("Failed to rename temporary file " + source + " to " + target + " due to: " + client.getReplyString());
}
}
@Override @Override
public void deleteFile(final String path, final String remoteFileName) throws IOException { public void deleteFile(final String path, final String remoteFileName) throws IOException {
final FTPClient client = getClient(null); final FTPClient client = getClient(null);

View File

@ -43,6 +43,8 @@ public interface FileTransfer extends Closeable {
String put(FlowFile flowFile, String path, String filename, InputStream content) throws IOException; String put(FlowFile flowFile, String path, String filename, InputStream content) throws IOException;
void rename(String source, String target) throws IOException;
void deleteFile(String path, String remoteFileName) throws IOException; void deleteFile(String path, String remoteFileName) throws IOException;
void deleteDirectory(String remoteDirectoryName) throws IOException; void deleteDirectory(String remoteDirectoryName) throws IOException;

View File

@ -603,6 +603,23 @@ public class SFTPTransfer implements FileTransfer {
return fullPath; return fullPath;
} }
@Override
public void rename(final String source, final String target) throws IOException {
final ChannelSftp sftp = getChannel(null);
try {
sftp.rename(source, target);
} catch (final SftpException e) {
switch (e.id) {
case ChannelSftp.SSH_FX_NO_SUCH_FILE:
throw new FileNotFoundException();
case ChannelSftp.SSH_FX_PERMISSION_DENIED:
throw new PermissionDeniedException("Could not rename remote file " + source + " to " + target + " due to insufficient permissions");
default:
throw new IOException(e);
}
}
}
protected int numberPermissions(String perms) { protected int numberPermissions(String perms) {
int number = -1; int number = -1;
final Pattern rwxPattern = Pattern.compile("^[rwx-]{9}$"); final Pattern rwxPattern = Pattern.compile("^[rwx-]{9}$");

View File

@ -17,7 +17,9 @@
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.File; import java.io.File;
@ -92,8 +94,119 @@ public class TestFetchFileTransfer {
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 1); runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 1);
} }
@Test
public void testMoveFileWithNoTrailingSlashDirName() {
final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved");
proc.addContent("hello.txt", "world".getBytes());
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "hello.txt");
runner.enqueue(new byte[0], attrs);
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
proc.fileContents.containsKey("/moved/hello.txt");
assertEquals(1, proc.fileContents.size());
}
@Test
public void testMoveFileWithTrailingSlashDirName() {
final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved/");
proc.addContent("hello.txt", "world".getBytes());
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "hello.txt");
runner.enqueue(new byte[0], attrs);
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
proc.fileContents.containsKey("/moved/hello.txt");
assertEquals(1, proc.fileContents.size());
}
@Test
public void testDeleteFile() {
final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_DELETE.getValue());
proc.addContent("hello.txt", "world".getBytes());
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "hello.txt");
runner.enqueue(new byte[0], attrs);
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
assertTrue(proc.fileContents.isEmpty());
}
@Test
public void testDeleteFails() {
final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_DELETE.getValue());
proc.addContent("hello.txt", "world".getBytes());
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "hello.txt");
runner.enqueue(new byte[0], attrs);
proc.allowDelete = false;
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
assertFalse(proc.fileContents.isEmpty());
}
@Test
public void testRenameFails() {
final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved/");
proc.addContent("hello.txt", "world".getBytes());
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "hello.txt");
runner.enqueue(new byte[0], attrs);
proc.allowDelete = false;
proc.allowRename = false;
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
assertEquals(1, proc.fileContents.size());
assertTrue(proc.fileContents.containsKey("hello.txt"));
}
private static class TestableFetchFileTransfer extends FetchFileTransfer { private static class TestableFetchFileTransfer extends FetchFileTransfer {
private boolean allowAccess = true; private boolean allowAccess = true;
private boolean allowDelete = true;
private boolean allowRename = true;
private boolean closed = false; private boolean closed = false;
private final Map<String, byte[]> fileContents = new HashMap<>(); private final Map<String, byte[]> fileContents = new HashMap<>();
@ -154,6 +267,10 @@ public class TestFetchFileTransfer {
@Override @Override
public void deleteFile(String path, String remoteFileName) throws IOException { public void deleteFile(String path, String remoteFileName) throws IOException {
if (!allowDelete) {
throw new PermissionDeniedException("test permission denied");
}
if (!fileContents.containsKey(remoteFileName)) { if (!fileContents.containsKey(remoteFileName)) {
throw new FileNotFoundException(); throw new FileNotFoundException();
} }
@ -161,6 +278,20 @@ public class TestFetchFileTransfer {
fileContents.remove(remoteFileName); fileContents.remove(remoteFileName);
} }
@Override
public void rename(String source, String target) throws IOException {
if (!allowRename) {
throw new PermissionDeniedException("test permission denied");
}
if (!fileContents.containsKey(source)) {
throw new FileNotFoundException();
}
final byte[] content = fileContents.remove(source);
fileContents.put(target, content);
}
@Override @Override
public void deleteDirectory(String remoteDirectoryName) throws IOException { public void deleteDirectory(String remoteDirectoryName) throws IOException {