NIFI-6275 ListHDFS now ignores scheme and authority when using the "Full Path" filter mode

Updated description for "Full Path" filter mode to state that it will ignore scheme and authority
Added tests to TestListHDFS for listing empty and nonexistent directories
Updated TestListHDFS' mock file system to track state properly when FileStatus instances are added, and updated listStatus to work properly with the underlying Map that contains FileStatus instances
Updated ListHDFS' additional details to document "Full Path" filter mode ignoring scheme and authority, with an example
Updated TestRunners, StandardProcessorTestRunner, and MockProcessorInitializationContext to support passing in a logger.

NIFI-6275 Updated the "Full Path" filter mode to check the full path of a file with and without its scheme and authority against the filter regex
Added additional documentation for how ListHDFS handles scheme and authority when "Full Path" filter mode is used
Added test case for "Full Path" filter mode with a regular expression that includes scheme and authority

This closes #3483.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
Jeff Storck authored 2019-05-13 16:26:36 -04:00, committed by Koji Kawamura
parent 39a258dc38
commit 8d748223ff
6 changed files with 315 additions and 36 deletions


@@ -33,8 +33,12 @@ public class MockProcessorInitializationContext implements ProcessorInitializationContext {
private final MockProcessContext context;
public MockProcessorInitializationContext(final Processor processor, final MockProcessContext context) {
this(processor, context, null);
}
public MockProcessorInitializationContext(final Processor processor, final MockProcessContext context, final MockComponentLog logger) {
processorId = UUID.randomUUID().toString();
-        logger = new MockComponentLog(processorId, processor);
+        this.logger = logger == null ? new MockComponentLog(processorId, processor) : logger;
this.context = context;
}

View File

@@ -99,6 +99,10 @@ public class StandardProcessorTestRunner implements TestRunner {
}
StandardProcessorTestRunner(final Processor processor, String processorName) {
this(processor, processorName, null);
}
StandardProcessorTestRunner(final Processor processor, String processorName, MockComponentLog logger) {
this.processor = processor;
this.idGenerator = new AtomicLong(0L);
this.sharedState = new SharedSessionState(processor, idGenerator);
@@ -108,9 +112,9 @@ public class StandardProcessorTestRunner implements TestRunner {
this.variableRegistry = new MockVariableRegistry();
this.context = new MockProcessContext(processor, processorName, processorStateManager, variableRegistry);
-        final MockProcessorInitializationContext mockInitContext = new MockProcessorInitializationContext(processor, context);
+        final MockProcessorInitializationContext mockInitContext = new MockProcessorInitializationContext(processor, context, logger);
processor.initialize(mockInitContext);
-        logger = mockInitContext.getLogger();
+        this.logger = mockInitContext.getLogger();
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor);


@@ -30,6 +30,17 @@ public class TestRunners {
return newTestRunner(processor,processor.getClass().getName());
}
/**
* Returns a {@code TestRunner} for the given {@code Processor}.
* The processor name available from {@code TestRunner.getProcessContext().getName()} will have the default name of {@code processor.getClass().getName()}
* @param processor the {@code Processor} under test
* @param logger the {@code ComponentLog} used for logging
* @return a {@code TestRunner}
*/
public static TestRunner newTestRunner(final Processor processor, MockComponentLog logger) {
return newTestRunner(processor,processor.getClass().getName(), logger);
}
/**
* Returns a {@code TestRunner} for the given {@code Processor}.
* The processor name available from {@code TestRunner.getProcessContext().getName()} will be the passed name.
@@ -41,6 +52,18 @@
return new StandardProcessorTestRunner(processor, name);
}
/**
* Returns a {@code TestRunner} for the given {@code Processor}.
* The processor name available from {@code TestRunner.getProcessContext().getName()} will be the passed name.
* @param processor the {@code Processor} under test
* @param name the name to give the {@code Processor}
* @param logger the {@code ComponentLog} used for logging
* @return a {@code TestRunner}
*/
public static TestRunner newTestRunner(final Processor processor, String name, MockComponentLog logger) {
return new StandardProcessorTestRunner(processor, name, logger);
}
/**
* Returns a {@code TestRunner} for the given {@code Processor} class.
* The processor name available from {@code TestRunner.getProcessContext().getName()} will have the default name of {@code processor.getClass().getName()}
@@ -51,6 +74,17 @@
return newTestRunner(processorClass, processorClass.getName());
}
/**
* Returns a {@code TestRunner} for the given {@code Processor} class.
* The processor name available from {@code TestRunner.getProcessContext().getName()} will have the default name of {@code processor.getClass().getName()}
* @param processorClass the {@code Processor} class
* @param logger the {@code ComponentLog} used for logging
* @return a {@code TestRunner}
*/
public static TestRunner newTestRunner(final Class<? extends Processor> processorClass, MockComponentLog logger) {
return newTestRunner(processorClass, processorClass.getName(), logger);
}
/**
* Returns a {@code TestRunner} for the given {@code Processor} class.
* The processor name available from {@code TestRunner.getProcessContext().getName()} will have the default name of {@code processor.getClass().getName()}
@@ -67,4 +101,20 @@
}
}
/**
* Returns a {@code TestRunner} for the given {@code Processor} class.
* The processor name available from {@code TestRunner.getProcessContext().getName()} will be the passed name.
* @param processorClass the {@code Processor} class
* @param name the name to give the {@code Processor}
* @param logger the {@code ComponentLog} used for logging
* @return a {@code TestRunner}
*/
public static TestRunner newTestRunner(final Class<? extends Processor> processorClass, String name, MockComponentLog logger) {
try {
return newTestRunner(processorClass.newInstance(), name, logger);
} catch (final Exception e) {
System.err.println("Could not instantiate instance of class " + processorClass.getName() + " due to: " + e);
throw new RuntimeException(e);
}
}
}
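
For illustration, a minimal sketch of how the new overloads are meant to be used (assuming Mockito on the test classpath, as in TestListHDFS below; `processor` stands for the Processor under test):

    // Pass a spied MockComponentLog into the runner so the test can verify logging.
    final MockComponentLog logger = spy(new MockComponentLog(UUID.randomUUID().toString(), processor));
    final TestRunner runner = TestRunners.newTestRunner(processor, logger);
    runner.run();
    verify(logger, never()).error(anyString()); // e.g. assert nothing was logged at the error level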


@@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -126,24 +127,25 @@ public class ListHDFS extends AbstractHadoopProcessor {
private static final String FILTER_MODE_FULL_PATH = "filter-mode-full-path";
static final AllowableValue FILTER_DIRECTORIES_AND_FILES_VALUE = new AllowableValue(FILTER_MODE_DIRECTORIES_AND_FILES,
"Directories and Files",
"Filtering will be applied to the names of directories and files. If " + RECURSE_SUBDIRS.getName()
"Filtering will be applied to the names of directories and files. If " + RECURSE_SUBDIRS.getDisplayName()
+ " is set to true, only subdirectories with a matching name will be searched for files that match "
+ "the regular expression defined in " + FILE_FILTER.getName() + ".");
+ "the regular expression defined in " + FILE_FILTER.getDisplayName() + ".");
static final AllowableValue FILTER_FILES_ONLY_VALUE = new AllowableValue(FILTER_MODE_FILES_ONLY,
"Files Only",
"Filtering will only be applied to the names of files. If " + RECURSE_SUBDIRS.getName()
"Filtering will only be applied to the names of files. If " + RECURSE_SUBDIRS.getDisplayName()
+ " is set to true, the entire subdirectory tree will be searched for files that match "
+ "the regular expression defined in " + FILE_FILTER.getName() + ".");
+ "the regular expression defined in " + FILE_FILTER.getDisplayName() + ".");
static final AllowableValue FILTER_FULL_PATH_VALUE = new AllowableValue(FILTER_MODE_FULL_PATH,
"Full Path",
"Filtering will be applied to the full path of files. If " + RECURSE_SUBDIRS.getName()
+ " is set to true, the entire subdirectory tree will be searched for files in which the full path of "
+ "the file matches the regular expression defined in " + FILE_FILTER.getName() + ".");
"Filtering will be applied by evaluating the regular expression defined in " + FILE_FILTER.getDisplayName()
+ " against the full path of files with and without the scheme and authority. If "
+ RECURSE_SUBDIRS.getDisplayName() + " is set to true, the entire subdirectory tree will be searched for files in which the full path of "
+ "the file matches the regular expression defined in " + FILE_FILTER.getDisplayName() + ". See 'Additional Details' for more information.");
public static final PropertyDescriptor FILE_FILTER_MODE = new PropertyDescriptor.Builder()
.name("file-filter-mode")
.displayName("File Filter Mode")
.description("Determines how the regular expression in " + FILE_FILTER.getName() + " will be used when retrieving listings.")
.description("Determines how the regular expression in " + FILE_FILTER.getDisplayName() + " will be used when retrieving listings.")
.required(true)
.allowableValues(FILTER_DIRECTORIES_AND_FILES_VALUE, FILTER_FILES_ONLY_VALUE, FILTER_FULL_PATH_VALUE)
.defaultValue(FILTER_DIRECTORIES_AND_FILES_VALUE.getValue())
@@ -181,12 +183,21 @@ public class ListHDFS extends AbstractHadoopProcessor {
static final String EMITTED_TIMESTAMP_KEY = "emitted.timestamp";
static final long LISTING_LAG_NANOS = TimeUnit.MILLISECONDS.toNanos(100L);
private Pattern fileFilterRegexPattern;
@Override
protected void init(final ProcessorInitializationContext context) {
super.init(context);
}
@Override
protected void preProcessConfiguration(Configuration config, ProcessContext context) {
super.preProcessConfiguration(config, context);
// Since this processor is marked as INPUT_FORBIDDEN, the FILE_FILTER regex can be compiled here rather than during onTrigger processing
fileFilterRegexPattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue());
}
protected File getPersistenceFile() {
return new File("conf/state/" + getIdentifier());
}
@@ -222,7 +233,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
if (minimumAge > maximumAge) {
problems.add(new ValidationResult.Builder().valid(false).subject("GetHDFS Configuration")
-                .explanation(MIN_AGE.getName() + " cannot be greater than " + MAX_AGE.getName()).build());
+                .explanation(MIN_AGE.getDisplayName() + " cannot be greater than " + MAX_AGE.getDisplayName()).build());
}
return problems;
@@ -526,14 +537,14 @@
}
private PathFilter createPathFilter(final ProcessContext context) {
-        final Pattern filePattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue());
final String filterMode = context.getProperty(FILE_FILTER_MODE).getValue();
return path -> {
final boolean accepted;
if (FILTER_FULL_PATH_VALUE.getValue().equals(filterMode)) {
-                accepted = filePattern.matcher(path.toString()).matches();
+                accepted = fileFilterRegexPattern.matcher(path.toString()).matches()
+                        || fileFilterRegexPattern.matcher(Path.getPathWithoutSchemeAndAuthority(path).toString()).matches();
} else {
-                accepted = filePattern.matcher(path.getName()).matches();
+                accepted = fileFilterRegexPattern.matcher(path.getName()).matches();
}
return accepted;
};
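
To make the dual match concrete, a small sketch using the filter regex from the tests below (assumed values, not processor defaults):

    // A filter written without scheme and authority still accepts a fully qualified path.
    final Pattern filter = Pattern.compile("(/.*/)*anotherDir/1\\..*");
    final Path path = new Path("hdfs://hdfscluster:8020/test/testDir/anotherDir/1.txt");
    filter.matcher(path.toString()).matches();                                         // false: scheme and authority present
    filter.matcher(Path.getPathWithoutSchemeAndAuthority(path).toString()).matches();  // true: "/test/testDir/anotherDir/1.txt"
    // the two results are OR'ed together, so the path is accepted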


@@ -28,11 +28,16 @@
There are three filter modes available for ListHDFS that determine how the regular expression in the <b><code>File Filter</code></b> property will be applied to listings in HDFS.
<ul>
<li><b><code>Directories and Files</code></b></li>
Filtering will be applied to the names of directories and files. If <b><code>Recurse Subdirectories</code></b> is set to true, only subdirectories with a matching name will be searched for files that match the regular expression defined in <b><code>File Filter</code></b>.
<li><b><code>Files Only</code></b></li>
Filtering will only be applied to the names of files. If <b><code>Recurse Subdirectories</code></b> is set to true, the entire subdirectory tree will be searched for files that match the regular expression defined in <b><code>File Filter</code></b>.
<li><b><code>Full Path</code></b></li>
Filtering will be applied to the full path of files. If <b><code>Recurse Subdirectories</code></b> is set to true, the entire subdirectory tree will be searched for files in which the full path of the file matches the regular expression defined in <b><code>File Filter</code></b>.<br>
Regarding <code>scheme</code> and <code>authority</code>, if a given file has a full path of <code>hdfs://hdfscluster:8020/data/txt/1.txt</code>, the filter will evaluate the regular expression defined in <b><code>File Filter</code></b> against two cases, matching if either is true:<br>
<ul>
<li>the full path including the scheme (<code>hdfs</code>), authority (<code>hdfscluster:8020</code>), and the remaining path components (<code>/data/txt/1.txt</code>)</li>
<li>only the path components (<code>/data/txt/1.txt</code>)</li>
</ul>
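For example (illustrative patterns only), a <b><code>File Filter</code></b> of <code>(/.*/)*txt/1\..*</code> would match the example file above through the second case, while <code>hdfs://hdfscluster:8020/data(/.*)*</code> would match it through the first.<br>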
</ul>
<p>
<h2>Examples:</h2>


@@ -20,7 +20,15 @@ import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_DIRECTORIES_AND_FILES_VALUE;
import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_FILES_ONLY_VALUE;
import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_FULL_PATH_VALUE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.File;
@@ -34,11 +42,14 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -54,6 +65,7 @@ import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockComponentLog;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.TestRunner;
@@ -61,6 +73,7 @@ import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
public class TestListHDFS {
@@ -68,6 +81,7 @@ public class TestListHDFS {
private ListHDFSWithMockedFileSystem proc;
private NiFiProperties mockNiFiProperties;
private KerberosProperties kerberosProperties;
private MockComponentLog mockLogger;
@Before
public void setup() throws InitializationException {
@@ -76,7 +90,8 @@
kerberosProperties = new KerberosProperties(null);
proc = new ListHDFSWithMockedFileSystem(kerberosProperties);
-        runner = TestRunners.newTestRunner(proc);
+        mockLogger = spy(new MockComponentLog(UUID.randomUUID().toString(), proc));
+        runner = TestRunners.newTestRunner(proc, mockLogger);
runner.setProperty(ListHDFS.HADOOP_CONFIGURATION_RESOURCES, "src/test/resources/core-site.xml");
runner.setProperty(ListHDFS.DIRECTORY, "/test");
@@ -279,25 +294,92 @@
}
@Test
-    public void testRecursiveWithCustomFilterFullPath() throws InterruptedException, IOException {
+    public void testRecursiveWithCustomFilterFullPathWithoutSchemeAndAuthority() throws InterruptedException, IOException {
// set custom regex filter and filter mode
runner.setProperty(ListHDFS.FILE_FILTER, "(/.*/)*anotherDir/1\\..*");
runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_FULL_PATH_VALUE.getValue());
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.out")));
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.out")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.txt")));
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/1.txt")));
proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir")));
proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/1.out")));
proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/1.txt")));
proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.out")));
proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.txt")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.out")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.txt")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.out")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.txt")));
proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/someDir")));
proc.fileSystem.addFileStatus(new Path("/test/testDir/someDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/someDir/1.out")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir/1.out")));
// first iteration will not pick up files because it has to instead check timestamps.
// We must then wait long enough to ensure that the listing can be performed safely and
// run the Processor again.
runner.run();
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS);
for (int i = 0; i < 2; i++) {
final MockFlowFile ff = flowFiles.get(i);
final String filename = ff.getAttribute("filename");
if (filename.equals("1.out")) {
ff.assertAttributeEquals("path", "/test/testDir/anotherDir");
} else if (filename.equals("1.txt")) {
ff.assertAttributeEquals("path", "/test/testDir/anotherDir");
} else {
Assert.fail("filename was " + filename);
}
}
}
@Test
public void testRecursiveWithCustomFilterFullPathWithSchemeAndAuthority() throws InterruptedException, IOException {
// set custom regex filter and filter mode
runner.setProperty(ListHDFS.FILE_FILTER, "hdfs://hdfscluster:8020(/.*/)*anotherDir/1\\..*");
runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_FULL_PATH_VALUE.getValue());
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.out")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.txt")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/1.txt")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.out")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.txt")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.out")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.txt")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir/1.out")));
// first iteration will not pick up files because it has to instead check timestamps.
// We must then wait long enough to ensure that the listing can be performed safely and
@@ -532,6 +614,115 @@
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 3);
}
@Test
public void testListingEmptyDir() throws InterruptedException, IOException {
runner.setProperty(ListHDFS.DIRECTORY, "/test/emptyDir");
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.out")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.txt")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/emptyDir")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/1.txt")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.out")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.txt")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.out")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.txt")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir/1.out")));
// first iteration will not pick up files because it has to instead check timestamps.
// We must then wait long enough to ensure that the listing can be performed safely and
// run the Processor again.
runner.run();
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
// verify that no messages were logged at the error level
verify(mockLogger, never()).error(anyString());
final ArgumentCaptor<Throwable> throwableArgumentCaptor = ArgumentCaptor.forClass(Throwable.class);
verify(mockLogger, atLeast(0)).error(anyString(), throwableArgumentCaptor.capture());
// if error(message, throwable) was called, ignore JobConf CNFEs since the mapreduce libs are not included as dependencies
assertTrue(throwableArgumentCaptor.getAllValues().stream().flatMap(Stream::of)
// check that no throwable was captured other than a JobConf ClassNotFoundException
.noneMatch(throwable -> !(throwable instanceof ClassNotFoundException && throwable.getMessage().contains("JobConf"))));
verify(mockLogger, never()).error(anyString(), any(Object[].class));
verify(mockLogger, never()).error(anyString(), any(Object[].class), any(Throwable.class));
// assert that no files were listed
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
// assert that no files were penalized
runner.assertPenalizeCount(0);
}
@Test
public void testListingNonExistingDir() throws InterruptedException, IOException {
String nonExistingPath = "/test/nonExistingDir";
runner.setProperty(ListHDFS.DIRECTORY, nonExistingPath);
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.out")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.txt")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/emptyDir")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/1.txt")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.out")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.txt")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.out")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.txt")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir/1.out")));
// first iteration will not pick up files because it has to instead check timestamps.
// We must then wait long enough to ensure that the listing can be performed safely and
// run the Processor again.
runner.run();
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
final ArgumentCaptor<Object[]> loggingArgsCaptor = ArgumentCaptor.forClass(Object[].class);
verify(mockLogger, atLeastOnce()).error(anyString(), loggingArgsCaptor.capture());
// assert that a FileNotFoundException was logged for the Directory property's value
assertTrue(loggingArgsCaptor.getAllValues().stream().flatMap(Stream::of)
.anyMatch(o -> o instanceof FileNotFoundException && ((FileNotFoundException)o).getMessage().contains(nonExistingPath)));
// assert that no files were listed
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
// assert that no files were penalized
runner.assertPenalizeCount(0);
}
private FsPermission create777() {
return new FsPermission((short) 0777);
@@ -578,6 +769,11 @@
}
children.add(child);
// if the child is directory and a key for it does not exist, create it with an empty set of children
if (child.isDirectory() && !fileStatuses.containsKey(child.getPath())) {
fileStatuses.put(child.getPath(), new HashSet<>());
}
}
@Override
@@ -625,12 +821,21 @@
@Override
        public FileStatus[] listStatus(final Path f) throws FileNotFoundException, IOException {
-            final Set<FileStatus> statuses = fileStatuses.get(f);
-            if (statuses == null) {
-                return new FileStatus[0];
-            }
-            return statuses.toArray(new FileStatus[statuses.size()]);
+            return fileStatuses.keySet().stream()
+                    // find the key in fileStatuses that matches the given Path f
+                    .filter(pathKey -> f.isAbsoluteAndSchemeAuthorityNull()
+                            // f is an absolute path with no scheme and no authority; compare against the keys of fileStatuses with their scheme and authority stripped
+                            ? Path.getPathWithoutSchemeAndAuthority(pathKey).equals(Path.getPathWithoutSchemeAndAuthority(f))
+                            // f is absolute but contains a scheme or authority; compare directly to the keys of fileStatuses
+                            // (if f is not absolute, false will be returned)
+                            : f.isAbsolute() && pathKey.equals(f))
+                    // get the set of FileStatus objects for the filtered paths in the stream
+                    .map(fileStatuses::get)
+                    // take the first set of FileStatus objects in the stream; there should be at most one, since fileStatuses is a Map
+                    .findFirst()
+                    // if no set of FileStatus objects was found, throw a FileNotFoundException
+                    .orElseThrow(() -> new FileNotFoundException(String.format("%s instance does not contain a key for %s", this.getClass().getSimpleName(), f)))
+                    .toArray(new FileStatus[0]);
        }
@Override
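
Taken together, the rewritten mock behaves as sketched below (hypothetical driver code against the test's mock file system, not part of the commit):

    // A scheme-less request resolves against fully qualified keys, and an unknown
    // path now raises FileNotFoundException instead of returning an empty listing.
    final FileStatus[] found = proc.fileSystem.listStatus(new Path("/test/testDir")); // matches hdfs://hdfscluster:8020/test/testDir
    try {
        proc.fileSystem.listStatus(new Path("/test/nonExistingDir"));                 // no key for this path
    } catch (final FileNotFoundException expected) {
        // ListHDFS logs this at the error level, which testListingNonExistingDir verifies
    }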