mirror of https://github.com/apache/nifi.git
NIFI-2928 added support to validate accessibility of the file system
NIFI-2928 polishing This closes #1160
This commit is contained in:
parent
60da897e10
commit
ed035f85e3
|
@ -183,8 +183,13 @@ public class FetchFile extends AbstractProcessor {
|
||||||
final LogLevel levelPermDenied = LogLevel.valueOf(context.getProperty(PERM_DENIED_LOG_LEVEL).getValue());
|
final LogLevel levelPermDenied = LogLevel.valueOf(context.getProperty(PERM_DENIED_LOG_LEVEL).getValue());
|
||||||
final File file = new File(filename);
|
final File file = new File(filename);
|
||||||
|
|
||||||
// Verify that file exists
|
// Verify that file system is reachable and file exists
|
||||||
if (!file.exists()) {
|
Path filePath = file.toPath();
|
||||||
|
if (!Files.exists(filePath) && !Files.notExists(filePath)){ // see https://docs.oracle.com/javase/tutorial/essential/io/check.html for more details
|
||||||
|
getLogger().log(levelFileNotFound, "Could not fetch file {} from file system for {} because the existence of the file cannot be verified; routing to failure", new Object[] {file, flowFile});
|
||||||
|
session.transfer(session.penalize(flowFile), REL_FAILURE);
|
||||||
|
return;
|
||||||
|
} else if (!Files.exists(filePath)) {
|
||||||
getLogger().log(levelFileNotFound, "Could not fetch file {} from file system for {} because the file does not exist; routing to not.found", new Object[] {file, flowFile});
|
getLogger().log(levelFileNotFound, "Could not fetch file {} from file system for {} because the file does not exist; routing to not.found", new Object[] {file, flowFile});
|
||||||
session.getProvenanceReporter().route(flowFile, REL_NOT_FOUND);
|
session.getProvenanceReporter().route(flowFile, REL_NOT_FOUND);
|
||||||
session.transfer(session.penalize(flowFile), REL_NOT_FOUND);
|
session.transfer(session.penalize(flowFile), REL_NOT_FOUND);
|
||||||
|
|
|
@ -47,6 +47,19 @@ public class TestFetchFile {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void notFound() throws IOException {
|
||||||
|
final File sourceFile = new File("notFound");
|
||||||
|
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(new FetchFile());
|
||||||
|
runner.setProperty(FetchFile.FILENAME, sourceFile.getAbsolutePath());
|
||||||
|
runner.setProperty(FetchFile.COMPLETION_STRATEGY, FetchFile.COMPLETION_NONE.getValue());
|
||||||
|
|
||||||
|
runner.enqueue(new byte[0]);
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(FetchFile.REL_NOT_FOUND, 1);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSimpleSuccess() throws IOException {
|
public void testSimpleSuccess() throws IOException {
|
||||||
final File sourceFile = new File("target/1.txt");
|
final File sourceFile = new File("target/1.txt");
|
||||||
|
|
Loading…
Reference in New Issue