diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java index 612247dbc1..a147730d86 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java @@ -33,6 +33,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; @@ -119,6 +120,35 @@ public class ListHDFS extends AbstractHadoopProcessor { .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) .build(); + private static final String FILTER_MODE_DIRECTORIES_AND_FILES = "filter-mode-directories-and-files"; + private static final String FILTER_MODE_FILES_ONLY = "filter-mode-files-only"; + private static final String FILTER_MODE_FULL_PATH = "filter-mode-full-path"; + static final AllowableValue FILTER_DIRECTORIES_AND_FILES_VALUE = new AllowableValue(FILTER_MODE_DIRECTORIES_AND_FILES, + "Directories and Files", + "Filtering will be applied to the names of directories and files. If " + RECURSE_SUBDIRS.getName() + + " is set to true, only subdirectories with a matching name will be searched for files that match " + + "the regular expression defined in " + FILE_FILTER.getName() + "."); + static final AllowableValue FILTER_FILES_ONLY_VALUE = new AllowableValue(FILTER_MODE_FILES_ONLY, + "Files Only", + "Filtering will only be applied to the names of files. If " + RECURSE_SUBDIRS.getName() + + " is set to true, the entire subdirectory tree will be searched for files that match " + + "the regular expression defined in " + FILE_FILTER.getName() + "."); + static final AllowableValue FILTER_FULL_PATH_VALUE = new AllowableValue(FILTER_MODE_FULL_PATH, + "Full Path", + "Filtering will be applied to the full path of files. 
If " + RECURSE_SUBDIRS.getName() + + " is set to true, the entire subdirectory tree will be searched for files in which the full path of " + + "the file matches the regular expression defined in " + FILE_FILTER.getName() + "."); + + public static final PropertyDescriptor FILE_FILTER_MODE = new PropertyDescriptor.Builder() + .name("file-filter-mode") + .displayName("File Filter Mode") + .description("Determines how the regular expression in " + FILE_FILTER.getName() + " will be used when retrieving listings.") + .required(true) + .allowableValues(FILTER_DIRECTORIES_AND_FILES_VALUE, FILTER_FILES_ONLY_VALUE, FILTER_FULL_PATH_VALUE) + .defaultValue(FILTER_DIRECTORIES_AND_FILES_VALUE.getValue()) + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .build(); + public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder() .name("minimum-file-age") .displayName("Minimum File Age") @@ -167,6 +197,7 @@ public class ListHDFS extends AbstractHadoopProcessor { props.add(DIRECTORY); props.add(RECURSE_SUBDIRS); props.add(FILE_FILTER); + props.add(FILE_FILTER_MODE); props.add(MIN_AGE); props.add(MAX_AGE); return props; @@ -340,11 +371,12 @@ public class ListHDFS extends AbstractHadoopProcessor { // Pull in any file that is newer than the timestamp that we have. final FileSystem hdfs = getFileSystem(); final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean(); + String fileFilterMode = context.getProperty(FILE_FILTER_MODE).getValue(); final Set statuses; try { final Path rootPath = new Path(directory); - statuses = getStatuses(rootPath, recursive, hdfs, createPathFilter(context)); + statuses = getStatuses(rootPath, recursive, hdfs, createPathFilter(context), fileFilterMode); getLogger().debug("Found a total of {} files in HDFS", new Object[] {statuses.size()}); } catch (final IOException | IllegalArgumentException e) { getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {e}); @@ -391,29 +423,58 @@ public class ListHDFS extends AbstractHadoopProcessor { } } - private Set getStatuses(final Path path, final boolean recursive, final FileSystem hdfs, final PathFilter filter) throws IOException, InterruptedException { + private Set getStatuses(final Path path, final boolean recursive, final FileSystem hdfs, final PathFilter filter, String filterMode) throws IOException, InterruptedException { final Set statusSet = new HashSet<>(); getLogger().debug("Fetching listing for {}", new Object[] {path}); - final FileStatus[] statuses = getUserGroupInformation().doAs((PrivilegedExceptionAction) () -> hdfs.listStatus(path, filter)); + final FileStatus[] statuses; + if (isPostListingFilterNeeded(filterMode)) { + // For this filter mode, the filter is not passed to listStatus, so that directory names will not be + // filtered out when the listing is recursive. 
+ statuses = getUserGroupInformation().doAs((PrivilegedExceptionAction) () -> hdfs.listStatus(path)); + } else { + statuses = getUserGroupInformation().doAs((PrivilegedExceptionAction) () -> hdfs.listStatus(path, filter)); + } for ( final FileStatus status : statuses ) { if ( status.isDirectory() ) { if ( recursive ) { try { - statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs, filter)); + statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs, filter, filterMode)); } catch (final IOException ioe) { getLogger().error("Failed to retrieve HDFS listing for subdirectory {} due to {}; will continue listing others", new Object[] {status.getPath(), ioe}); } } } else { - statusSet.add(status); + if (isPostListingFilterNeeded(filterMode)) { + // Filtering explicitly performed here, since it was not able to be done when calling listStatus. + if (filter.accept(status.getPath())) { + statusSet.add(status); + } + } else { + statusSet.add(status); + } } } return statusSet; } + /** + * Determines if filtering needs to be applied, after calling {@link FileSystem#listStatus(Path)}, based on the + * given filter mode. + * Filter modes that need to be able to search directories regardless of the given filter should return true. + * FILTER_MODE_FILES_ONLY and FILTER_MODE_FULL_PATH require that {@link FileSystem#listStatus(Path)} be invoked + * without a filter so that all directories can be traversed when filtering with these modes. + * FILTER_MODE_DIRECTORIES_AND_FILES should return false, since filtering can be applied directly with + * {@link FileSystem#listStatus(Path, PathFilter)} regardless of a recursive listing. + * @param filterMode the value of one of the defined AllowableValues representing filter modes + * @return true if results need to be filtered, false otherwise + */ + private boolean isPostListingFilterNeeded(String filterMode) { + return filterMode.equals(FILTER_MODE_FILES_ONLY) || filterMode.equals(FILTER_MODE_FULL_PATH); + } + private String getAbsolutePath(final Path path) { final Path parent = path.getParent(); final String prefix = (parent == null || parent.getName().equals("")) ? "" : getAbsolutePath(parent); @@ -462,11 +523,15 @@ public class ListHDFS extends AbstractHadoopProcessor { private PathFilter createPathFilter(final ProcessContext context) { final Pattern filePattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue()); - return new PathFilter() { - @Override - public boolean accept(Path path) { - return filePattern.matcher(path.getName()).matches(); + final String filterMode = context.getProperty(FILE_FILTER_MODE).getValue(); + return path -> { + final boolean accepted; + if (FILTER_FULL_PATH_VALUE.getValue().equals(filterMode)) { + accepted = filePattern.matcher(path.toString()).matches(); + } else { + accepted = filePattern.matcher(path.getName()).matches(); } + return accepted; }; } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/docs/org.apache.nifi.processors.hadoop.ListHDFS/additionalDetails.html b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/docs/org.apache.nifi.processors.hadoop.ListHDFS/additionalDetails.html new file mode 100644 index 0000000000..2fd0deb2b2 --- /dev/null +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/docs/org.apache.nifi.processors.hadoop.ListHDFS/additionalDetails.html @@ -0,0 +1,104 @@ + + + + + + + PutHDFS + + + + + +

ListHDFS Filter Modes

There are three filter modes available for ListHDFS that determine how the regular expression in the File Filter property will be applied to listings in HDFS.

    +
• Directories and Files
    Filtering will be applied to the names of directories and files. If Recurse Subdirectories is set to true, only subdirectories with a matching name will be searched for files that match the regular expression defined in File Filter.
• Files Only
    Filtering will only be applied to the names of files. If Recurse Subdirectories is set to true, the entire subdirectory tree will be searched for files that match the regular expression defined in File Filter.
• Full Path
    Filtering will be applied to the full path of files. If Recurse Subdirectories is set to true, the entire subdirectory tree will be searched for files in which the full path of the file matches the regular expression defined in File Filter.
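The change implements these modes with a single PathFilter (see createPathFilter in the Java diff above): the mode decides what the regular expression is matched against, and separately whether the filter is handed to FileSystem.listStatus or applied to each file after listing. A minimal standalone sketch of that decision, with the regex and mode literals as placeholders for the configured property values:

    import java.util.regex.Pattern;

    import org.apache.hadoop.fs.PathFilter;

    class FilterModeSketch {
        // Mirrors createPathFilter in this change; regex and filterMode stand in
        // for the configured File Filter / File Filter Mode values.
        static PathFilter pathFilter(final String regex, final String filterMode) {
            final Pattern filePattern = Pattern.compile(regex);
            return path -> {
                if ("filter-mode-full-path".equals(filterMode)) {
                    // Full Path: the expression must match the whole path
                    return filePattern.matcher(path.toString()).matches();
                }
                // Directories and Files / Files Only: match the last path component.
                // The difference between those two modes is where the filter runs:
                // Directories and Files passes it to FileSystem.listStatus(path, filter),
                // so directory names are filtered too; Files Only (and Full Path) list
                // without a filter and apply it to each file afterwards.
                return filePattern.matcher(path.getName()).matches();
            };
        }
    }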
+

+

Examples:

For the given examples, the following directory structure is used:

data
├── readme.txt
├── bin
│   ├── readme.txt
│   ├── 1.bin
│   ├── 2.bin
│   └── 3.bin
├── csv
│   ├── readme.txt
│   ├── 1.csv
│   ├── 2.csv
│   └── 3.csv
└── txt
    ├── readme.txt
    ├── 1.txt
    ├── 2.txt
    └── 3.txt

Directories and Files

This mode is useful when the listing should match the names of directories and files with the regular expression defined in File Filter. When Recurse Subdirectories is true, this mode allows the user to filter for files in subdirectories with names that match the regular expression defined in File Filter.

ListHDFS configuration:

    Property                  Value
    Directory                 /data
    Recurse Subdirectories    true
    File Filter               .*txt.*
    Filter Mode               Directories and Files

ListHDFS results:

• /data/readme.txt
• /data/txt/readme.txt
• /data/txt/1.txt
• /data/txt/2.txt
• /data/txt/3.txt
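Because this mode applies the File Filter to each name, a directory is only descended into when its own name matches, which is why only /data/readme.txt and the /data/txt subtree appear above. A quick, hypothetical check of the example regex against the entries directly under /data:

    import java.util.regex.Pattern;

    class DirectoriesAndFilesCheck {
        public static void main(String[] args) {
            final Pattern filter = Pattern.compile(".*txt.*");   // File Filter from the example
            // Entries directly under /data in the example tree
            for (String name : new String[] {"readme.txt", "bin", "csv", "txt"}) {
                System.out.println(name + " -> " + filter.matcher(name).matches());
            }
            // Prints: readme.txt -> true, bin -> false, csv -> false, txt -> true,
            // so only /data/readme.txt is listed and only /data/txt is searched further.
        }
    }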
+

Files Only

This mode is useful when the listing should match only the names of files with the regular expression defined in File Filter. Directory names will not be matched against the regular expression defined in File Filter. When Recurse Subdirectories is true, this mode allows the user to filter for files in the entire subdirectory tree of the directory specified in the Directory property.

ListHDFS configuration:

    Property                  Value
    Directory                 /data
    Recurse Subdirectories    true
    File Filter               [^\.].*\.txt
    Filter Mode               Files Only

ListHDFS results:

• /data/readme.txt
• /data/bin/readme.txt
• /data/csv/readme.txt
• /data/txt/readme.txt
• /data/txt/1.txt
• /data/txt/2.txt
• /data/txt/3.txt
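A hypothetical, test-scope sketch of this configuration, modelled on the new tests added to TestListHDFS in this change (the class name and main method are illustrative only; the tests in this change run against a mocked FileSystem rather than a live cluster):

    import org.apache.nifi.processors.hadoop.ListHDFS;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    class FilesOnlyConfigSketch {
        public static void main(String[] args) {
            final TestRunner runner = TestRunners.newTestRunner(ListHDFS.class);
            runner.setProperty(ListHDFS.DIRECTORY, "/data");
            runner.setProperty(ListHDFS.RECURSE_SUBDIRS, "true");
            runner.setProperty(ListHDFS.FILE_FILTER, "[^\\.].*\\.txt");
            // "filter-mode-files-only" is the raw value behind the "Files Only" allowable value.
            runner.setProperty(ListHDFS.FILE_FILTER_MODE, "filter-mode-files-only");
            // Should validate with just these properties; an actual listing would also
            // need reachable HDFS configuration (or the mocked harness used in the tests).
            runner.assertValid();
        }
    }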
+

Full Path

This mode is useful when the listing should match the entire path of a file with the regular expression defined in File Filter. When Recurse Subdirectories is true, this mode allows the user to filter for files in the entire subdirectory tree of the directory specified in the Directory property while allowing filtering based on the full path of each file.

ListHDFS configuration:

    Property                  Value
    Directory                 /data
    Recurse Subdirectories    true
    File Filter               (/.*/)*csv/.*
    Filter Mode               Full Path

ListHDFS results:

• /data/csv/readme.txt
• /data/csv/1.csv
• /data/csv/2.csv
• /data/csv/3.csv
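Since this mode matches the expression against the whole path rather than just the file name, the directory components participate in the match. A small, hypothetical check of the example regex:

    import java.util.regex.Pattern;

    class FullPathCheck {
        public static void main(String[] args) {
            final Pattern filter = Pattern.compile("(/.*/)*csv/.*");  // File Filter from the example
            System.out.println(filter.matcher("/data/csv/1.csv").matches());      // true
            System.out.println(filter.matcher("/data/csv/readme.txt").matches()); // true
            System.out.println(filter.matcher("/data/txt/1.txt").matches());      // false
            System.out.println(filter.matcher("1.csv").matches());                // false: the name alone is not enough
        }
    }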
+ + diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java index bfa9851a77..b349962cd1 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java @@ -16,6 +16,9 @@ */ package org.apache.nifi.processors.hadoop; +import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_DIRECTORIES_AND_FILES_VALUE; +import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_FILES_ONLY_VALUE; +import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_FULL_PATH_VALUE; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -162,7 +165,8 @@ public class TestListHDFS { @Test - public void testRecursive() throws InterruptedException { + public void testRecursiveWithDefaultFilterAndFilterMode() throws InterruptedException { + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/.testFile.txt"))); proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt"))); proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir"))); @@ -192,6 +196,139 @@ public class TestListHDFS { } } + @Test + public void testRecursiveWithCustomFilterDirectoriesAndFiles() throws InterruptedException, IOException { + // set custom regex filter and filter mode + runner.setProperty(ListHDFS.FILE_FILTER, ".*txt.*"); + runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_DIRECTORIES_AND_FILES_VALUE.getValue()); + + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.out"))); + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt"))); + + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir"))); + proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt"))); + + proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir"))); + proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.out"))); + proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.txt"))); + + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/txtDir"))); + proc.fileSystem.addFileStatus(new Path("/test/txtDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/txtDir/3.out"))); + 
proc.fileSystem.addFileStatus(new Path("/test/txtDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/txtDir/3.txt"))); + + // first iteration will not pick up files because it has to instead check timestamps. + // We must then wait long enough to ensure that the listing can be performed safely and + // run the Processor again. + runner.run(); + Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS)); + runner.run(); + + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2); + + final List flowFiles = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS); + for (int i = 0; i < 2; i++) { + final MockFlowFile ff = flowFiles.get(i); + final String filename = ff.getAttribute("filename"); + + if (filename.equals("testFile.txt")) { + ff.assertAttributeEquals("path", "/test"); + } else if (filename.equals("3.txt")) { + ff.assertAttributeEquals("path", "/test/txtDir"); + } else { + Assert.fail("filename was " + filename); + } + } + } + + @Test + public void testRecursiveWithCustomFilterFilesOnly() throws InterruptedException, IOException { + // set custom regex filter and filter mode + runner.setProperty(ListHDFS.FILE_FILTER, "[^\\.].*\\.txt"); + runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_FILES_ONLY_VALUE.getValue()); + + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.out"))); + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt"))); + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/.partfile.txt"))); + + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir"))); + proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt"))); + + proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir"))); + proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/.txt"))); + proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.out"))); + proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.txt"))); + + // first iteration will not pick up files because it has to instead check timestamps. + // We must then wait long enough to ensure that the listing can be performed safely and + // run the Processor again. 
+ runner.run(); + Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS)); + runner.run(); + + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 3); + + final List flowFiles = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS); + for (int i = 0; i < 2; i++) { + final MockFlowFile ff = flowFiles.get(i); + final String filename = ff.getAttribute("filename"); + + if (filename.equals("testFile.txt")) { + ff.assertAttributeEquals("path", "/test"); + } else if (filename.equals("1.txt")) { + ff.assertAttributeEquals("path", "/test/testDir"); + } else if (filename.equals("2.txt")) { + ff.assertAttributeEquals("path", "/test/testDir/anotherDir"); + } else { + Assert.fail("filename was " + filename); + } + } + } + + @Test + public void testRecursiveWithCustomFilterFullPath() throws InterruptedException, IOException { + // set custom regex filter and filter mode + runner.setProperty(ListHDFS.FILE_FILTER, "(/.*/)*anotherDir/1\\..*"); + runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_FULL_PATH_VALUE.getValue()); + + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.out"))); + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt"))); + + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir"))); + proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt"))); + + proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir"))); + proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/1.out"))); + proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/1.txt"))); + proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.out"))); + proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.txt"))); + + proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/someDir"))); + proc.fileSystem.addFileStatus(new Path("/test/testDir/someDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/someDir/1.out"))); + + // first iteration will not pick up files because it has to instead check timestamps. + // We must then wait long enough to ensure that the listing can be performed safely and + // run the Processor again. 
+ runner.run(); + Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS)); + runner.run(); + + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2); + + final List flowFiles = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS); + for (int i = 0; i < 2; i++) { + final MockFlowFile ff = flowFiles.get(i); + final String filename = ff.getAttribute("filename"); + + if (filename.equals("1.out")) { + ff.assertAttributeEquals("path", "/test/testDir/anotherDir"); + } else if (filename.equals("1.txt")) { + ff.assertAttributeEquals("path", "/test/testDir/anotherDir"); + } else { + Assert.fail("filename was " + filename); + } + } + } + @Test public void testNotRecursive() throws InterruptedException { runner.setProperty(ListHDFS.RECURSE_SUBDIRS, "false");