From dc67bd2fdd762e48075dfa5edbe1a427025c6576 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Tue, 9 Jan 2018 17:45:51 +0100 Subject: [PATCH] NIFI-4747 - Removed directory existence check in GetHDFS This closes #2391 Signed-off-by: Jeremy Dyer --- .../nifi/processors/hadoop/GetHDFS.java | 21 +++++++++---------- .../nifi/processors/hadoop/GetHDFSTest.java | 11 ++++++++++ 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java index 1aefc75ff6..27b7375b8b 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java @@ -238,11 +238,6 @@ public class GetHDFS extends AbstractHadoopProcessor { abstractOnScheduled(context); // copy configuration values to pass them around cleanly processorConfig = new ProcessorConfiguration(context); - final FileSystem fs = getFileSystem(); - final Path dir = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue()); - if (!fs.exists(dir)) { - throw new IOException("PropertyDescriptor " + DIRECTORY + " has invalid value " + dir + ". The directory does not exist."); - } // forget the state of the queue in case HDFS contents changed while this processor was turned off queueLock.lock(); @@ -422,8 +417,16 @@ public class GetHDFS extends AbstractHadoopProcessor { if (System.currentTimeMillis() >= nextPollTime && listingLock.tryLock()) { try { final FileSystem hdfs = getFileSystem(); - // get listing - listing = selectFiles(hdfs, new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue()), null); + final Path directoryPath = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue()); + + if (!hdfs.exists(directoryPath)) { + context.yield(); + getLogger().warn("The directory {} does not exist.", new Object[]{directoryPath}); + } else { + // get listing + listing = selectFiles(hdfs, directoryPath, null); + } + lastPollTime.set(System.currentTimeMillis()); } finally { listingLock.unlock(); @@ -447,10 +450,6 @@ public class GetHDFS extends AbstractHadoopProcessor { filesVisited = new HashSet<>(); } - if (!hdfs.exists(dir)) { - throw new IOException("Selection directory " + dir.toString() + " doesn't appear to exist!"); - } - final Set files = new HashSet<>(); FileStatus[] fileStatuses = getUserGroupInformation().doAs((PrivilegedExceptionAction) () -> hdfs.listStatus(dir)); diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java index 40666d9cc4..d3837a85d9 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java @@ -142,6 +142,17 @@ public class GetHDFSTest { } } + @Test + public void testDirectoryDoesNotExist() { + GetHDFS proc = new TestableGetHDFS(kerberosProperties); + TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(PutHDFS.DIRECTORY, "does/not/exist/${now():format('yyyyMMdd')}"); + runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true"); + runner.run(); + List flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS); + assertEquals(0, flowFiles.size()); + } + @Test public void testAutomaticDecompression() throws IOException { GetHDFS proc = new TestableGetHDFS(kerberosProperties);