mirror of https://github.com/apache/nifi.git
NIFI-5719 Ensuring FetchFile routes to failure if the move completion strategy can't be completed
This closes #3088. Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
parent
5ec85299e7
commit
f3f7cdbab9
|
@ -236,6 +236,18 @@ public class FetchFile extends AbstractProcessor {
|
|||
return;
|
||||
}
|
||||
|
||||
if (!targetDir.exists()) {
|
||||
try {
|
||||
Files.createDirectories(targetDir.toPath());
|
||||
} catch (Exception e) {
|
||||
getLogger().error("Could not fetch file {} from file system for {} because Completion Strategy is configured to move the original file to {}, "
|
||||
+ "but that directory does not exist and could not be created due to: {}",
|
||||
new Object[] {file, flowFile, targetDir, e.getMessage()}, e);
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
final String conflictStrategy = context.getProperty(CONFLICT_STRATEGY).getValue();
|
||||
|
||||
if (CONFLICT_FAIL.getValue().equalsIgnoreCase(conflictStrategy)) {
|
||||
|
|
|
@ -42,6 +42,8 @@ public class TestFetchFile {
|
|||
return;
|
||||
}
|
||||
|
||||
targetDir.setReadable(true);
|
||||
|
||||
for (final File file : targetDir.listFiles()) {
|
||||
Files.delete(file.toPath());
|
||||
}
|
||||
|
@ -110,6 +112,9 @@ public class TestFetchFile {
|
|||
runner.assertValid();
|
||||
|
||||
final File destDir = new File("target/move-target");
|
||||
destDir.mkdirs();
|
||||
assertTrue(destDir.exists());
|
||||
|
||||
final File destFile = new File(destDir, sourceFile.getName());
|
||||
|
||||
runner.enqueue(new byte[0]);
|
||||
|
@ -135,6 +140,11 @@ public class TestFetchFile {
|
|||
runner.assertValid();
|
||||
|
||||
final File destDir = new File("target/move-target");
|
||||
if (destDir.exists()) {
|
||||
destDir.delete();
|
||||
}
|
||||
assertFalse(destDir.exists());
|
||||
|
||||
final File destFile = new File(destDir, sourceFile.getName());
|
||||
|
||||
runner.enqueue(new byte[0]);
|
||||
|
@ -146,6 +156,69 @@ public class TestFetchFile {
|
|||
assertTrue(destFile.exists());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMoveOnCompleteWithTargetExistsButNotWritable() throws IOException {
|
||||
final File sourceFile = new File("target/1.txt");
|
||||
final byte[] content = "Hello, World!".getBytes();
|
||||
Files.write(sourceFile.toPath(), content, StandardOpenOption.CREATE);
|
||||
|
||||
final TestRunner runner = TestRunners.newTestRunner(new FetchFile());
|
||||
runner.setProperty(FetchFile.FILENAME, sourceFile.getAbsolutePath());
|
||||
runner.setProperty(FetchFile.COMPLETION_STRATEGY, FetchFile.COMPLETION_MOVE.getValue());
|
||||
runner.assertNotValid();
|
||||
runner.setProperty(FetchFile.MOVE_DESTINATION_DIR, "target/move-target");
|
||||
runner.assertValid();
|
||||
|
||||
final File destDir = new File("target/move-target");
|
||||
if (!destDir.exists()) {
|
||||
destDir.mkdirs();
|
||||
}
|
||||
destDir.setWritable(false);
|
||||
|
||||
assertTrue(destDir.exists());
|
||||
assertFalse(destDir.canWrite());
|
||||
|
||||
final File destFile = new File(destDir, sourceFile.getName());
|
||||
|
||||
runner.enqueue(new byte[0]);
|
||||
runner.run();
|
||||
runner.assertAllFlowFilesTransferred(FetchFile.REL_FAILURE, 1);
|
||||
runner.getFlowFilesForRelationship(FetchFile.REL_FAILURE).get(0).assertContentEquals("");
|
||||
|
||||
assertTrue(sourceFile.exists());
|
||||
assertFalse(destFile.exists());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMoveOnCompleteWithParentOfTargetDirNotAccessible() throws IOException {
|
||||
final File sourceFile = new File("target/1.txt");
|
||||
final byte[] content = "Hello, World!".getBytes();
|
||||
Files.write(sourceFile.toPath(), content, StandardOpenOption.CREATE);
|
||||
|
||||
final String moveTargetParent = "target/fetch-file";
|
||||
final String moveTarget = moveTargetParent + "/move-target";
|
||||
|
||||
final TestRunner runner = TestRunners.newTestRunner(new FetchFile());
|
||||
runner.setProperty(FetchFile.FILENAME, sourceFile.getAbsolutePath());
|
||||
runner.setProperty(FetchFile.COMPLETION_STRATEGY, FetchFile.COMPLETION_MOVE.getValue());
|
||||
runner.assertNotValid();
|
||||
runner.setProperty(FetchFile.MOVE_DESTINATION_DIR, moveTarget);
|
||||
runner.assertValid();
|
||||
|
||||
// Make the parent of move-target non-writable and non-readable
|
||||
final File moveTargetParentDir = new File(moveTargetParent);
|
||||
moveTargetParentDir.mkdirs();
|
||||
moveTargetParentDir.setReadable(false);
|
||||
moveTargetParentDir.setWritable(false);
|
||||
|
||||
runner.enqueue(new byte[0]);
|
||||
runner.run();
|
||||
runner.assertAllFlowFilesTransferred(FetchFile.REL_FAILURE, 1);
|
||||
runner.getFlowFilesForRelationship(FetchFile.REL_FAILURE).get(0).assertContentEquals("");
|
||||
|
||||
assertTrue(sourceFile.exists());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMoveAndReplace() throws IOException {
|
||||
final File sourceFile = new File("target/1.txt");
|
||||
|
|
Loading…
Reference in New Issue