diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java index 2ae65b26cf..38a16e4228 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java @@ -19,6 +19,7 @@ package org.apache.nifi.processors.hadoop; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -42,11 +43,7 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processors.hadoop.util.HDFSListing; -import org.codehaus.jackson.JsonNode; -import org.codehaus.jackson.JsonParseException; -import org.codehaus.jackson.map.JsonMappingException; -import org.codehaus.jackson.map.ObjectMapper; +import org.apache.nifi.processor.util.StandardValidators; import java.io.File; import java.io.IOException; @@ -59,6 +56,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; @TriggerSerially @@ -105,6 +103,13 @@ public class ListHDFS extends AbstractHadoopProcessor { .defaultValue("true") .build(); + public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder() + .name("File Filter") + .description("Only files whose names match the given regular expression will be picked up") + .required(true) + .defaultValue("[^\\.].*") + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -135,6 +140,7 @@ public class ListHDFS extends AbstractHadoopProcessor { props.add(DISTRIBUTED_CACHE_SERVICE); props.add(DIRECTORY); props.add(RECURSE_SUBDIRS); + props.add(FILE_FILTER); return props; } @@ -152,18 +158,12 @@ public class ListHDFS extends AbstractHadoopProcessor { @Override public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { super.onPropertyModified(descriptor, oldValue, newValue); - if (isConfigurationRestored() && descriptor.equals(DIRECTORY)) { + if (isConfigurationRestored() && (descriptor.equals(DIRECTORY) || descriptor.equals(FILE_FILTER))) { latestTimestampEmitted = -1L; latestTimestampListed = -1L; } } - private HDFSListing deserialize(final String serializedState) throws JsonParseException, JsonMappingException, IOException { - final ObjectMapper mapper = new ObjectMapper(); - final JsonNode jsonNode = mapper.readTree(serializedState); - return mapper.readValue(jsonNode, HDFSListing.class); - } - /** * Determines which of the given FileStatus's describes a File that should be listed. * @@ -283,7 +283,7 @@ public class ListHDFS extends AbstractHadoopProcessor { final Set statuses; try { final Path rootPath = new Path(directory); - statuses = getStatuses(rootPath, recursive, hdfs); + statuses = getStatuses(rootPath, recursive, hdfs, createPathFilter(context)); getLogger().debug("Found a total of {} files in HDFS", new Object[] {statuses.size()}); } catch (final IOException | IllegalArgumentException e) { getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {e}); @@ -326,17 +326,17 @@ public class ListHDFS extends AbstractHadoopProcessor { } } - private Set getStatuses(final Path path, final boolean recursive, final FileSystem hdfs) throws IOException { + private Set getStatuses(final Path path, final boolean recursive, final FileSystem hdfs, final PathFilter filter) throws IOException { final Set statusSet = new HashSet<>(); getLogger().debug("Fetching listing for {}", new Object[] {path}); - final FileStatus[] statuses = hdfs.listStatus(path); + final FileStatus[] statuses = hdfs.listStatus(path, filter); for ( final FileStatus status : statuses ) { if ( status.isDirectory() ) { if ( recursive ) { try { - statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs)); + statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs, filter)); } catch (final IOException ioe) { getLogger().error("Failed to retrieve HDFS listing for subdirectory {} due to {}; will continue listing others", new Object[] {status.getPath(), ioe}); } @@ -394,4 +394,14 @@ public class ListHDFS extends AbstractHadoopProcessor { return sb.toString(); } + + private PathFilter createPathFilter(final ProcessContext context) { + final Pattern filePattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue()); + return new PathFilter() { + @Override + public boolean accept(Path path) { + return filePattern.matcher(path.getName()).matches(); + } + }; + } } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java index 8f8699e142..bdb058e32d 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java @@ -101,6 +101,23 @@ public class TestListHDFS { mff.assertAttributeEquals("filename", "testFile.txt"); } + @Test + public void testListingWithFilter() throws InterruptedException { + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt"))); + + runner.setProperty(ListHDFS.DIRECTORY, "${literal('/test'):substring(0,5)}"); + runner.setProperty(ListHDFS.FILE_FILTER, "[^test].*"); + + // first iteration will not pick up files because it has to instead check timestamps. + // We must then wait long enough to ensure that the listing can be performed safely and + // run the Processor again. + runner.run(); + Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS)); + runner.run(); + + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0); + } + @Test public void testListingWithInvalidELFunction() throws InterruptedException { runner.setProperty(ListHDFS.DIRECTORY, "${literal('/test'):foo()}");