NIFI-4434 Fixed recursive listing with a custom regex filter.

Filter modes are now supported to perform listings based on directory and file names, file-names only, and full path.

This closes #2937

Signed-off-by: zenfenan <zenfenan@apache.org>
This commit is contained in:
Jeff Storck 2018-08-01 13:13:40 -04:00 committed by zenfenan
parent 9a79c94f80
commit 451084e11f
3 changed files with 316 additions and 10 deletions

View File

@ -33,6 +33,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
@ -119,6 +120,35 @@ public class ListHDFS extends AbstractHadoopProcessor {
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.build(); .build();
private static final String FILTER_MODE_DIRECTORIES_AND_FILES = "filter-mode-directories-and-files";
private static final String FILTER_MODE_FILES_ONLY = "filter-mode-files-only";
private static final String FILTER_MODE_FULL_PATH = "filter-mode-full-path";
static final AllowableValue FILTER_DIRECTORIES_AND_FILES_VALUE = new AllowableValue(FILTER_MODE_DIRECTORIES_AND_FILES,
"Directories and Files",
"Filtering will be applied to the names of directories and files. If " + RECURSE_SUBDIRS.getName()
+ " is set to true, only subdirectories with a matching name will be searched for files that match "
+ "the regular expression defined in " + FILE_FILTER.getName() + ".");
static final AllowableValue FILTER_FILES_ONLY_VALUE = new AllowableValue(FILTER_MODE_FILES_ONLY,
"Files Only",
"Filtering will only be applied to the names of files. If " + RECURSE_SUBDIRS.getName()
+ " is set to true, the entire subdirectory tree will be searched for files that match "
+ "the regular expression defined in " + FILE_FILTER.getName() + ".");
static final AllowableValue FILTER_FULL_PATH_VALUE = new AllowableValue(FILTER_MODE_FULL_PATH,
"Full Path",
"Filtering will be applied to the full path of files. If " + RECURSE_SUBDIRS.getName()
+ " is set to true, the entire subdirectory tree will be searched for files in which the full path of "
+ "the file matches the regular expression defined in " + FILE_FILTER.getName() + ".");
public static final PropertyDescriptor FILE_FILTER_MODE = new PropertyDescriptor.Builder()
.name("file-filter-mode")
.displayName("File Filter Mode")
.description("Determines how the regular expression in " + FILE_FILTER.getName() + " will be used when retrieving listings.")
.required(true)
.allowableValues(FILTER_DIRECTORIES_AND_FILES_VALUE, FILTER_FILES_ONLY_VALUE, FILTER_FULL_PATH_VALUE)
.defaultValue(FILTER_DIRECTORIES_AND_FILES_VALUE.getValue())
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.build();
public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder() public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
.name("minimum-file-age") .name("minimum-file-age")
.displayName("Minimum File Age") .displayName("Minimum File Age")
@ -167,6 +197,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
props.add(DIRECTORY); props.add(DIRECTORY);
props.add(RECURSE_SUBDIRS); props.add(RECURSE_SUBDIRS);
props.add(FILE_FILTER); props.add(FILE_FILTER);
props.add(FILE_FILTER_MODE);
props.add(MIN_AGE); props.add(MIN_AGE);
props.add(MAX_AGE); props.add(MAX_AGE);
return props; return props;
@ -340,11 +371,12 @@ public class ListHDFS extends AbstractHadoopProcessor {
// Pull in any file that is newer than the timestamp that we have. // Pull in any file that is newer than the timestamp that we have.
final FileSystem hdfs = getFileSystem(); final FileSystem hdfs = getFileSystem();
final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean(); final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
String fileFilterMode = context.getProperty(FILE_FILTER_MODE).getValue();
final Set<FileStatus> statuses; final Set<FileStatus> statuses;
try { try {
final Path rootPath = new Path(directory); final Path rootPath = new Path(directory);
statuses = getStatuses(rootPath, recursive, hdfs, createPathFilter(context)); statuses = getStatuses(rootPath, recursive, hdfs, createPathFilter(context), fileFilterMode);
getLogger().debug("Found a total of {} files in HDFS", new Object[] {statuses.size()}); getLogger().debug("Found a total of {} files in HDFS", new Object[] {statuses.size()});
} catch (final IOException | IllegalArgumentException e) { } catch (final IOException | IllegalArgumentException e) {
getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {e}); getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {e});
@ -391,29 +423,58 @@ public class ListHDFS extends AbstractHadoopProcessor {
} }
} }
private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs, final PathFilter filter) throws IOException, InterruptedException { private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs, final PathFilter filter, String filterMode) throws IOException, InterruptedException {
final Set<FileStatus> statusSet = new HashSet<>(); final Set<FileStatus> statusSet = new HashSet<>();
getLogger().debug("Fetching listing for {}", new Object[] {path}); getLogger().debug("Fetching listing for {}", new Object[] {path});
final FileStatus[] statuses = getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(path, filter)); final FileStatus[] statuses;
if (isPostListingFilterNeeded(filterMode)) {
// For this filter mode, the filter is not passed to listStatus, so that directory names will not be
// filtered out when the listing is recursive.
statuses = getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(path));
} else {
statuses = getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(path, filter));
}
for ( final FileStatus status : statuses ) { for ( final FileStatus status : statuses ) {
if ( status.isDirectory() ) { if ( status.isDirectory() ) {
if ( recursive ) { if ( recursive ) {
try { try {
statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs, filter)); statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs, filter, filterMode));
} catch (final IOException ioe) { } catch (final IOException ioe) {
getLogger().error("Failed to retrieve HDFS listing for subdirectory {} due to {}; will continue listing others", new Object[] {status.getPath(), ioe}); getLogger().error("Failed to retrieve HDFS listing for subdirectory {} due to {}; will continue listing others", new Object[] {status.getPath(), ioe});
} }
} }
} else { } else {
statusSet.add(status); if (isPostListingFilterNeeded(filterMode)) {
// Filtering explicitly performed here, since it was not able to be done when calling listStatus.
if (filter.accept(status.getPath())) {
statusSet.add(status);
}
} else {
statusSet.add(status);
}
} }
} }
return statusSet; return statusSet;
} }
/**
* Determines if filtering needs to be applied, after calling {@link FileSystem#listStatus(Path)}, based on the
* given filter mode.
* Filter modes that need to be able to search directories regardless of the given filter should return true.
* FILTER_MODE_FILES_ONLY and FILTER_MODE_FULL_PATH require that {@link FileSystem#listStatus(Path)} be invoked
* without a filter so that all directories can be traversed when filtering with these modes.
* FILTER_MODE_DIRECTORIES_AND_FILES should return false, since filtering can be applied directly with
* {@link FileSystem#listStatus(Path, PathFilter)} regardless of a recursive listing.
* @param filterMode the value of one of the defined AllowableValues representing filter modes
* @return true if results need to be filtered, false otherwise
*/
private boolean isPostListingFilterNeeded(String filterMode) {
return filterMode.equals(FILTER_MODE_FILES_ONLY) || filterMode.equals(FILTER_MODE_FULL_PATH);
}
private String getAbsolutePath(final Path path) { private String getAbsolutePath(final Path path) {
final Path parent = path.getParent(); final Path parent = path.getParent();
final String prefix = (parent == null || parent.getName().equals("")) ? "" : getAbsolutePath(parent); final String prefix = (parent == null || parent.getName().equals("")) ? "" : getAbsolutePath(parent);
@ -462,11 +523,15 @@ public class ListHDFS extends AbstractHadoopProcessor {
private PathFilter createPathFilter(final ProcessContext context) { private PathFilter createPathFilter(final ProcessContext context) {
final Pattern filePattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue()); final Pattern filePattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue());
return new PathFilter() { final String filterMode = context.getProperty(FILE_FILTER_MODE).getValue();
@Override return path -> {
public boolean accept(Path path) { final boolean accepted;
return filePattern.matcher(path.getName()).matches(); if (FILTER_FULL_PATH_VALUE.getValue().equals(filterMode)) {
accepted = filePattern.matcher(path.toString()).matches();
} else {
accepted = filePattern.matcher(path.getName()).matches();
} }
return accepted;
}; };
} }

View File

@ -0,0 +1,104 @@
<!DOCTYPE html>
<html lang="en" xmlns="http://www.w3.org/1999/html">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8"/>
<title>PutHDFS</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
</head>
<body>
<!-- Processor Documentation ================================================== -->
<h1>ListHDFS Filter Modes</h1>
<p>
There are three filter modes available for ListHDFS that determine how the regular expression in the <b><code>File Filter</code></b> property will be applied to listings in HDFS.
<ul>
<li><b><code>Directories and Files</code></b></li>
Filtering will be applied to the names of directories and files. If <b><code>Recurse Subdirectories</code></b> is set to true, only subdirectories with a matching name will be searched for files that match the regular expression defined in <b><code>File Filter</code></b>.
<li><b><code>Files Only</code></b></li>
Filtering will only be applied to the names of files. If <b><code>Recurse Subdirectories</code></b> is set to true, the entire subdirectory tree will be searched for files that match the regular expression defined in <b><code>File Filter</code></b>.
<li><b><code>Full Path</code></b></li>
Filtering will be applied to the full path of files. If <b><code>Recurse Subdirectories</code></b> is set to true, the entire subdirectory tree will be searched for files in which the full path of the file matches the regular expression defined in <b><code>File Filter</code></b>.
</ul>
<p>
<h2>Examples:</h2>
For the given examples, the following directory structure is used:
<br>
<br>
data<br>
├── readme.txt<br>
├── bin<br>
│   ├── readme.txt<br>
│   ├── 1.bin<br>
│   ├── 2.bin<br>
│   └── 3.bin<br>
├── csv<br>
│   ├── readme.txt<br>
│   ├── 1.csv<br>
│   ├── 2.csv<br>
│   └── 3.csv<br>
└── txt<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ├── readme.txt<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ├── 1.txt<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ├── 2.txt<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; └── 3.txt<br>
<br><br>
<h3><b>Directories and Files</b></h3>
This mode is useful when the listing should match the names of directories and files with the regular expression defined in <b><code>File Filter</code></b>. When <b><code>Recurse Subdirectories</code></b> is true, this mode allows the user to filter for files in subdirectories with names that match the regular expression defined in <b><code>File Filter</code></b>.
<br>
<br>
ListHDFS configuration:
<table><tr><th><b><code>Property</code></b></th><th><b><code>Value</code></b></th></tr><tr><td><b><code>Directory</code></b></td><td><code>/data</code></td></tr><tr><td><b><code>Recurse Subdirectories</code></b></td><td>true</td><tr><td><b><code>File Filter</code></b></td><td><code>.*txt.*</code></td></tr><tr><td><code><b>Filter Mode</b></code></td><td><code>Directories and Files</code></td></tr></table>
<p>ListHDFS results:
<ul>
<li>/data/readme.txt</li>
<li>/data/txt/readme.txt</li>
<li>/data/txt/1.txt</li>
<li>/data/txt/2.txt</li>
<li>/data/txt/3.txt</li>
</ul>
<h3><b>Files Only</b></h3>
This mode is useful when the listing should match only the names of files with the regular expression defined in <b><code>File Filter</code></b>. Directory names will not be matched against the regular expression defined in <b><code>File Filter</code></b>. When <b><code>Recurse Subdirectories</code></b> is true, this mode allows the user to filter for files in the entire subdirectory tree of the directory specified in the <b><code>Directory</code></b> property.
<br>
<br>
ListHDFS configuration:
<table><tr><th><b><code>Property</code></b></th><th><b><code>Value</code></b></th></tr><tr><td><b><code>Directory</code></b></td><td><code>/data</code></td></tr><tr><td><b><code>Recurse Subdirectories</code></b></td><td>true</td><tr><td><b><code>File Filter</code></b></td><td><code>[^\.].*\.txt</code></td></tr><tr><td><code><b>Filter Mode</b></code></td><td><code>Files Only</code></td></tr></table>
<p>ListHDFS results:
<ul>
<li>/data/readme.txt</li>
<li>/data/bin/readme.txt</li>
<li>/data/csv/readme.txt</li>
<li>/data/txt/readme.txt</li>
<li>/data/txt/1.txt</li>
<li>/data/txt/2.txt</li>
<li>/data/txt/3.txt</li>
</ul>
<h3><b>Full Path</b></h3>
This mode is useful when the listing should match the entire path of a file with the regular expression defined in <b><code>File Filter</code></b>. When <b><code>Recurse Subdirectories</code></b> is true, this mode allows the user to filter for files in the entire subdirectory tree of the directory specified in the <b><code>Directory</code></b> property while allowing filtering based on the full path of each file.
<br>
<br>
ListHDFS configuration:
<table><tr><th><b><code>Property</code></b></th><th><b><code>Value</code></b></th></tr><tr><td><b><code>Directory</code></b></td><td><code>/data</code></td></tr><tr><td><b><code>Recurse Subdirectories</code></b></td><td>true</td><tr><td><b><code>File Filter</code></b></td><td><code>(/.*/)*csv/.*</code></td></tr><tr><td><code><b>Filter Mode</b></code></td><td><code>Full Path</code></td></tr></table>
<p>ListHDFS results:
<ul>
<li>/data/csv/readme.txt</li>
<li>/data/csv/1.csv</li>
<li>/data/csv/2.csv</li>
<li>/data/csv/3.csv</li>
</ul>
</body>
</html>

View File

@ -16,6 +16,9 @@
*/ */
package org.apache.nifi.processors.hadoop; package org.apache.nifi.processors.hadoop;
import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_DIRECTORIES_AND_FILES_VALUE;
import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_FILES_ONLY_VALUE;
import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_FULL_PATH_VALUE;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -162,7 +165,8 @@ public class TestListHDFS {
@Test @Test
public void testRecursive() throws InterruptedException { public void testRecursiveWithDefaultFilterAndFilterMode() throws InterruptedException {
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/.testFile.txt")));
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt"))); proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir"))); proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
@ -192,6 +196,139 @@ public class TestListHDFS {
} }
} }
@Test
public void testRecursiveWithCustomFilterDirectoriesAndFiles() throws InterruptedException, IOException {
// set custom regex filter and filter mode
runner.setProperty(ListHDFS.FILE_FILTER, ".*txt.*");
runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_DIRECTORIES_AND_FILES_VALUE.getValue());
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.out")));
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir")));
proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.out")));
proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.txt")));
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/txtDir")));
proc.fileSystem.addFileStatus(new Path("/test/txtDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/txtDir/3.out")));
proc.fileSystem.addFileStatus(new Path("/test/txtDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/txtDir/3.txt")));
// first iteration will not pick up files because it has to instead check timestamps.
// We must then wait long enough to ensure that the listing can be performed safely and
// run the Processor again.
runner.run();
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS);
for (int i = 0; i < 2; i++) {
final MockFlowFile ff = flowFiles.get(i);
final String filename = ff.getAttribute("filename");
if (filename.equals("testFile.txt")) {
ff.assertAttributeEquals("path", "/test");
} else if (filename.equals("3.txt")) {
ff.assertAttributeEquals("path", "/test/txtDir");
} else {
Assert.fail("filename was " + filename);
}
}
}
@Test
public void testRecursiveWithCustomFilterFilesOnly() throws InterruptedException, IOException {
// set custom regex filter and filter mode
runner.setProperty(ListHDFS.FILE_FILTER, "[^\\.].*\\.txt");
runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_FILES_ONLY_VALUE.getValue());
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.out")));
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/.partfile.txt")));
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir")));
proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/.txt")));
proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.out")));
proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.txt")));
// first iteration will not pick up files because it has to instead check timestamps.
// We must then wait long enough to ensure that the listing can be performed safely and
// run the Processor again.
runner.run();
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 3);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS);
for (int i = 0; i < 2; i++) {
final MockFlowFile ff = flowFiles.get(i);
final String filename = ff.getAttribute("filename");
if (filename.equals("testFile.txt")) {
ff.assertAttributeEquals("path", "/test");
} else if (filename.equals("1.txt")) {
ff.assertAttributeEquals("path", "/test/testDir");
} else if (filename.equals("2.txt")) {
ff.assertAttributeEquals("path", "/test/testDir/anotherDir");
} else {
Assert.fail("filename was " + filename);
}
}
}
@Test
public void testRecursiveWithCustomFilterFullPath() throws InterruptedException, IOException {
// set custom regex filter and filter mode
runner.setProperty(ListHDFS.FILE_FILTER, "(/.*/)*anotherDir/1\\..*");
runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_FULL_PATH_VALUE.getValue());
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.out")));
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir")));
proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/1.out")));
proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/1.txt")));
proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.out")));
proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.txt")));
proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/someDir")));
proc.fileSystem.addFileStatus(new Path("/test/testDir/someDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/someDir/1.out")));
// first iteration will not pick up files because it has to instead check timestamps.
// We must then wait long enough to ensure that the listing can be performed safely and
// run the Processor again.
runner.run();
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS);
for (int i = 0; i < 2; i++) {
final MockFlowFile ff = flowFiles.get(i);
final String filename = ff.getAttribute("filename");
if (filename.equals("1.out")) {
ff.assertAttributeEquals("path", "/test/testDir/anotherDir");
} else if (filename.equals("1.txt")) {
ff.assertAttributeEquals("path", "/test/testDir/anotherDir");
} else {
Assert.fail("filename was " + filename);
}
}
}
@Test @Test
public void testNotRecursive() throws InterruptedException { public void testNotRecursive() throws InterruptedException {
runner.setProperty(ListHDFS.RECURSE_SUBDIRS, "false"); runner.setProperty(ListHDFS.RECURSE_SUBDIRS, "false");