diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFile.java new file mode 100644 index 0000000000..6775c6299d --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFile.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.file.CopyOption; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StopWatch; + +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"local", "files", "filesystem", "ingest", "ingress", "get", "source", "input"}) +@CapabilityDescription("Reads the contents of a file from disk and streams it into the contents of an incoming FlowFile. Once this is done, the file is optionally moved elsewhere or deleted " + + "to help keep the file system organized.") +@SeeAlso({GetFile.class, PutFile.class, ListFile.class}) +public class FetchFile extends AbstractProcessor { + static final AllowableValue COMPLETION_NONE = new AllowableValue("None", "None", "Leave the file as-is"); + static final AllowableValue COMPLETION_MOVE = new AllowableValue("Move File", "Move File", "Moves the file to the directory specified by the property"); + static final AllowableValue COMPLETION_DELETE = new AllowableValue("Delete File", "Delete File", "Deletes the original file from the file system"); + + static final AllowableValue CONFLICT_REPLACE = new AllowableValue("Replace File", "Replace File", "The newly ingested file should replace the existing file in the Destination Directory"); + static final AllowableValue CONFLICT_KEEP_INTACT = new AllowableValue("Keep Existing", "Keep Existing", "The existing file should in the Destination Directory should stay intact and the newly " + + "ingested file should be deleted"); + static final AllowableValue CONFLICT_FAIL = new AllowableValue("Fail", "Fail", "The existing destination file should remain intact and the incoming FlowFile should be routed to failure"); + static final AllowableValue CONFLICT_RENAME = new AllowableValue("Rename", "Rename", "The existing destination file should remain intact. The newly ingested file should be moved to the " + + "destination directory but be renamed to a random filename"); + + static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder() + .name("File to Fetch") + .description("The fully-qualified filename of the file to fetch from the file system") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .defaultValue("${path}/${filename}") + .required(true) + .build(); + static final PropertyDescriptor COMPLETION_STRATEGY = new PropertyDescriptor.Builder() + .name("Completion Strategy") + .description("Specifies what to do with the original file on the file system once it has been pulled into NiFi") + .expressionLanguageSupported(false) + .allowableValues(COMPLETION_NONE, COMPLETION_MOVE, COMPLETION_DELETE) + .defaultValue(COMPLETION_NONE.getValue()) + .required(true) + .build(); + static final PropertyDescriptor MOVE_DESTINATION_DIR = new PropertyDescriptor.Builder() + .name("Move Destination Directory") + .description("The directory to the move the original file to once it has been fetched from the file system. This property is ignored unless the Completion Strategy is set to \"Move File\". " + + "If the directory does not exist, it will be created.") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .build(); + static final PropertyDescriptor CONFLICT_STRATEGY = new PropertyDescriptor.Builder() + .name("Move Conflict Strategy") + .description("If Completion Strategy is set to Move File and a file already exists in the destination directory with the same name, this property specifies " + + "how that naming conflict should be resolved") + .allowableValues(CONFLICT_RENAME, CONFLICT_REPLACE, CONFLICT_KEEP_INTACT, CONFLICT_FAIL) + .defaultValue(CONFLICT_RENAME.getValue()) + .required(true) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Any FlowFile that is successfully fetched from the file system will be transferred to this Relationship.") + .build(); + static final Relationship REL_NOT_FOUND = new Relationship.Builder() + .name("not.found") + .description("Any FlowFile that could not be fetched from the file system because the file could not be found will be transferred to this Relationship.") + .build(); + static final Relationship REL_PERMISSION_DENIED = new Relationship.Builder() + .name("permission.denied") + .description("Any FlowFile that could not be fetched from the file system due to the user running NiFi not having sufficient permissions will be transferred to this Relationship.") + .build(); + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description( + "Any FlowFile that could not be fetched from the file system for any reason other than insufficient permissions or the file not existing will be transferred to this Relationship.") + .build(); + + @Override + protected List getSupportedPropertyDescriptors() { + final List properties = new ArrayList<>(); + properties.add(FILENAME); + properties.add(COMPLETION_STRATEGY); + properties.add(MOVE_DESTINATION_DIR); + properties.add(CONFLICT_STRATEGY); + return properties; + } + + @Override + public Set getRelationships() { + final Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_NOT_FOUND); + relationships.add(REL_PERMISSION_DENIED); + relationships.add(REL_FAILURE); + return relationships; + } + + @Override + protected Collection customValidate(final ValidationContext validationContext) { + final List results = new ArrayList<>(); + + if (COMPLETION_MOVE.getValue().equalsIgnoreCase(validationContext.getProperty(COMPLETION_STRATEGY).getValue())) { + if (!validationContext.getProperty(MOVE_DESTINATION_DIR).isSet()) { + results.add(new ValidationResult.Builder().subject(MOVE_DESTINATION_DIR.getName()).input(null).valid(false).explanation( + MOVE_DESTINATION_DIR.getName() + " must be specified if " + COMPLETION_STRATEGY.getName() + " is set to " + COMPLETION_MOVE.getDisplayName()).build()); + } + } + + return results; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final StopWatch stopWatch = new StopWatch(true); + final String filename = context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue(); + final File file = new File(filename); + + // Verify that file exists + if (!file.exists()) { + getLogger().error("Could not fetch file {} from file system for {} because the file does not exist; routing to not.found", new Object[] {file, flowFile}); + session.getProvenanceReporter().route(flowFile, REL_NOT_FOUND); + session.transfer(session.penalize(flowFile), REL_NOT_FOUND); + return; + } + + // Verify read permission on file + final String user = System.getProperty("user.name"); + if (!isReadable(file)) { + getLogger().error("Could not fetch file {} from file system for {} due to user {} not having sufficient permissions to read the file; routing to permission.denied", + new Object[] {file, flowFile, user}); + session.getProvenanceReporter().route(flowFile, REL_PERMISSION_DENIED); + session.transfer(session.penalize(flowFile), REL_PERMISSION_DENIED); + return; + } + + // If configured to move the file and fail if unable to do so, check that the existing file does not exist and that we have write permissions + // for the parent file. + final String completionStrategy = context.getProperty(COMPLETION_STRATEGY).getValue(); + final String targetDirectoryName = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue(); + if (targetDirectoryName != null) { + final File targetDir = new File(targetDirectoryName); + if (COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) { + if (targetDir.exists() && (!isWritable(targetDir) || !isDirectory(targetDir))) { + getLogger().error("Could not fetch file {} from file system for {} because Completion Strategy is configured to move the original file to {}, " + + "but that is not a directory or user {} does not have permissions to write to that directory", + new Object[] {file, flowFile, targetDir, user}); + session.transfer(flowFile, REL_FAILURE); + return; + } + + final String conflictStrategy = context.getProperty(CONFLICT_STRATEGY).getValue(); + + if (CONFLICT_FAIL.getValue().equalsIgnoreCase(conflictStrategy)) { + final File targetFile = new File(targetDir, file.getName()); + if (targetFile.exists()) { + getLogger().error("Could not fetch file {} from file system for {} because Completion Strategy is configured to move the original file to {}, " + + "but a file with name {} already exists in that directory and the Move Conflict Strategy is configured for failure", + new Object[] {file, flowFile, targetDir, file.getName()}); + session.transfer(flowFile, REL_FAILURE); + return; + } + } + } + } + + // import content from file system + try (final FileInputStream fis = new FileInputStream(file)) { + flowFile = session.importFrom(fis, flowFile); + } catch (final IOException ioe) { + getLogger().error("Could not fetch file {} from file system for {} due to {}; routing to failure", new Object[] {file, flowFile, ioe.toString()}, ioe); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + + session.getProvenanceReporter().modifyContent(flowFile, "Replaced content of FlowFile with contents of " + file.toURI(), stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.transfer(flowFile, REL_SUCCESS); + + // It is critical that we commit the session before we perform the Completion Strategy. Otherwise, we could have a case where we + // ingest the file, delete/move the file, and then NiFi is restarted before the session is committed. That would result in data loss. + // As long as we commit the session right here, before we perform the Completion Strategy, we are safe. + session.commit(); + + // Attempt to perform the Completion Strategy action + Exception completionFailureException = null; + if (COMPLETION_DELETE.getValue().equalsIgnoreCase(completionStrategy)) { + // convert to path and use Files.delete instead of file.delete so that if we fail, we know why + try { + delete(file); + } catch (final IOException ioe) { + completionFailureException = ioe; + } + } else if (COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) { + final File targetDirectory = new File(targetDirectoryName); + final File targetFile = new File(targetDirectory, file.getName()); + try { + if (targetFile.exists()) { + final String conflictStrategy = context.getProperty(CONFLICT_STRATEGY).getValue(); + if (CONFLICT_KEEP_INTACT.getValue().equalsIgnoreCase(conflictStrategy)) { + // don't move, just delete the original + Files.delete(file.toPath()); + } else if (CONFLICT_RENAME.getValue().equalsIgnoreCase(conflictStrategy)) { + // rename to add a random UUID but keep the file extension if it has one. + final String simpleFilename = targetFile.getName(); + final String newName; + if (simpleFilename.contains(".")) { + newName = StringUtils.substringBeforeLast(simpleFilename, ".") + "-" + UUID.randomUUID().toString() + "." + StringUtils.substringAfterLast(simpleFilename, "."); + } else { + newName = simpleFilename + "-" + UUID.randomUUID().toString(); + } + + move(file, new File(targetDirectory, newName), false); + } else if (CONFLICT_REPLACE.getValue().equalsIgnoreCase(conflictStrategy)) { + move(file, targetFile, true); + } + } else { + move(file, targetFile, false); + } + } catch (final IOException ioe) { + completionFailureException = ioe; + } + } + + // Handle completion failures + if (completionFailureException != null) { + getLogger().warn("Successfully fetched the content from {} for {} but failed to perform Completion Action due to {}; routing to success", + new Object[] {file, flowFile, completionFailureException}, completionFailureException); + } + } + + + // + // The following set of methods exist purely for testing purposes + // + protected void move(final File source, final File target, final boolean overwrite) throws IOException { + final File targetDirectory = target.getParentFile(); + + // convert to path and use Files.move instead of file.renameTo so that if we fail, we know why + final Path targetPath = target.toPath(); + if (!targetDirectory.exists()) { + Files.createDirectories(targetDirectory.toPath()); + } + + final CopyOption[] copyOptions = overwrite ? new CopyOption[] {StandardCopyOption.REPLACE_EXISTING} : new CopyOption[] {}; + Files.move(source.toPath(), targetPath, copyOptions); + } + + protected void delete(final File file) throws IOException { + Files.delete(file.toPath()); + } + + protected boolean isReadable(final File file) { + return file.canRead(); + } + + protected boolean isWritable(final File file) { + return file.canWrite(); + } + + protected boolean isDirectory(final File file) { + return file.isDirectory(); + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFile.java index ced79cddaa..74b6761bea 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFile.java @@ -91,7 +91,7 @@ import org.apache.nifi.processor.util.StandardValidators; @WritesAttribute(attribute = "file.permissions", description = "The read/write/execute permissions of the file. May not work on all file systems"), @WritesAttribute(attribute = "absolute.path", description = "The full/absolute path from where a file was picked up. The current 'path' " + "attribute is still populated, but may be a relative path")}) -@SeeAlso(PutFile.class) +@SeeAlso({PutFile.class, FetchFile.class}) public class GetFile extends AbstractProcessor { public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java index f35d9be4da..0cb5e0172f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java @@ -86,7 +86,7 @@ import org.apache.nifi.processors.standard.util.FileInfo; "is formatted as 3 characters for the owner, 3 for the group, and 3 for other users. For example " + "rw-rw-r--") }) -@SeeAlso({GetFile.class, PutFile.class}) +@SeeAlso({GetFile.class, PutFile.class, FetchFile.class}) public class ListFile extends AbstractListProcessor { public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java index 8c4b00faaa..d32427e086 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java @@ -57,7 +57,7 @@ import org.apache.nifi.util.StopWatch; @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"put", "local", "copy", "archive", "files", "filesystem"}) @CapabilityDescription("Writes the contents of a FlowFile to the local file system") -@SeeAlso(GetFile.class) +@SeeAlso({FetchFile.class, GetFile.class}) public class PutFile extends AbstractProcessor { public static final String REPLACE_RESOLUTION = "replace"; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index eb104312ef..56265a9dd7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -30,6 +30,7 @@ org.apache.nifi.processors.standard.ExecuteStreamCommand org.apache.nifi.processors.standard.ExecuteProcess org.apache.nifi.processors.standard.ExtractText org.apache.nifi.processors.standard.FetchSFTP +org.apache.nifi.processors.standard.FetchFile org.apache.nifi.processors.standard.GenerateFlowFile org.apache.nifi.processors.standard.GetFile org.apache.nifi.processors.standard.GetFTP @@ -43,11 +44,11 @@ org.apache.nifi.processors.standard.IdentifyMimeType org.apache.nifi.processors.standard.InvokeHTTP org.apache.nifi.processors.standard.GetJMSQueue org.apache.nifi.processors.standard.GetJMSTopic +org.apache.nifi.processors.standard.ListFile org.apache.nifi.processors.standard.ListenHTTP org.apache.nifi.processors.standard.ListenSyslog org.apache.nifi.processors.standard.ListenUDP org.apache.nifi.processors.standard.ListSFTP -org.apache.nifi.processors.standard.ListFile org.apache.nifi.processors.standard.LogAttribute org.apache.nifi.processors.standard.MergeContent org.apache.nifi.processors.standard.ModifyBytes diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFile.java new file mode 100644 index 0000000000..0e69216dd3 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFile.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; +import java.util.Arrays; + +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +public class TestFetchFile { + + @Before + public void prepDestDirectory() throws IOException { + final File targetDir = new File("target/move-target"); + if (!targetDir.exists()) { + Files.createDirectories(targetDir.toPath()); + return; + } + + for (final File file : targetDir.listFiles()) { + Files.delete(file.toPath()); + } + } + + @Test + public void testSimpleSuccess() throws IOException { + final File sourceFile = new File("target/1.txt"); + final byte[] content = "Hello, World!".getBytes(); + Files.write(sourceFile.toPath(), content, StandardOpenOption.CREATE); + + final TestRunner runner = TestRunners.newTestRunner(new FetchFile()); + runner.setProperty(FetchFile.FILENAME, sourceFile.getAbsolutePath()); + runner.setProperty(FetchFile.COMPLETION_STRATEGY, FetchFile.COMPLETION_NONE.getValue()); + + runner.enqueue(new byte[0]); + runner.run(); + runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1); + runner.getFlowFilesForRelationship(FetchFile.REL_SUCCESS).get(0).assertContentEquals(content); + + assertTrue(sourceFile.exists()); + } + + @Test + public void testDeleteOnComplete() throws IOException { + final File sourceFile = new File("target/1.txt"); + final byte[] content = "Hello, World!".getBytes(); + Files.write(sourceFile.toPath(), content, StandardOpenOption.CREATE); + + final TestRunner runner = TestRunners.newTestRunner(new FetchFile()); + runner.setProperty(FetchFile.FILENAME, sourceFile.getAbsolutePath()); + runner.setProperty(FetchFile.COMPLETION_STRATEGY, FetchFile.COMPLETION_DELETE.getValue()); + + runner.enqueue(new byte[0]); + runner.run(); + runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1); + runner.getFlowFilesForRelationship(FetchFile.REL_SUCCESS).get(0).assertContentEquals(content); + + assertFalse(sourceFile.exists()); + } + + @Test + public void testMoveOnCompleteWithTargetDirExisting() throws IOException { + final File sourceFile = new File("target/1.txt"); + final byte[] content = "Hello, World!".getBytes(); + Files.write(sourceFile.toPath(), content, StandardOpenOption.CREATE); + + final TestRunner runner = TestRunners.newTestRunner(new FetchFile()); + runner.setProperty(FetchFile.FILENAME, sourceFile.getAbsolutePath()); + runner.setProperty(FetchFile.COMPLETION_STRATEGY, FetchFile.COMPLETION_MOVE.getValue()); + runner.assertNotValid(); + runner.setProperty(FetchFile.MOVE_DESTINATION_DIR, "target/move-target"); + runner.assertValid(); + + final File destDir = new File("target/move-target"); + final File destFile = new File(destDir, sourceFile.getName()); + + runner.enqueue(new byte[0]); + runner.run(); + runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1); + runner.getFlowFilesForRelationship(FetchFile.REL_SUCCESS).get(0).assertContentEquals(content); + + assertFalse(sourceFile.exists()); + assertTrue(destFile.exists()); + } + + @Test + public void testMoveOnCompleteWithTargetDirMissing() throws IOException { + final File sourceFile = new File("target/1.txt"); + final byte[] content = "Hello, World!".getBytes(); + Files.write(sourceFile.toPath(), content, StandardOpenOption.CREATE); + + final TestRunner runner = TestRunners.newTestRunner(new FetchFile()); + runner.setProperty(FetchFile.FILENAME, sourceFile.getAbsolutePath()); + runner.setProperty(FetchFile.COMPLETION_STRATEGY, FetchFile.COMPLETION_MOVE.getValue()); + runner.assertNotValid(); + runner.setProperty(FetchFile.MOVE_DESTINATION_DIR, "target/move-target"); + runner.assertValid(); + + final File destDir = new File("target/move-target"); + final File destFile = new File(destDir, sourceFile.getName()); + + runner.enqueue(new byte[0]); + runner.run(); + runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1); + runner.getFlowFilesForRelationship(FetchFile.REL_SUCCESS).get(0).assertContentEquals(content); + + assertFalse(sourceFile.exists()); + assertTrue(destFile.exists()); + } + + @Test + public void testMoveAndReplace() throws IOException { + final File sourceFile = new File("target/1.txt"); + final byte[] content = "Hello, World!".getBytes(); + Files.write(sourceFile.toPath(), content, StandardOpenOption.CREATE); + + final TestRunner runner = TestRunners.newTestRunner(new FetchFile()); + runner.setProperty(FetchFile.FILENAME, sourceFile.getAbsolutePath()); + runner.setProperty(FetchFile.COMPLETION_STRATEGY, FetchFile.COMPLETION_MOVE.getValue()); + runner.assertNotValid(); + runner.setProperty(FetchFile.MOVE_DESTINATION_DIR, "target/move-target"); + runner.setProperty(FetchFile.CONFLICT_STRATEGY, FetchFile.CONFLICT_REPLACE.getValue()); + runner.assertValid(); + + final File destDir = new File("target/move-target"); + final File destFile = new File(destDir, sourceFile.getName()); + Files.write(destFile.toPath(), "Good-bye".getBytes(), StandardOpenOption.CREATE); + + runner.enqueue(new byte[0]); + runner.run(); + runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1); + runner.getFlowFilesForRelationship(FetchFile.REL_SUCCESS).get(0).assertContentEquals(content); + + final byte[] replacedContent = Files.readAllBytes(destFile.toPath()); + assertTrue(Arrays.equals(content, replacedContent)); + assertFalse(sourceFile.exists()); + assertTrue(destFile.exists()); + } + + @Test + public void testMoveAndKeep() throws IOException { + final File sourceFile = new File("target/1.txt"); + final byte[] content = "Hello, World!".getBytes(); + Files.write(sourceFile.toPath(), content, StandardOpenOption.CREATE); + + final TestRunner runner = TestRunners.newTestRunner(new FetchFile()); + runner.setProperty(FetchFile.FILENAME, sourceFile.getAbsolutePath()); + runner.setProperty(FetchFile.COMPLETION_STRATEGY, FetchFile.COMPLETION_MOVE.getValue()); + runner.assertNotValid(); + runner.setProperty(FetchFile.MOVE_DESTINATION_DIR, "target/move-target"); + runner.setProperty(FetchFile.CONFLICT_STRATEGY, FetchFile.CONFLICT_KEEP_INTACT.getValue()); + runner.assertValid(); + + final File destDir = new File("target/move-target"); + final File destFile = new File(destDir, sourceFile.getName()); + + final byte[] goodBye = "Good-bye".getBytes(); + Files.write(destFile.toPath(), goodBye); + + runner.enqueue(new byte[0]); + runner.run(); + runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1); + runner.getFlowFilesForRelationship(FetchFile.REL_SUCCESS).get(0).assertContentEquals(content); + + final byte[] replacedContent = Files.readAllBytes(destFile.toPath()); + assertTrue(Arrays.equals(goodBye, replacedContent)); + assertFalse(sourceFile.exists()); + assertTrue(destFile.exists()); + } + + @Test + public void testMoveAndFail() throws IOException { + final File sourceFile = new File("target/1.txt"); + final byte[] content = "Hello, World!".getBytes(); + Files.write(sourceFile.toPath(), content, StandardOpenOption.CREATE); + + final TestRunner runner = TestRunners.newTestRunner(new FetchFile()); + runner.setProperty(FetchFile.FILENAME, sourceFile.getAbsolutePath()); + runner.setProperty(FetchFile.COMPLETION_STRATEGY, FetchFile.COMPLETION_MOVE.getValue()); + runner.assertNotValid(); + runner.setProperty(FetchFile.MOVE_DESTINATION_DIR, "target/move-target"); + runner.setProperty(FetchFile.CONFLICT_STRATEGY, FetchFile.CONFLICT_FAIL.getValue()); + runner.assertValid(); + + final File destDir = new File("target/move-target"); + final File destFile = new File(destDir, sourceFile.getName()); + + final byte[] goodBye = "Good-bye".getBytes(); + Files.write(destFile.toPath(), goodBye); + + runner.enqueue(new byte[0]); + runner.run(); + runner.assertAllFlowFilesTransferred(FetchFile.REL_FAILURE, 1); + + final byte[] replacedContent = Files.readAllBytes(destFile.toPath()); + assertTrue(Arrays.equals(goodBye, replacedContent)); + assertTrue(sourceFile.exists()); + assertTrue(destFile.exists()); + } + + + @Test + public void testMoveAndRename() throws IOException { + final File sourceFile = new File("target/1.txt"); + final byte[] content = "Hello, World!".getBytes(); + Files.write(sourceFile.toPath(), content, StandardOpenOption.CREATE); + + final TestRunner runner = TestRunners.newTestRunner(new FetchFile()); + runner.setProperty(FetchFile.FILENAME, sourceFile.getAbsolutePath()); + runner.setProperty(FetchFile.COMPLETION_STRATEGY, FetchFile.COMPLETION_MOVE.getValue()); + runner.assertNotValid(); + runner.setProperty(FetchFile.MOVE_DESTINATION_DIR, "target/move-target"); + runner.setProperty(FetchFile.CONFLICT_STRATEGY, FetchFile.CONFLICT_RENAME.getValue()); + runner.assertValid(); + + final File destDir = new File("target/move-target"); + final File destFile = new File(destDir, sourceFile.getName()); + + final byte[] goodBye = "Good-bye".getBytes(); + Files.write(destFile.toPath(), goodBye); + + runner.enqueue(new byte[0]); + runner.run(); + runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1); + + final byte[] replacedContent = Files.readAllBytes(destFile.toPath()); + assertTrue(Arrays.equals(goodBye, replacedContent)); + assertFalse(sourceFile.exists()); + assertTrue(destFile.exists()); + + assertEquals(2, destDir.list().length); + } +}