From f3f7cdbab9248ba13c00d0e27a398e820e9ec7ed Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Wed, 17 Oct 2018 11:28:29 -0400 Subject: [PATCH] NIFI-5719 Ensuring FetchFile routes to failure if the move completion strategy can't be completed This closes #3088. Signed-off-by: Koji Kawamura --- .../nifi/processors/standard/FetchFile.java | 12 +++ .../processors/standard/TestFetchFile.java | 73 +++++++++++++++++++ 2 files changed, 85 insertions(+) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFile.java index 3bf3f5200a..b929f0772e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFile.java @@ -236,6 +236,18 @@ public class FetchFile extends AbstractProcessor { return; } + if (!targetDir.exists()) { + try { + Files.createDirectories(targetDir.toPath()); + } catch (Exception e) { + getLogger().error("Could not fetch file {} from file system for {} because Completion Strategy is configured to move the original file to {}, " + + "but that directory does not exist and could not be created due to: {}", + new Object[] {file, flowFile, targetDir, e.getMessage()}, e); + session.transfer(flowFile, REL_FAILURE); + return; + } + } + final String conflictStrategy = context.getProperty(CONFLICT_STRATEGY).getValue(); if (CONFLICT_FAIL.getValue().equalsIgnoreCase(conflictStrategy)) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFile.java index cfced0a551..dcafefeb16 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFile.java @@ -42,6 +42,8 @@ public class TestFetchFile { return; } + targetDir.setReadable(true); + for (final File file : targetDir.listFiles()) { Files.delete(file.toPath()); } @@ -110,6 +112,9 @@ public class TestFetchFile { runner.assertValid(); final File destDir = new File("target/move-target"); + destDir.mkdirs(); + assertTrue(destDir.exists()); + final File destFile = new File(destDir, sourceFile.getName()); runner.enqueue(new byte[0]); @@ -135,6 +140,11 @@ public class TestFetchFile { runner.assertValid(); final File destDir = new File("target/move-target"); + if (destDir.exists()) { + destDir.delete(); + } + assertFalse(destDir.exists()); + final File destFile = new File(destDir, sourceFile.getName()); runner.enqueue(new byte[0]); @@ -146,6 +156,69 @@ public class TestFetchFile { assertTrue(destFile.exists()); } + @Test + public void testMoveOnCompleteWithTargetExistsButNotWritable() throws IOException { + final File sourceFile = new File("target/1.txt"); + final byte[] content = "Hello, World!".getBytes(); + Files.write(sourceFile.toPath(), content, StandardOpenOption.CREATE); + + final TestRunner runner = TestRunners.newTestRunner(new FetchFile()); + runner.setProperty(FetchFile.FILENAME, sourceFile.getAbsolutePath()); + runner.setProperty(FetchFile.COMPLETION_STRATEGY, FetchFile.COMPLETION_MOVE.getValue()); + runner.assertNotValid(); + runner.setProperty(FetchFile.MOVE_DESTINATION_DIR, "target/move-target"); + runner.assertValid(); + + final File destDir = new File("target/move-target"); + if (!destDir.exists()) { + destDir.mkdirs(); + } + destDir.setWritable(false); + + assertTrue(destDir.exists()); + assertFalse(destDir.canWrite()); + + final File destFile = new File(destDir, sourceFile.getName()); + + runner.enqueue(new byte[0]); + runner.run(); + runner.assertAllFlowFilesTransferred(FetchFile.REL_FAILURE, 1); + runner.getFlowFilesForRelationship(FetchFile.REL_FAILURE).get(0).assertContentEquals(""); + + assertTrue(sourceFile.exists()); + assertFalse(destFile.exists()); + } + + @Test + public void testMoveOnCompleteWithParentOfTargetDirNotAccessible() throws IOException { + final File sourceFile = new File("target/1.txt"); + final byte[] content = "Hello, World!".getBytes(); + Files.write(sourceFile.toPath(), content, StandardOpenOption.CREATE); + + final String moveTargetParent = "target/fetch-file"; + final String moveTarget = moveTargetParent + "/move-target"; + + final TestRunner runner = TestRunners.newTestRunner(new FetchFile()); + runner.setProperty(FetchFile.FILENAME, sourceFile.getAbsolutePath()); + runner.setProperty(FetchFile.COMPLETION_STRATEGY, FetchFile.COMPLETION_MOVE.getValue()); + runner.assertNotValid(); + runner.setProperty(FetchFile.MOVE_DESTINATION_DIR, moveTarget); + runner.assertValid(); + + // Make the parent of move-target non-writable and non-readable + final File moveTargetParentDir = new File(moveTargetParent); + moveTargetParentDir.mkdirs(); + moveTargetParentDir.setReadable(false); + moveTargetParentDir.setWritable(false); + + runner.enqueue(new byte[0]); + runner.run(); + runner.assertAllFlowFilesTransferred(FetchFile.REL_FAILURE, 1); + runner.getFlowFilesForRelationship(FetchFile.REL_FAILURE).get(0).assertContentEquals(""); + + assertTrue(sourceFile.exists()); + } + @Test public void testMoveAndReplace() throws IOException { final File sourceFile = new File("target/1.txt");