diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
index 8a7fadec1c..e5924832bb 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
@@ -70,23 +70,25 @@ import org.codehaus.jackson.map.ObjectMapper;
*
*
* In order to make use of this abstract class, the entities listed must meet the following criteria:
- *
- * -
- * Entity must have a timestamp associated with it. This timestamp is used to determine if entities are "new" or not. Any entity that is
- * returned by the listing will be considered "new" if the timestamp is later than the latest timestamp pulled.
- *
- * -
- * Entity must have a unique identifier. This is used in conjunction with the timestamp in order to determine whether or not the entity is
- * new. If the timestamp of an entity is before the latest timestamp pulled, then the entity is not considered new. If the timestamp is later
- * than the last timestamp pulled, then the entity is considered new. If the timestamp is equal to the latest timestamp pulled, then the entity's
- * identifier is compared to all of the entity identifiers that have that same timestamp in order to determine whether or not the entity has been
- * seen already.
- *
- * -
- * Entity must have a user-readable name that can be used for logging purposes.
- *
*
*
+ *
+ * -
+ * Entity must have a timestamp associated with it. This timestamp is used to determine if entities are "new" or not. Any entity that is
+ * returned by the listing will be considered "new" if the timestamp is later than the latest timestamp pulled.
+ *
+ * -
+ * Entity must have a unique identifier. This is used in conjunction with the timestamp in order to determine whether or not the entity is
+ * new. If the timestamp of an entity is before the latest timestamp pulled, then the entity is not considered new. If the timestamp is later
+ * than the last timestamp pulled, then the entity is considered new. If the timestamp is equal to the latest timestamp pulled, then the entity's
+ * identifier is compared to all of the entity identifiers that have that same timestamp in order to determine whether or not the entity has been
+ * seen already.
+ *
+ * -
+ * Entity must have a user-readable name that can be used for logging purposes.
+ *
+ *
+ *
*
* This class persists state across restarts so that even if NiFi is restarted, duplicates will not be pulled from the remote system. This is performed using
* two different mechanisms. First, state is stored locally. This allows the system to be restarted and begin processing where it left off. The state that is
@@ -111,6 +113,7 @@ import org.codehaus.jackson.map.ObjectMapper;
*
*
* Subclasses are responsible for the following:
+ *
*
*
* -
@@ -134,7 +137,6 @@ import org.codehaus.jackson.map.ObjectMapper;
* a boolean indicating whether or not a change in the value of the provided property should trigger the timestamp and identifier information to be cleared.
*
*
- *
*/
@TriggerSerially
public abstract class AbstractListProcessor extends AbstractProcessor {
@@ -372,8 +374,8 @@ public abstract class AbstractListProcessor extends Ab
int listCount = 0;
Long latestListingTimestamp = null;
for (final T entity : entityList) {
- final boolean list = (minTimestamp == null || entity.getTimestamp() > minTimestamp ||
- (entity.getTimestamp() == minTimestamp && !latestIdentifiersListed.contains(entity.getIdentifier())));
+ final boolean list = (minTimestamp == null || entity.getTimestamp() > minTimestamp
+ || (entity.getTimestamp() == minTimestamp && !latestIdentifiersListed.contains(entity.getIdentifier())));
// Create the FlowFile for this path.
if (list) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
index 5eecac32fb..ab0be78b58 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
@@ -31,7 +31,9 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -49,9 +51,18 @@ import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.Tuple;
/**
- * A base class for FetchSFTP, FetchFTP processors
+ * A base class for FetchSFTP, FetchFTP processors.
+ *
+ * Note that implementations of this class should never use the @SupportsBatching annotation! Doing so
+ * could result in data loss!
*/
public abstract class FetchFileTransfer extends AbstractProcessor {
+
+ static final AllowableValue COMPLETION_NONE = new AllowableValue("None", "None", "Leave the file as-is");
+ static final AllowableValue COMPLETION_MOVE = new AllowableValue("Move File", "Move File", "Move the file to the directory specified by the property");
+ static final AllowableValue COMPLETION_DELETE = new AllowableValue("Delete File", "Delete File", "Deletes the original file from the remote system");
+
+
static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
.name("Hostname")
.description("The fully-qualified hostname or IP address of the host to fetch the data from")
@@ -73,13 +84,25 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
- public static final PropertyDescriptor DELETE_ORIGINAL = new PropertyDescriptor.Builder()
- .name("Delete Original")
- .description("Determines whether or not the file is deleted from the remote system after it has been successfully transferred")
- .defaultValue("true")
- .allowableValues("true", "false")
+ static final PropertyDescriptor COMPLETION_STRATEGY = new PropertyDescriptor.Builder()
+ .name("Completion Strategy")
+ .description("Specifies what to do with the original file on the server once it has been pulled into NiFi. If the Completion Strategy fails, a warning will be "
+ + "logged but the data will still be transferred.")
+ .expressionLanguageSupported(false)
+ .allowableValues(COMPLETION_NONE, COMPLETION_MOVE, COMPLETION_DELETE)
+ .defaultValue(COMPLETION_NONE.getValue())
.required(true)
.build();
+ static final PropertyDescriptor MOVE_DESTINATION_DIR = new PropertyDescriptor.Builder()
+ .name("Move Destination Directory")
+ .description("The directory on the remote server to the move the original file to once it has been ingested into NiFi. "
+ + "This property is ignored unless the Completion Strategy is set to \"Move File\". The specified directory must already exist on"
+ + "the remote system, or the rename will fail.")
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .required(false)
+ .build();
+
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@@ -156,7 +179,8 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
properties.add(HOSTNAME);
properties.add(UNDEFAULTED_PORT);
properties.add(REMOTE_FILENAME);
- properties.add(DELETE_ORIGINAL);
+ properties.add(COMPLETION_STRATEGY);
+ properties.add(MOVE_DESTINATION_DIR);
return properties;
}
@@ -203,6 +227,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
final InputStream in;
try {
in = transfer.getInputStream(filename, flowFile);
+
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
@@ -250,15 +275,34 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
- // delete remote file is necessary
- final boolean deleteOriginal = context.getProperty(DELETE_ORIGINAL).asBoolean();
- if (deleteOriginal) {
+ // it is critical that we commit the session before moving/deleting the remote file. Otherwise, we could have a situation where
+ // we ingest the data, delete/move the remote file, and then NiFi dies/is shut down before the session is committed. This would
+ // result in data loss! If we commit the session first, we are safe.
+ session.commit();
+
+ final String completionStrategy = context.getProperty(COMPLETION_STRATEGY).getValue();
+ if (COMPLETION_DELETE.getValue().equalsIgnoreCase(completionStrategy)) {
try {
transfer.deleteFile(null, filename);
} catch (final FileNotFoundException e) {
// file doesn't exist -- effectively the same as removing it. Move on.
} catch (final IOException ioe) {
- getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to remove the remote file due to {}", new Object[] {flowFile, host, port, filename, ioe}, ioe);
+ getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to remove the remote file due to {}",
+ new Object[] {flowFile, host, port, filename, ioe}, ioe);
+ }
+ } else if (COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) {
+ String targetDir = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue();
+ if (!targetDir.endsWith("/")) {
+ targetDir = targetDir + "/";
+ }
+ final String simpleFilename = StringUtils.substringAfterLast(filename, "/");
+ final String target = targetDir + simpleFilename;
+
+ try {
+ transfer.rename(filename, target);
+ } catch (final IOException ioe) {
+ getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to rename the remote file due to {}",
+ new Object[] {flowFile, host, port, filename, ioe}, ioe);
}
}
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
index 6387e192b5..ad81c83db8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
@@ -22,7 +22,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -35,8 +34,7 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer;
-
-@SupportsBatching
+// Note that we do not use @SupportsBatching annotation. This processor cannot support batching because it must ensure that session commits happen before remote files are deleted.
@Tags({"sftp", "get", "retrieve", "files", "fetch", "remote", "ingest", "source", "input"})
@CapabilityDescription("Fetches the content of a file from a remote SFTP server and overwrites the contents of an incoming FlowFile with the content of the remote file.")
@SeeAlso({GetSFTP.class, PutSFTP.class, GetFTP.class, PutFTP.class})
@@ -50,15 +48,18 @@ public class FetchSFTP extends FetchFileTransfer {
@Override
protected List getSupportedPropertyDescriptors() {
+ final PropertyDescriptor port = new PropertyDescriptor.Builder().fromPropertyDescriptor(UNDEFAULTED_PORT).defaultValue("22").build();
+
final List properties = new ArrayList<>();
- properties.add(FetchFileTransfer.HOSTNAME);
- properties.add(SFTPTransfer.PORT);
+ properties.add(HOSTNAME);
+ properties.add(port);
properties.add(SFTPTransfer.USERNAME);
properties.add(SFTPTransfer.PASSWORD);
properties.add(SFTPTransfer.PRIVATE_KEY_PATH);
properties.add(SFTPTransfer.PRIVATE_KEY_PASSPHRASE);
- properties.add(FetchFileTransfer.REMOTE_FILENAME);
- properties.add(SFTPTransfer.DELETE_ORIGINAL);
+ properties.add(REMOTE_FILENAME);
+ properties.add(COMPLETION_STRATEGY);
+ properties.add(MOVE_DESTINATION_DIR);
properties.add(SFTPTransfer.CONNECTION_TIMEOUT);
properties.add(SFTPTransfer.DATA_TIMEOUT);
properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
index d6e1cd1ea1..1176fd0e53 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
@@ -38,6 +38,13 @@ public abstract class ListFileTransfer extends AbstractListProcessor {
.required(true)
.expressionLanguageSupported(true)
.build();
+ static final PropertyDescriptor UNDEFAULTED_PORT = new PropertyDescriptor.Builder()
+ .name("Port")
+ .description("The port to connect to on the remote host to fetch the data from")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .required(true)
+ .build();
public static final PropertyDescriptor REMOTE_PATH = new PropertyDescriptor.Builder()
.name("Remote Path")
.description("The path on the remote system from which to pull or push files")
@@ -52,6 +59,7 @@ public abstract class ListFileTransfer extends AbstractListProcessor {
protected Map createAttributes(final FileInfo fileInfo, final ProcessContext context) {
final Map attributes = new HashMap<>();
attributes.put(getProtocolName() + ".remote.host", context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue());
+ attributes.put(getProtocolName() + ".remote.port", context.getProperty(UNDEFAULTED_PORT).evaluateAttributeExpressions().getValue());
attributes.put("file.owner", fileInfo.getOwner());
attributes.put("file.group", fileInfo.getGroup());
attributes.put("file.permissions", fileInfo.getPermissions());
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
index 3b6b69efd7..925e5f8e84 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
@@ -38,6 +38,7 @@ import org.apache.nifi.processors.standard.util.SFTPTransfer;
@SeeAlso({FetchSFTP.class, GetSFTP.class, PutSFTP.class})
@WritesAttributes({
@WritesAttribute(attribute = "sftp.remote.host", description = "The hostname of the SFTP Server"),
+ @WritesAttribute(attribute = "sftp.remote.port", description = "The port that was connected to on the SFTP Server"),
@WritesAttribute(attribute = "file.owner", description = "The numeric owner id of the source file"),
@WritesAttribute(attribute = "file.group", description = "The numeric group id of the source file"),
@WritesAttribute(attribute = "file.permissions", description = "The read/write/execute permissions of the source file"),
@@ -48,9 +49,11 @@ public class ListSFTP extends ListFileTransfer {
@Override
protected List getSupportedPropertyDescriptors() {
+ final PropertyDescriptor port = new PropertyDescriptor.Builder().fromPropertyDescriptor(UNDEFAULTED_PORT).defaultValue("22").build();
+
final List properties = new ArrayList<>();
- properties.add(SFTPTransfer.HOSTNAME);
- properties.add(SFTPTransfer.PORT);
+ properties.add(HOSTNAME);
+ properties.add(port);
properties.add(SFTPTransfer.USERNAME);
properties.add(SFTPTransfer.PASSWORD);
properties.add(SFTPTransfer.PRIVATE_KEY_PATH);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java
index 56489f0b09..2d9525f5b0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java
@@ -61,7 +61,7 @@ public class EntityListing {
/**
* Sets the Collection containing the identifiers of all entities in the listing whose Timestamp was
* equal to {@link #getLatestTimestamp()}
- *
+ *
* @param matchingIdentifiers the identifiers that have last modified date matching the latest timestamp
*/
public void setMatchingIdentifiers(Collection matchingIdentifiers) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
index 7f659d4ccb..a038eb7c50 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
@@ -135,7 +135,7 @@ public class FTPTransfer implements FileTransfer {
client.disconnect();
}
} catch (final Exception ex) {
- logger.warn("Failed to close FTPClient due to {}", new Object[] { ex.toString() }, ex);
+ logger.warn("Failed to close FTPClient due to {}", new Object[] {ex.toString()}, ex);
}
client = null;
}
@@ -334,9 +334,9 @@ public class FTPTransfer implements FileTransfer {
final boolean cdSuccessful = setWorkingDirectory(remoteDirectory);
if (!cdSuccessful) {
- logger.debug("Remote Directory {} does not exist; creating it", new Object[] { remoteDirectory });
+ logger.debug("Remote Directory {} does not exist; creating it", new Object[] {remoteDirectory});
if (client.makeDirectory(remoteDirectory)) {
- logger.debug("Created {}", new Object[] { remoteDirectory });
+ logger.debug("Created {}", new Object[] {remoteDirectory});
} else {
throw new IOException("Failed to create remote directory " + remoteDirectory);
}
@@ -392,10 +392,10 @@ public class FTPTransfer implements FileTransfer {
final String time = outformat.format(fileModifyTime);
if (!client.setModificationTime(tempFilename, time)) {
// FTP server probably doesn't support MFMT command
- logger.warn("Could not set lastModifiedTime on {} to {}", new Object[] { flowFile, lastModifiedTime });
+ logger.warn("Could not set lastModifiedTime on {} to {}", new Object[] {flowFile, lastModifiedTime});
}
} catch (final Exception e) {
- logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[] { flowFile, lastModifiedTime, e });
+ logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[] {flowFile, lastModifiedTime, e});
}
}
final String permissions = ctx.getProperty(PERMISSIONS).evaluateAttributeExpressions(flowFile).getValue();
@@ -404,17 +404,17 @@ public class FTPTransfer implements FileTransfer {
int perms = numberPermissions(permissions);
if (perms >= 0) {
if (!client.sendSiteCommand("chmod " + Integer.toOctalString(perms) + " " + tempFilename)) {
- logger.warn("Could not set permission on {} to {}", new Object[] { flowFile, permissions });
+ logger.warn("Could not set permission on {} to {}", new Object[] {flowFile, permissions});
}
}
} catch (final Exception e) {
- logger.error("Failed to set permission on {} to {} due to {}", new Object[] { flowFile, permissions, e });
+ logger.error("Failed to set permission on {} to {} due to {}", new Object[] {flowFile, permissions, e});
}
}
if (!filename.equals(tempFilename)) {
try {
- logger.debug("Renaming remote path from {} to {} for {}", new Object[] { tempFilename, filename, flowFile });
+ logger.debug("Renaming remote path from {} to {} for {}", new Object[] {tempFilename, filename, flowFile});
final boolean renameSuccessful = client.rename(tempFilename, filename);
if (!renameSuccessful) {
throw new IOException("Failed to rename temporary file " + tempFilename + " to " + fullPath + " due to: " + client.getReplyString());
@@ -432,6 +432,16 @@ public class FTPTransfer implements FileTransfer {
return fullPath;
}
+
+ @Override
+ public void rename(final String source, final String target) throws IOException {
+ final FTPClient client = getClient(null);
+ final boolean renameSuccessful = client.rename(source, target);
+ if (!renameSuccessful) {
+ throw new IOException("Failed to rename temporary file " + source + " to " + target + " due to: " + client.getReplyString());
+ }
+ }
+
@Override
public void deleteFile(final String path, final String remoteFileName) throws IOException {
final FTPClient client = getClient(null);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
index fe277dfb08..8d48de25a6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
@@ -43,6 +43,8 @@ public interface FileTransfer extends Closeable {
String put(FlowFile flowFile, String path, String filename, InputStream content) throws IOException;
+ void rename(String source, String target) throws IOException;
+
void deleteFile(String path, String remoteFileName) throws IOException;
void deleteDirectory(String remoteDirectoryName) throws IOException;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
index c28f275a9f..9bad52073c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
@@ -603,6 +603,23 @@ public class SFTPTransfer implements FileTransfer {
return fullPath;
}
+ @Override
+ public void rename(final String source, final String target) throws IOException {
+ final ChannelSftp sftp = getChannel(null);
+ try {
+ sftp.rename(source, target);
+ } catch (final SftpException e) {
+ switch (e.id) {
+ case ChannelSftp.SSH_FX_NO_SUCH_FILE:
+ throw new FileNotFoundException();
+ case ChannelSftp.SSH_FX_PERMISSION_DENIED:
+ throw new PermissionDeniedException("Could not rename remote file " + source + " to " + target + " due to insufficient permissions");
+ default:
+ throw new IOException(e);
+ }
+ }
+ }
+
protected int numberPermissions(String perms) {
int number = -1;
final Pattern rwxPattern = Pattern.compile("^[rwx-]{9}$");
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
index 7aa8f9c1bc..4175a77199 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
@@ -17,7 +17,9 @@
package org.apache.nifi.processors.standard;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.File;
@@ -92,8 +94,119 @@ public class TestFetchFileTransfer {
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 1);
}
+
+ @Test
+ public void testMoveFileWithNoTrailingSlashDirName() {
+ final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
+ runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
+ runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
+ runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
+ runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved");
+
+ proc.addContent("hello.txt", "world".getBytes());
+ final Map attrs = new HashMap<>();
+ attrs.put("filename", "hello.txt");
+ runner.enqueue(new byte[0], attrs);
+
+ runner.run(1, false, false);
+ runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
+
+ proc.fileContents.containsKey("/moved/hello.txt");
+ assertEquals(1, proc.fileContents.size());
+ }
+
+ @Test
+ public void testMoveFileWithTrailingSlashDirName() {
+ final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
+ runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
+ runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
+ runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
+ runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved/");
+
+ proc.addContent("hello.txt", "world".getBytes());
+ final Map attrs = new HashMap<>();
+ attrs.put("filename", "hello.txt");
+ runner.enqueue(new byte[0], attrs);
+
+ runner.run(1, false, false);
+ runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
+
+ proc.fileContents.containsKey("/moved/hello.txt");
+ assertEquals(1, proc.fileContents.size());
+ }
+
+ @Test
+ public void testDeleteFile() {
+ final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
+ runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
+ runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
+ runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_DELETE.getValue());
+
+ proc.addContent("hello.txt", "world".getBytes());
+ final Map attrs = new HashMap<>();
+ attrs.put("filename", "hello.txt");
+ runner.enqueue(new byte[0], attrs);
+
+ runner.run(1, false, false);
+ runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
+ assertTrue(proc.fileContents.isEmpty());
+ }
+
+ @Test
+ public void testDeleteFails() {
+ final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
+ runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
+ runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
+ runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_DELETE.getValue());
+
+ proc.addContent("hello.txt", "world".getBytes());
+ final Map attrs = new HashMap<>();
+ attrs.put("filename", "hello.txt");
+ runner.enqueue(new byte[0], attrs);
+ proc.allowDelete = false;
+
+ runner.run(1, false, false);
+ runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
+ assertFalse(proc.fileContents.isEmpty());
+ }
+
+ @Test
+ public void testRenameFails() {
+ final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
+ runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
+ runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
+ runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
+ runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved/");
+
+ proc.addContent("hello.txt", "world".getBytes());
+ final Map attrs = new HashMap<>();
+ attrs.put("filename", "hello.txt");
+ runner.enqueue(new byte[0], attrs);
+ proc.allowDelete = false;
+ proc.allowRename = false;
+
+ runner.run(1, false, false);
+ runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
+ assertEquals(1, proc.fileContents.size());
+
+ assertTrue(proc.fileContents.containsKey("hello.txt"));
+ }
+
+
private static class TestableFetchFileTransfer extends FetchFileTransfer {
private boolean allowAccess = true;
+ private boolean allowDelete = true;
+ private boolean allowRename = true;
private boolean closed = false;
private final Map fileContents = new HashMap<>();
@@ -154,6 +267,10 @@ public class TestFetchFileTransfer {
@Override
public void deleteFile(String path, String remoteFileName) throws IOException {
+ if (!allowDelete) {
+ throw new PermissionDeniedException("test permission denied");
+ }
+
if (!fileContents.containsKey(remoteFileName)) {
throw new FileNotFoundException();
}
@@ -161,6 +278,20 @@ public class TestFetchFileTransfer {
fileContents.remove(remoteFileName);
}
+ @Override
+ public void rename(String source, String target) throws IOException {
+ if (!allowRename) {
+ throw new PermissionDeniedException("test permission denied");
+ }
+
+ if (!fileContents.containsKey(source)) {
+ throw new FileNotFoundException();
+ }
+
+ final byte[] content = fileContents.remove(source);
+ fileContents.put(target, content);
+ }
+
@Override
public void deleteDirectory(String remoteDirectoryName) throws IOException {