diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
index b6625b8353..dfe57097b7 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
@@ -154,6 +154,11 @@
<groupId>org.glassfish.jaxb</groupId>
<artifactId>jaxb-runtime</artifactId>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock-record-utils</artifactId>
+ <scope>test</scope>
+ </dependency>
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 40abeb16f9..330506d14d 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -17,12 +17,9 @@
package org.apache.nifi.processors.hadoop;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
@@ -36,50 +33,40 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
-import org.apache.nifi.deprecation.log.DeprecationLogger;
-import org.apache.nifi.deprecation.log.DeprecationLoggerFactory;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
+import org.apache.nifi.processors.hadoop.util.FileStatusManager;
+import org.apache.nifi.processors.hadoop.util.FilterMode;
+import org.apache.nifi.processors.hadoop.util.writer.FlowFileObjectWriter;
+import org.apache.nifi.processors.hadoop.util.writer.HadoopFileStatusWriter;
+import org.apache.nifi.processors.hadoop.util.writer.RecordObjectWriter;
import org.apache.nifi.scheduling.SchedulingStrategy;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.WriteResult;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
-import java.io.File;
import java.io.IOException;
-import java.io.OutputStream;
-import java.security.PrivilegedExceptionAction;
-import java.sql.Timestamp;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.nifi.processors.hadoop.util.FilterMode.FILTER_DIRECTORIES_AND_FILES;
@PrimaryNodeOnly
@TriggerSerially
@@ -87,189 +74,116 @@ import java.util.regex.Pattern;
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@Tags({"hadoop", "HCFS", "HDFS", "get", "list", "ingest", "source", "filesystem"})
@SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class})
-@CapabilityDescription("Retrieves a listing of files from HDFS. Each time a listing is performed, the files with the latest timestamp will be excluded "
- + "and picked up during the next execution of the processor. This is done to ensure that we do not miss any files, or produce duplicates, in the "
- + "cases where files with the same timestamp are written immediately before and after a single execution of the processor. For each file that is "
- + "listed in HDFS, this processor creates a FlowFile that represents the HDFS file to be fetched in conjunction with FetchHDFS. This Processor is "
- + "designed to run on Primary Node only in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left "
- + "off without duplicating all of the data. Unlike GetHDFS, this Processor does not delete any data from HDFS.")
+@CapabilityDescription("Retrieves a listing of files from HDFS. For each file that is listed in HDFS, this processor creates a FlowFile that represents "
+ + "the HDFS file to be fetched in conjunction with FetchHDFS. This Processor is designed to run on Primary Node only in a cluster. If the primary "
+ + "node changes, the new Primary Node will pick up where the previous node left off without duplicating all of the data. Unlike GetHDFS, this "
+ + "Processor does not delete any data from HDFS.")
@WritesAttributes({
- @WritesAttribute(attribute="filename", description="The name of the file that was read from HDFS."),
- @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory on HDFS. For example, if the Directory property is set to /tmp, "
- + "then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to true and a file is picked up "
- + "from /tmp/abc/1/2/3, then the path attribute will be set to \"/tmp/abc/1/2/3\"."),
- @WritesAttribute(attribute="hdfs.owner", description="The user that owns the file in HDFS"),
- @WritesAttribute(attribute="hdfs.group", description="The group that owns the file in HDFS"),
- @WritesAttribute(attribute="hdfs.lastModified", description="The timestamp of when the file in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
- @WritesAttribute(attribute="hdfs.length", description="The number of bytes in the file in HDFS"),
- @WritesAttribute(attribute="hdfs.replication", description="The number of HDFS replicas for hte file"),
- @WritesAttribute(attribute="hdfs.permissions", description="The permissions for the file in HDFS. This is formatted as 3 characters for the owner, "
- + "3 for the group, and 3 for other users. For example rw-rw-r--")
+ @WritesAttribute(attribute = "filename", description = "The name of the file that was read from HDFS."),
+ @WritesAttribute(attribute = "path", description = "The path is set to the absolute path of the file's directory on HDFS. For example, if the Directory property is set to /tmp, "
+ + "then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to true and a file is picked up "
+ + "from /tmp/abc/1/2/3, then the path attribute will be set to \"/tmp/abc/1/2/3\"."),
+ @WritesAttribute(attribute = "hdfs.owner", description = "The user that owns the file in HDFS"),
+ @WritesAttribute(attribute = "hdfs.group", description = "The group that owns the file in HDFS"),
+ @WritesAttribute(attribute = "hdfs.lastModified", description = "The timestamp of when the file in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
+ @WritesAttribute(attribute = "hdfs.length", description = "The number of bytes in the file in HDFS"),
+ @WritesAttribute(attribute = "hdfs.replication", description = "The number of HDFS replicas for hte file"),
+ @WritesAttribute(attribute = "hdfs.permissions", description = "The permissions for the file in HDFS. This is formatted as 3 characters for the owner, "
+ + "3 for the group, and 3 for other users. For example rw-rw-r--")
})
-@Stateful(scopes = Scope.CLUSTER, description = "After performing a listing of HDFS files, the latest timestamp of all the files listed and the latest "
- + "timestamp of all the files transferred are both stored. This allows the Processor to list only files that have been added or modified after "
- + "this date the next time that the Processor is run, without having to store all of the actual filenames/paths which could lead to performance "
- + "problems. State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary "
- + "Node is selected, the new node can pick up where the previous node left off, without duplicating the data.")
+@Stateful(scopes = Scope.CLUSTER, description = "After performing a listing of HDFS files, the latest timestamp of all the files listed is stored. "
+ + "This allows the Processor to list only files that have been added or modified after this date the next time that the Processor is run, "
+ + "without having to store all of the actual filenames/paths which could lead to performance problems. State is stored across the cluster "
+ + "so that this Processor can be run on Primary Node only and if a new Primary Node is selected, the new node can pick up where the previous "
+ + "node left off, without duplicating the data.")
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
public class ListHDFS extends AbstractHadoopProcessor {
- private static final RecordSchema RECORD_SCHEMA;
- private static final String FILENAME = "filename";
- private static final String PATH = "path";
- private static final String IS_DIRECTORY = "directory";
- private static final String SIZE = "size";
- private static final String LAST_MODIFIED = "lastModified";
- private static final String PERMISSIONS = "permissions";
- private static final String OWNER = "owner";
- private static final String GROUP = "group";
- private static final String REPLICATION = "replication";
- private static final String IS_SYM_LINK = "symLink";
- private static final String IS_ENCRYPTED = "encrypted";
- private static final String IS_ERASURE_CODED = "erasureCoded";
-
- static {
- final List<RecordField> recordFields = new ArrayList<>();
- recordFields.add(new RecordField(FILENAME, RecordFieldType.STRING.getDataType(), false));
- recordFields.add(new RecordField(PATH, RecordFieldType.STRING.getDataType(), false));
- recordFields.add(new RecordField(IS_DIRECTORY, RecordFieldType.BOOLEAN.getDataType(), false));
- recordFields.add(new RecordField(SIZE, RecordFieldType.LONG.getDataType(), false));
- recordFields.add(new RecordField(LAST_MODIFIED, RecordFieldType.TIMESTAMP.getDataType(), false));
- recordFields.add(new RecordField(PERMISSIONS, RecordFieldType.STRING.getDataType()));
- recordFields.add(new RecordField(OWNER, RecordFieldType.STRING.getDataType()));
- recordFields.add(new RecordField(GROUP, RecordFieldType.STRING.getDataType()));
- recordFields.add(new RecordField(REPLICATION, RecordFieldType.INT.getDataType()));
- recordFields.add(new RecordField(IS_SYM_LINK, RecordFieldType.BOOLEAN.getDataType()));
- recordFields.add(new RecordField(IS_ENCRYPTED, RecordFieldType.BOOLEAN.getDataType()));
- recordFields.add(new RecordField(IS_ERASURE_CODED, RecordFieldType.BOOLEAN.getDataType()));
- RECORD_SCHEMA = new SimpleRecordSchema(recordFields);
- }
+ private static final String NON_HIDDEN_FILES_REGEX = "[^\\.].*";
public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
- .name("Recurse Subdirectories")
- .description("Indicates whether to list files from subdirectories of the HDFS directory")
- .required(true)
- .allowableValues("true", "false")
- .defaultValue("true")
- .build();
+ .name("Recurse Subdirectories")
+ .description("Indicates whether to list files from subdirectories of the HDFS directory")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .build();
public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
- .name("record-writer")
- .displayName("Record Writer")
- .description("Specifies the Record Writer to use for creating the listing. If not specified, one FlowFile will be created for each entity that is listed. If the Record Writer is specified, " +
- "all entities will be written to a single FlowFile.")
- .required(false)
- .identifiesControllerService(RecordSetWriterFactory.class)
- .build();
+ .name("record-writer")
+ .displayName("Record Writer")
+ .description("Specifies the Record Writer to use for creating the listing. If not specified, one FlowFile will be created for each "
+ + "entity that is listed. If the Record Writer is specified, all entities will be written to a single FlowFile.")
+ .required(false)
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .build();
public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
- .name("File Filter")
- .description("Only files whose names match the given regular expression will be picked up")
- .required(true)
- .defaultValue("[^\\.].*")
- .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
- .build();
-
- private static final String FILTER_MODE_DIRECTORIES_AND_FILES = "filter-mode-directories-and-files";
- private static final String FILTER_MODE_FILES_ONLY = "filter-mode-files-only";
- private static final String FILTER_MODE_FULL_PATH = "filter-mode-full-path";
- static final AllowableValue FILTER_DIRECTORIES_AND_FILES_VALUE = new AllowableValue(FILTER_MODE_DIRECTORIES_AND_FILES,
- "Directories and Files",
- "Filtering will be applied to the names of directories and files. If " + RECURSE_SUBDIRS.getDisplayName()
- + " is set to true, only subdirectories with a matching name will be searched for files that match "
- + "the regular expression defined in " + FILE_FILTER.getDisplayName() + ".");
- static final AllowableValue FILTER_FILES_ONLY_VALUE = new AllowableValue(FILTER_MODE_FILES_ONLY,
- "Files Only",
- "Filtering will only be applied to the names of files. If " + RECURSE_SUBDIRS.getDisplayName()
- + " is set to true, the entire subdirectory tree will be searched for files that match "
- + "the regular expression defined in " + FILE_FILTER.getDisplayName() + ".");
- static final AllowableValue FILTER_FULL_PATH_VALUE = new AllowableValue(FILTER_MODE_FULL_PATH,
- "Full Path",
- "Filtering will be applied by evaluating the regular expression defined in " + FILE_FILTER.getDisplayName()
- + " against the full path of files with and without the scheme and authority. If "
- + RECURSE_SUBDIRS.getDisplayName() + " is set to true, the entire subdirectory tree will be searched for files in which the full path of "
- + "the file matches the regular expression defined in " + FILE_FILTER.getDisplayName() + ". See 'Additional Details' for more information.");
+ .name("File Filter")
+ .description("Only files whose names match the given regular expression will be picked up")
+ .required(true)
+ .defaultValue(NON_HIDDEN_FILES_REGEX)
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+ .build();
public static final PropertyDescriptor FILE_FILTER_MODE = new PropertyDescriptor.Builder()
- .name("file-filter-mode")
- .displayName("File Filter Mode")
- .description("Determines how the regular expression in " + FILE_FILTER.getDisplayName() + " will be used when retrieving listings.")
- .required(true)
- .allowableValues(FILTER_DIRECTORIES_AND_FILES_VALUE, FILTER_FILES_ONLY_VALUE, FILTER_FULL_PATH_VALUE)
- .defaultValue(FILTER_DIRECTORIES_AND_FILES_VALUE.getValue())
- .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
- .build();
+ .name("file-filter-mode")
+ .displayName("File Filter Mode")
+ .description("Determines how the regular expression in " + FILE_FILTER.getDisplayName() + " will be used when retrieving listings.")
+ .required(true)
+ .allowableValues(FilterMode.class)
+ .defaultValue(FILTER_DIRECTORIES_AND_FILES.getValue())
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+ .build();
- public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
- .name("minimum-file-age")
- .displayName("Minimum File Age")
- .description("The minimum age that a file must be in order to be pulled; any file younger than this "
- + "amount of time (based on last modification date) will be ignored")
- .required(false)
- .addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
- .build();
+ public static final PropertyDescriptor MINIMUM_FILE_AGE = new PropertyDescriptor.Builder()
+ .name("minimum-file-age")
+ .displayName("Minimum File Age")
+ .description("The minimum age that a file must be in order to be pulled; any file younger than this "
+ + "amount of time (based on last modification date) will be ignored")
+ .required(false)
+ .addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
+ .build();
- public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
- .name("maximum-file-age")
- .displayName("Maximum File Age")
- .description("The maximum age that a file must be in order to be pulled; any file older than this "
- + "amount of time (based on last modification date) will be ignored. Minimum value is 100ms.")
- .required(false)
- .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
- .build();
+ public static final PropertyDescriptor MAXIMUM_FILE_AGE = new PropertyDescriptor.Builder()
+ .name("maximum-file-age")
+ .displayName("Maximum File Age")
+ .description("The maximum age that a file must be in order to be pulled; any file older than this "
+ + "amount of time (based on last modification date) will be ignored. Minimum value is 100ms.")
+ .required(false)
+ .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
+ .build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("All FlowFiles are transferred to this relationship")
- .build();
+ .name("success")
+ .description("All FlowFiles are transferred to this relationship")
+ .build();
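+ // State layout: 'latest.timestamp' holds the newest modification time seen so far and the 'latest.file.N' entries
+ // hold the paths sharing that timestamp; the legacy keys are only read to migrate state written by older versions.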
+ public static final String LEGACY_EMITTED_TIMESTAMP_KEY = "emitted.timestamp";
+ public static final String LEGACY_LISTING_TIMESTAMP_KEY = "listing.timestamp";
+ public static final String LATEST_TIMESTAMP_KEY = "latest.timestamp";
+ public static final String LATEST_FILES_KEY = "latest.file.%d";
- private static final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(ListHDFS.class);
-
- private volatile long latestTimestampListed = -1L;
- private volatile long latestTimestampEmitted = -1L;
- private volatile long lastRunTimestamp = -1L;
- private volatile boolean resetState = false;
- static final String LISTING_TIMESTAMP_KEY = "listing.timestamp";
- static final String EMITTED_TIMESTAMP_KEY = "emitted.timestamp";
-
- static final long LISTING_LAG_NANOS = TimeUnit.MILLISECONDS.toNanos(100L);
+ private static final Set<Relationship> RELATIONSHIPS = Collections.singleton(REL_SUCCESS);
private Pattern fileFilterRegexPattern;
-
- @Override
- protected void init(final ProcessorInitializationContext context) {
- super.init(context);
- }
+ private volatile boolean resetState = false;
@Override
protected void preProcessConfiguration(Configuration config, ProcessContext context) {
super.preProcessConfiguration(config, context);
// Since this processor is marked as INPUT_FORBIDDEN, the FILE_FILTER regex can be compiled here rather than during onTrigger processing
fileFilterRegexPattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue());
-
- }
-
- protected File getPersistenceFile() {
- return new File("conf/state/" + getIdentifier());
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> props = new ArrayList<>(properties);
- props.add(DIRECTORY);
- props.add(RECURSE_SUBDIRS);
- props.add(RECORD_WRITER);
- props.add(FILE_FILTER);
- props.add(FILE_FILTER_MODE);
- props.add(MIN_AGE);
- props.add(MAX_AGE);
+ props.addAll(Arrays.asList(DIRECTORY, RECURSE_SUBDIRS, RECORD_WRITER, FILE_FILTER, FILE_FILTER_MODE, MINIMUM_FILE_AGE, MAXIMUM_FILE_AGE));
return props;
}
@Override
public Set<Relationship> getRelationships() {
- final Set<Relationship> relationships = new HashSet<>();
- relationships.add(REL_SUCCESS);
- return relationships;
+ return RELATIONSHIPS;
}
@Override
@@ -277,401 +191,148 @@ public class ListHDFS extends AbstractHadoopProcessor {
final List<ValidationResult> problems = new ArrayList<>(super.customValidate(context));
- final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
- final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+ final Long minAgeProp = context.getProperty(MINIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+ final Long maxAgeProp = context.getProperty(MAXIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final long minimumAge = (minAgeProp == null) ? 0L : minAgeProp;
final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
if (minimumAge > maximumAge) {
- problems.add(new ValidationResult.Builder().valid(false).subject("GetHDFS Configuration")
- .explanation(MIN_AGE.getDisplayName() + " cannot be greater than " + MAX_AGE.getDisplayName()).build());
+ problems.add(new ValidationResult.Builder().valid(false).subject("ListHDFS Configuration")
+ .explanation(MINIMUM_FILE_AGE.getDisplayName() + " cannot be greater than " + MAXIMUM_FILE_AGE.getDisplayName()).build());
}
-
return problems;
}
- protected String getKey(final String directory) {
- return getIdentifier() + ".lastListingTime." + directory;
- }
-
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
super.onPropertyModified(descriptor, oldValue, newValue);
if (isConfigurationRestored() && (descriptor.equals(DIRECTORY) || descriptor.equals(FILE_FILTER))) {
- this.resetState = true;
+ resetState = true;
}
}
- /**
- * Determines which of the given FileStatus's describes a File that should be listed.
- *
- * @param statuses the eligible FileStatus objects that we could potentially list
- * @param context processor context with properties values
- * @return a Set containing only those FileStatus objects that we want to list
- */
- Set<FileStatus> determineListable(final Set<FileStatus> statuses, ProcessContext context) {
- final long minTimestamp = this.latestTimestampListed;
- final TreeMap<Long, List<FileStatus>> orderedEntries = new TreeMap<>();
-
- final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
- // NIFI-4144 - setting to MIN_VALUE so that in case the file modification time is in
- // the future relative to the nifi instance, files are not skipped.
- final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
- final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
- final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
-
- // Build a sorted map to determine the latest possible entries
- for (final FileStatus status : statuses) {
- if (status.getPath().getName().endsWith("_COPYING_")) {
- continue;
- }
-
- final long fileAge = System.currentTimeMillis() - status.getModificationTime();
- if (minimumAge > fileAge || fileAge > maximumAge) {
- continue;
- }
-
- final long entityTimestamp = status.getModificationTime();
-
- if (entityTimestamp > latestTimestampListed) {
- latestTimestampListed = entityTimestamp;
- }
-
- // New entries are all those that occur at or after the associated timestamp
- final boolean newEntry = entityTimestamp >= minTimestamp && entityTimestamp > latestTimestampEmitted;
-
- if (newEntry) {
- List<FileStatus> entitiesForTimestamp = orderedEntries.get(status.getModificationTime());
- if (entitiesForTimestamp == null) {
- entitiesForTimestamp = new ArrayList<>();
- orderedEntries.put(status.getModificationTime(), entitiesForTimestamp);
- }
- entitiesForTimestamp.add(status);
- }
- }
-
- final Set<FileStatus> toList = new HashSet<>();
-
- if (orderedEntries.size() > 0) {
- long latestListingTimestamp = orderedEntries.lastKey();
-
- // If the last listing time is equal to the newest entries previously seen,
- // another iteration has occurred without new files and special handling is needed to avoid starvation
- if (latestListingTimestamp == minTimestamp) {
- // We are done if the latest listing timestamp is equal to the last processed time,
- // meaning we handled those items originally passed over
- if (latestListingTimestamp == latestTimestampEmitted) {
- return Collections.emptySet();
- }
- } else {
- // Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data
- orderedEntries.remove(latestListingTimestamp);
- }
-
- for (List<FileStatus> timestampEntities : orderedEntries.values()) {
- for (FileStatus status : timestampEntities) {
- toList.add(status);
- }
- }
- }
-
- return toList;
- }
-
@OnScheduled
public void resetStateIfNecessary(final ProcessContext context) throws IOException {
if (resetState) {
- getLogger().debug("Property has been modified. Resetting the state values - listing.timestamp and emitted.timestamp to -1L");
+ getLogger().debug("Property has been modified. Resetting the state values.");
context.getStateManager().clear(Scope.CLUSTER);
- this.resetState = false;
+ resetState = false;
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- // We have to ensure that we don't continually perform listings, because if we perform two listings within
- // the same millisecond, our algorithm for comparing timestamps will not work. So we ensure here that we do
- // not let that happen.
- final long now = System.nanoTime();
- if (now - lastRunTimestamp < LISTING_LAG_NANOS) {
- lastRunTimestamp = now;
- context.yield();
- return;
- }
- lastRunTimestamp = now;
-
// Ensure that we are using the latest listing information before we try to perform a listing of HDFS files.
+ final long latestTimestamp;
+ final List<String> latestFiles;
try {
final StateMap stateMap = session.getState(Scope.CLUSTER);
- if (!stateMap.getStateVersion().isPresent()) {
- latestTimestampEmitted = -1L;
- latestTimestampListed = -1L;
- getLogger().debug("Found no state stored");
- } else {
- // Determine if state is stored in the 'new' format or the 'old' format
- final String emittedString = stateMap.get(EMITTED_TIMESTAMP_KEY);
- if (emittedString == null) {
- latestTimestampEmitted = -1L;
- latestTimestampListed = -1L;
- getLogger().debug("Found no recognized state keys; assuming no relevant state and resetting listing/emitted time to -1");
- } else {
- // state is stored in the new format, using just two timestamps
- latestTimestampEmitted = Long.parseLong(emittedString);
- final String listingTimestmapString = stateMap.get(LISTING_TIMESTAMP_KEY);
- if (listingTimestmapString != null) {
- latestTimestampListed = Long.parseLong(listingTimestmapString);
- }
+ final String latestTimestampString = stateMap.get(LATEST_TIMESTAMP_KEY);
- getLogger().debug("Found new-style state stored, latesting timestamp emitted = {}, latest listed = {}",
- new Object[] {latestTimestampEmitted, latestTimestampListed});
- }
+ final String legacyLatestListingTimestampString = stateMap.get(LEGACY_LISTING_TIMESTAMP_KEY);
+ final String legacyLatestEmittedTimestampString = stateMap.get(LEGACY_EMITTED_TIMESTAMP_KEY);
+
+ if (legacyLatestListingTimestampString != null) {
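+ // Legacy state stored two timestamps ('listing.timestamp' and 'emitted.timestamp'). If they are equal, everything at
+ // that timestamp was already emitted, so resume from the next millisecond; otherwise re-list from the listing timestamp.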
+ final long legacyLatestListingTimestamp = Long.parseLong(legacyLatestListingTimestampString);
+ final long legacyLatestEmittedTimestamp = Long.parseLong(legacyLatestEmittedTimestampString);
+ latestTimestamp = legacyLatestListingTimestamp == legacyLatestEmittedTimestamp ? legacyLatestListingTimestamp + 1 : legacyLatestListingTimestamp;
+ latestFiles = new ArrayList<>();
+ getLogger().debug("Transitioned from legacy state to new state. 'legacyLatestListingTimestamp': {}, 'legacyLatestEmittedTimeStamp': {}'," +
+ "'latestTimestamp': {}", legacyLatestListingTimestamp, legacyLatestEmittedTimestamp, latestTimestamp);
+ } else if (latestTimestampString != null) {
+ latestTimestamp = Long.parseLong(latestTimestampString);
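+ // Paths that share the latest timestamp were stored in state so they are not listed again on the next run.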
+ latestFiles = stateMap.toMap().entrySet().stream()
+ .filter(entry -> entry.getKey().startsWith("latest.file"))
+ .map(Map.Entry::getValue)
+ .collect(Collectors.toList());
+ } else {
+ latestTimestamp = 0L;
+ latestFiles = new ArrayList<>();
}
- } catch (final IOException ioe) {
+ } catch (IOException e) {
getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished.");
context.yield();
return;
}
// Pull in any file that is newer than the timestamp that we have.
- final FileSystem hdfs = getFileSystem();
- final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
- String fileFilterMode = context.getProperty(FILE_FILTER_MODE).getValue();
+ try (final FileSystem hdfs = getFileSystem()) {
+ final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
+ final PathFilter pathFilter = createPathFilter(context);
+ final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
- final Set<FileStatus> statuses;
- try {
+ final FileStatusManager fileStatusManager = new FileStatusManager(latestTimestamp, latestFiles);
final Path rootPath = getNormalizedPath(context, DIRECTORY);
- statuses = getStatuses(rootPath, recursive, hdfs, createPathFilter(context), fileFilterMode);
- getLogger().debug("Found a total of {} files in HDFS", new Object[] {statuses.size()});
- } catch (final IOException | IllegalArgumentException e) {
- getLogger().error("Failed to perform listing of HDFS", e);
- return;
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- getLogger().error("Interrupted while performing listing of HDFS", e);
- return;
- }
+ final FileStatusIterable fileStatuses = new FileStatusIterable(rootPath, recursive, hdfs, getUserGroupInformation());
- final Set listable = determineListable(statuses, context);
- getLogger().debug("Of the {} files found in HDFS, {} are listable", new Object[] {statuses.size(), listable.size()});
+ final Long minAgeProp = context.getProperty(MINIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
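+ // NIFI-4144: default the minimum age to MIN_VALUE so that files with modification times in the future
+ // (relative to this NiFi instance) are not skipped.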
+ final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
+ final Long maxAgeProp = context.getProperty(MAXIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+ final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
- // Create FlowFile(s) for the listing, if there are any
- if (!listable.isEmpty()) {
- if (context.getProperty(RECORD_WRITER).isSet()) {
- try {
- createRecords(listable, context, session);
- } catch (final IOException | SchemaNotFoundException e) {
- getLogger().error("Failed to write listing of HDFS", e);
- return;
- }
+ final HadoopFileStatusWriter writer;
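+ // Without a configured Record Writer each listed file becomes its own FlowFile; with one, all entries are written as records into a single FlowFile.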
+ if (writerFactory == null) {
+ writer = new FlowFileObjectWriter(session, fileStatuses, minimumAge, maximumAge, pathFilter, fileStatusManager, latestTimestamp, latestFiles);
} else {
- createFlowFiles(listable, session);
- }
- }
-
- for (final FileStatus status : listable) {
- final long fileModTime = status.getModificationTime();
- if (fileModTime > latestTimestampEmitted) {
- latestTimestampEmitted = fileModTime;
- }
- }
-
- final Map<String, String> updatedState = new HashMap<>(1);
- updatedState.put(LISTING_TIMESTAMP_KEY, String.valueOf(latestTimestampListed));
- updatedState.put(EMITTED_TIMESTAMP_KEY, String.valueOf(latestTimestampEmitted));
- getLogger().debug("New state map: {}", new Object[] {updatedState});
-
- try {
- session.setState(updatedState, Scope.CLUSTER);
- } catch (final IOException ioe) {
- getLogger().warn("Failed to save cluster-wide state. If NiFi is restarted, data duplication may occur", ioe);
- }
-
- final int listCount = listable.size();
- if ( listCount > 0 ) {
- getLogger().info("Successfully created listing with {} new files from HDFS", new Object[] {listCount});
- session.commitAsync();
- } else {
- getLogger().debug("There is no data to list. Yielding.");
- context.yield();
- }
- }
-
- private void createFlowFiles(final Set<FileStatus> fileStatuses, final ProcessSession session) {
- for (final FileStatus status : fileStatuses) {
- final Map<String, String> attributes = createAttributes(status);
- FlowFile flowFile = session.create();
- flowFile = session.putAllAttributes(flowFile, attributes);
- session.transfer(flowFile, getSuccessRelationship());
- }
- }
-
- private void createRecords(final Set<FileStatus> fileStatuses, final ProcessContext context, final ProcessSession session) throws IOException, SchemaNotFoundException {
- final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-
- FlowFile flowFile = session.create();
- final WriteResult writeResult;
- try (final OutputStream out = session.write(flowFile);
- final RecordSetWriter recordSetWriter = writerFactory.createWriter(getLogger(), getRecordSchema(), out, Collections.emptyMap())) {
-
- recordSetWriter.beginRecordSet();
- for (final FileStatus fileStatus : fileStatuses) {
- final Record record = createRecord(fileStatus);
- recordSetWriter.write(record);
+ writer = new RecordObjectWriter(session, fileStatuses, minimumAge, maximumAge, pathFilter, fileStatusManager, latestTimestamp,
+ latestFiles, writerFactory, getLogger());
}
- writeResult = recordSetWriter.finishRecordSet();
- }
+ writer.write();
- final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
- attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
- flowFile = session.putAllAttributes(flowFile, attributes);
+ getLogger().debug("Found a total of {} files in HDFS, {} are listed", fileStatuses.getTotalFileCount(), writer.getListedFileCount());
- session.transfer(flowFile, getSuccessRelationship());
- }
-
- private Record createRecord(final FileStatus fileStatus) {
- final Map<String, Object> values = new HashMap<>();
- values.put(FILENAME, fileStatus.getPath().getName());
- values.put(PATH, getAbsolutePath(fileStatus.getPath().getParent()));
- values.put(OWNER, fileStatus.getOwner());
- values.put(GROUP, fileStatus.getGroup());
- values.put(LAST_MODIFIED, new Timestamp(fileStatus.getModificationTime()));
- values.put(SIZE, fileStatus.getLen());
- values.put(REPLICATION, fileStatus.getReplication());
-
- final FsPermission permission = fileStatus.getPermission();
- final String perms = getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
- values.put(PERMISSIONS, perms);
-
- values.put(IS_DIRECTORY, fileStatus.isDirectory());
- values.put(IS_SYM_LINK, fileStatus.isSymlink());
- values.put(IS_ENCRYPTED, fileStatus.isEncrypted());
- values.put(IS_ERASURE_CODED, fileStatus.isErasureCoded());
-
- return new MapRecord(getRecordSchema(), values);
- }
-
- private RecordSchema getRecordSchema() {
- return RECORD_SCHEMA;
- }
-
- private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs, final PathFilter filter, String filterMode) throws IOException, InterruptedException {
- final Set<FileStatus> statusSet = new HashSet<>();
-
- getLogger().debug("Fetching listing for {}", new Object[] {path});
- final FileStatus[] statuses;
- if (isPostListingFilterNeeded(filterMode)) {
- // For this filter mode, the filter is not passed to listStatus, so that directory names will not be
- // filtered out when the listing is recursive.
- statuses = getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(path));
- } else {
- statuses = getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(path, filter));
- }
-
- for ( final FileStatus status : statuses ) {
- if ( status.isDirectory() ) {
- if ( recursive ) {
- try {
- statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs, filter, filterMode));
- } catch (final IOException ioe) {
- getLogger().error("Failed to retrieve HDFS listing for subdirectory {} due to {}; will continue listing others", new Object[] {status.getPath(), ioe});
- }
+ if (writer.getListedFileCount() > 0) {
+ final Map<String, String> updatedState = new HashMap<>();
+ updatedState.put(LATEST_TIMESTAMP_KEY, String.valueOf(fileStatusManager.getCurrentLatestTimestamp()));
+ final List<String> files = fileStatusManager.getCurrentLatestFiles();
+ for (int i = 0; i < files.size(); i++) {
+ final String currentFilePath = files.get(i);
+ updatedState.put(String.format(LATEST_FILES_KEY, i), currentFilePath);
}
+ getLogger().debug("New state map: {}", updatedState);
+ updateState(session, updatedState);
+
+ getLogger().info("Successfully created listing with {} new files from HDFS", writer.getListedFileCount());
} else {
- if (isPostListingFilterNeeded(filterMode)) {
- // Filtering explicitly performed here, since it was not able to be done when calling listStatus.
- if (filter.accept(status.getPath())) {
- statusSet.add(status);
- }
- } else {
- statusSet.add(status);
- }
+ getLogger().debug("There is no data to list. Yielding.");
+ context.yield();
}
+ } catch (IOException e) {
+ throw new ProcessException("IO error occurred when closing HDFS file system", e);
}
-
- return statusSet;
- }
-
- /**
- * Determines if filtering needs to be applied, after calling {@link FileSystem#listStatus(Path)}, based on the
- * given filter mode.
- * Filter modes that need to be able to search directories regardless of the given filter should return true.
- * FILTER_MODE_FILES_ONLY and FILTER_MODE_FULL_PATH require that {@link FileSystem#listStatus(Path)} be invoked
- * without a filter so that all directories can be traversed when filtering with these modes.
- * FILTER_MODE_DIRECTORIES_AND_FILES should return false, since filtering can be applied directly with
- * {@link FileSystem#listStatus(Path, PathFilter)} regardless of a recursive listing.
- * @param filterMode the value of one of the defined AllowableValues representing filter modes
- * @return true if results need to be filtered, false otherwise
- */
- private boolean isPostListingFilterNeeded(String filterMode) {
- return filterMode.equals(FILTER_MODE_FILES_ONLY) || filterMode.equals(FILTER_MODE_FULL_PATH);
- }
-
- private String getAbsolutePath(final Path path) {
- final Path parent = path.getParent();
- final String prefix = (parent == null || parent.getName().equals("")) ? "" : getAbsolutePath(parent);
- return prefix + "/" + path.getName();
- }
-
- private Map<String, String> createAttributes(final FileStatus status) {
- final Map<String, String> attributes = new HashMap<>();
- attributes.put(CoreAttributes.FILENAME.key(), status.getPath().getName());
- attributes.put(CoreAttributes.PATH.key(), getAbsolutePath(status.getPath().getParent()));
-
- attributes.put(getAttributePrefix() + ".owner", status.getOwner());
- attributes.put(getAttributePrefix() + ".group", status.getGroup());
- attributes.put(getAttributePrefix() + ".lastModified", String.valueOf(status.getModificationTime()));
- attributes.put(getAttributePrefix() + ".length", String.valueOf(status.getLen()));
- attributes.put(getAttributePrefix() + ".replication", String.valueOf(status.getReplication()));
-
- final FsPermission permission = status.getPermission();
- final String perms = getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
- attributes.put(getAttributePrefix() + ".permissions", perms);
- return attributes;
- }
-
- private String getPerms(final FsAction action) {
- final StringBuilder sb = new StringBuilder();
- if (action.implies(FsAction.READ)) {
- sb.append("r");
- } else {
- sb.append("-");
- }
-
- if (action.implies(FsAction.WRITE)) {
- sb.append("w");
- } else {
- sb.append("-");
- }
-
- if (action.implies(FsAction.EXECUTE)) {
- sb.append("x");
- } else {
- sb.append("-");
- }
-
- return sb.toString();
}
private PathFilter createPathFilter(final ProcessContext context) {
- final String filterMode = context.getProperty(FILE_FILTER_MODE).getValue();
- return path -> {
- final boolean accepted;
- if (FILTER_FULL_PATH_VALUE.getValue().equals(filterMode)) {
- accepted = fileFilterRegexPattern.matcher(path.toString()).matches()
+ final FilterMode filterMode = FilterMode.forName(context.getProperty(FILE_FILTER_MODE).getValue());
+ final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
+
+ switch (filterMode) {
+ case FILTER_MODE_FILES_ONLY:
+ return path -> fileFilterRegexPattern.matcher(path.getName()).matches();
+ case FILTER_MODE_FULL_PATH:
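+ // Match the regular expression against both the fully qualified path and the path without scheme and authority.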
+ return path -> fileFilterRegexPattern.matcher(path.toString()).matches()
|| fileFilterRegexPattern.matcher(Path.getPathWithoutSchemeAndAuthority(path).toString()).matches();
- } else {
- accepted = fileFilterRegexPattern.matcher(path.getName()).matches();
- }
- return accepted;
- };
+ // FILTER_DIRECTORIES_AND_FILES
+ default:
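+ // Each remaining path segment (directory names and the file name) must match the filter regex.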
+ return path -> Stream.of(Path.getPathWithoutSchemeAndAuthority(path).toString().split("/"))
+ .skip(getPathSegmentsToSkip(recursive))
+ .allMatch(v -> fileFilterRegexPattern.matcher(v).matches());
+ }
}
- protected Relationship getSuccessRelationship() {
- return REL_SUCCESS;
+ private int getPathSegmentsToSkip(final boolean recursive) {
+ // Skip the empty segment produced by the path's leading '/'; when the traversal is recursive,
+ // also skip the base directory so the filter is applied only to subdirectories and files.
+ return recursive ? 2 : 1;
}
- protected String getAttributePrefix() {
- return "hdfs";
+ private void updateState(final ProcessSession session, final Map<String, String> newState) {
+ // In case of legacy state we update the state even if there are no listable files.
+ try {
+ session.setState(newState, Scope.CLUSTER);
+ } catch (IOException e) {
+ getLogger().warn("Failed to save cluster-wide state. If NiFi is restarted, data duplication may occur", e);
+ }
}
}
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FileStatusIterable.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FileStatusIterable.java
new file mode 100644
index 0000000000..67d64de94b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FileStatusIterable.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+public class FileStatusIterable implements Iterable<FileStatus> {
+ private final Path path;
+ private final boolean recursive;
+ private final FileSystem fileSystem;
+ private final UserGroupInformation userGroupInformation;
+ private long totalFileCount;
+
+ public FileStatusIterable(final Path path, final boolean recursive, final FileSystem fileSystem, final UserGroupInformation userGroupInformation) {
+ this.path = path;
+ this.recursive = recursive;
+ this.fileSystem = fileSystem;
+ this.userGroupInformation = userGroupInformation;
+ }
+
+ @Override
+ public Iterator<FileStatus> iterator() {
+ return new FileStatusIterator();
+ }
+
+ public long getTotalFileCount() {
+ return totalFileCount;
+ }
+
+ class FileStatusIterator implements Iterator<FileStatus> {
+
+ private static final String IO_ERROR_MESSAGE = "IO error occurred while iterating Hadoop File System";
+ private static final String THREAD_INTERRUPT_ERROR_MESSAGE = "Thread was interrupted while iterating Hadoop File System";
+
+ private final Deque<Path> dirPaths;
+ private FileStatus nextFileStatus;
+ private RemoteIterator<FileStatus> remoteIterator;
+
+ public FileStatusIterator() {
+ dirPaths = new ArrayDeque<>();
+ remoteIterator = getRemoteIterator(path);
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (nextFileStatus != null) {
+ return true;
+ }
+ try {
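+ // Depth-first traversal: consume the current directory's listing, then descend into any directories queued on the stack.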
+ while (remoteIterator.hasNext() || !dirPaths.isEmpty()) {
+ if (remoteIterator.hasNext()) {
+ FileStatus fs = remoteIterator.next();
+ if (fs.isDirectory()) {
+ if (recursive) {
+ dirPaths.push(fs.getPath());
+ }
+ // if not recursive, continue
+ } else {
+ nextFileStatus = fs;
+ return true;
+ }
+ } else {
+ remoteIterator = getRemoteIterator(dirPaths.pop());
+ }
+ }
+ return false;
+ } catch (IOException e) {
+ throw new ProcessException(IO_ERROR_MESSAGE, e);
+ }
+ }
+
+ @Override
+ public FileStatus next() {
+ if (nextFileStatus == null) {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ }
+ totalFileCount++;
+ final FileStatus nextFileStatus = this.nextFileStatus;
+ this.nextFileStatus = null;
+ return nextFileStatus;
+ }
+
+ private RemoteIterator<FileStatus> getRemoteIterator(final Path currentPath) {
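+ // The directory listing is performed on behalf of the configured user via UserGroupInformation.doAs.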
+ try {
+ return userGroupInformation.doAs((PrivilegedExceptionAction<RemoteIterator<FileStatus>>) () -> fileSystem.listStatusIterator(currentPath));
+ } catch (IOException e) {
+ throw new ProcessException(IO_ERROR_MESSAGE, e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ProcessException(THREAD_INTERRUPT_ERROR_MESSAGE, e);
+ }
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FileStatusManager.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FileStatusManager.java
new file mode 100644
index 0000000000..b6730d7868
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FileStatusManager.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import org.apache.hadoop.fs.FileStatus;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Keeps a list of the latest modified file paths and the latest modified timestamp of the current run.
+ */
+public class FileStatusManager {
+
+ private final List<String> currentLatestFiles;
+ private long currentLatestTimestamp;
+
+ public FileStatusManager(final long initialLatestTimestamp, final List<String> initialLatestFiles) {
+ currentLatestTimestamp = initialLatestTimestamp;
+ currentLatestFiles = new ArrayList<>(initialLatestFiles);
+ }
+
+ public void update(final FileStatus status) {
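+ // A strictly newer modification time replaces the tracked paths; an equal one is added to them.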
+ if (status.getModificationTime() > currentLatestTimestamp) {
+ currentLatestTimestamp = status.getModificationTime();
+ currentLatestFiles.clear();
+ currentLatestFiles.add(status.getPath().toString());
+ } else if (status.getModificationTime() == currentLatestTimestamp) {
+ currentLatestFiles.add(status.getPath().toString());
+ }
+ }
+
+ public List<String> getCurrentLatestFiles() {
+ return Collections.unmodifiableList(currentLatestFiles);
+ }
+
+ public long getCurrentLatestTimestamp() {
+ return currentLatestTimestamp;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FilterMode.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FilterMode.java
new file mode 100644
index 0000000000..fed64ea7a2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FilterMode.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import org.apache.nifi.components.DescribedValue;
+
+import java.util.stream.Stream;
+
+import static org.apache.nifi.processors.hadoop.ListHDFS.FILE_FILTER;
+import static org.apache.nifi.processors.hadoop.ListHDFS.RECURSE_SUBDIRS;
+
+public enum FilterMode implements DescribedValue {
+
+ FILTER_DIRECTORIES_AND_FILES(
+ "filter-mode-directories-and-files",
+ "Directories and Files",
+ "Filtering will be applied to the names of directories and files. If " + RECURSE_SUBDIRS.getDisplayName()
+ + " is set to true, only subdirectories with a matching name will be searched for files that match "
+ + "the regular expression defined in " + FILE_FILTER.getDisplayName() + "."
+ ),
+ FILTER_MODE_FILES_ONLY(
+ "filter-mode-files-only",
+ "Files Only",
+ "Filtering will only be applied to the names of files. If " + RECURSE_SUBDIRS.getDisplayName()
+ + " is set to true, the entire subdirectory tree will be searched for files that match "
+ + "the regular expression defined in " + FILE_FILTER.getDisplayName() + "."
+ ),
+
+ FILTER_MODE_FULL_PATH(
+ "filter-mode-full-path",
+ "Full Path",
+ "Filtering will be applied by evaluating the regular expression defined in " + FILE_FILTER.getDisplayName()
+ + " against the full path of files with and without the scheme and authority. If "
+ + RECURSE_SUBDIRS.getDisplayName() + " is set to true, the entire subdirectory tree will be searched for files in which the full path of "
+ + "the file matches the regular expression defined in " + FILE_FILTER.getDisplayName() + ". See 'Additional Details' for more information."
+ );
+
+ private final String value;
+ private final String displayName;
+ private final String description;
+
+ FilterMode(final String value, final String displayName, final String description) {
+ this.value = value;
+ this.displayName = displayName;
+ this.description = description;
+ }
+
+ @Override
+ public String getValue() {
+ return value;
+ }
+
+ @Override
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+
+ public static FilterMode forName(String filterMode) {
+ return Stream.of(values())
+ .filter(fm -> fm.getValue().equalsIgnoreCase(filterMode))
+ .findFirst()
+ .orElseThrow(
+ () -> new IllegalArgumentException("Invalid filter mode: " + filterMode));
+ }
+}
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/FlowFileObjectWriter.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/FlowFileObjectWriter.java
new file mode 100644
index 0000000000..4b9b5608bf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/FlowFileObjectWriter.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util.writer;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.hadoop.ListHDFS;
+import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
+import org.apache.nifi.processors.hadoop.util.FileStatusManager;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FlowFileObjectWriter extends HadoopFileStatusWriter {
+
+ private static final String HDFS_ATTRIBUTE_PREFIX = "hdfs";
+
+ public FlowFileObjectWriter(final ProcessSession session,
+ final FileStatusIterable fileStatuses,
+ final long minimumAge,
+ final long maximumAge,
+ final PathFilter pathFilter,
+ final FileStatusManager fileStatusManager,
+ final long previousLatestModificationTime,
+ final List<String> previousLatestFiles) {
+ super(session, fileStatuses, minimumAge, maximumAge, pathFilter, fileStatusManager, previousLatestModificationTime, previousLatestFiles);
+ }
+
+ @Override
+ public void write() {
+ for (FileStatus status : fileStatusIterable) {
+ if (determineListable(status)) {
+
+ final Map<String, String> attributes = createAttributes(status);
+ FlowFile flowFile = session.create();
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ session.transfer(flowFile, ListHDFS.REL_SUCCESS);
+
+ fileStatusManager.update(status);
+ fileCount++;
+ }
+ }
+ }
+
+ private Map<String, String> createAttributes(final FileStatus status) {
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.FILENAME.key(), status.getPath().getName());
+ attributes.put(CoreAttributes.PATH.key(), getAbsolutePath(status.getPath().getParent()));
+ attributes.put(HDFS_ATTRIBUTE_PREFIX + ".owner", status.getOwner());
+ attributes.put(HDFS_ATTRIBUTE_PREFIX + ".group", status.getGroup());
+ attributes.put(HDFS_ATTRIBUTE_PREFIX + ".lastModified", String.valueOf(status.getModificationTime()));
+ attributes.put(HDFS_ATTRIBUTE_PREFIX + ".length", String.valueOf(status.getLen()));
+ attributes.put(HDFS_ATTRIBUTE_PREFIX + ".replication", String.valueOf(status.getReplication()));
+ attributes.put(HDFS_ATTRIBUTE_PREFIX + ".permissions", getPermissionsString(status.getPermission()));
+ return attributes;
+ }
+}
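For context, a minimal usage sketch of the attribute-based writer; this is not part of the PR. The wrapper class and method names below are hypothetical, only the FlowFileObjectWriter constructor, write() and getListedFileCount() come from the classes introduced in this change.

// Illustrative-only sketch; FlowFileWriterUsageSketch and listToFlowFiles are hypothetical names.
import org.apache.hadoop.fs.PathFilter;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
import org.apache.nifi.processors.hadoop.util.FileStatusManager;
import org.apache.nifi.processors.hadoop.util.writer.FlowFileObjectWriter;
import org.apache.nifi.processors.hadoop.util.writer.HadoopFileStatusWriter;

import java.util.List;

class FlowFileWriterUsageSketch {

    static long listToFlowFiles(final ProcessSession session,
                                final FileStatusIterable fileStatuses,
                                final long minimumAge,
                                final long maximumAge,
                                final PathFilter pathFilter,
                                final FileStatusManager fileStatusManager,
                                final long previousLatestTimestamp,
                                final List<String> previousLatestFiles,
                                final ComponentLog logger) {
        // Emits one FlowFile per listable entry, carrying filename/path plus the hdfs.* attributes.
        final HadoopFileStatusWriter writer = new FlowFileObjectWriter(session, fileStatuses,
                minimumAge, maximumAge, pathFilter, fileStatusManager,
                previousLatestTimestamp, previousLatestFiles);
        writer.write();
        logger.debug("Listed {} new HDFS entries", writer.getListedFileCount());
        return writer.getListedFileCount();
    }
}

Committing the session and persisting the updated state remain the caller's responsibility in this sketch.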
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/HadoopFileStatusWriter.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/HadoopFileStatusWriter.java
new file mode 100644
index 0000000000..9adf0d3652
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/HadoopFileStatusWriter.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util.writer;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
+import org.apache.nifi.processors.hadoop.util.FileStatusManager;
+
+import java.util.List;
+
+/**
+ * Abstract base class implementing the common logic for writing listed HDFS file statuses, either as individual FlowFiles with attributes or as Records.
+ */
+public abstract class HadoopFileStatusWriter {
+
+ protected final ProcessSession session;
+ protected final FileStatusIterable fileStatusIterable;
+ protected final long minimumAge;
+ protected final long maximumAge;
+ protected final PathFilter pathFilter;
+ protected final FileStatusManager fileStatusManager;
+ protected final long previousLatestTimestamp;
+ protected final List<String> previousLatestFiles;
+ protected long fileCount;
+ private final long currentTimeMillis;
+
+
+ HadoopFileStatusWriter(final ProcessSession session,
+ final FileStatusIterable fileStatusIterable,
+ final long minimumAge,
+ final long maximumAge,
+ final PathFilter pathFilter,
+ final FileStatusManager fileStatusManager,
+ final long previousLatestTimestamp,
+ final List<String> previousLatestFiles) {
+ this.session = session;
+ this.fileStatusIterable = fileStatusIterable;
+ this.minimumAge = minimumAge;
+ this.maximumAge = maximumAge;
+ this.pathFilter = pathFilter;
+ this.fileStatusManager = fileStatusManager;
+ this.previousLatestTimestamp = previousLatestTimestamp;
+ this.previousLatestFiles = previousLatestFiles;
+ currentTimeMillis = System.currentTimeMillis();
+ fileCount = 0L;
+ }
+
+ public abstract void write();
+
+ public long getListedFileCount() {
+ return fileCount;
+ }
+
+ protected boolean determineListable(final FileStatus status) {
+
+ final boolean isCopyInProgress = status.getPath().getName().endsWith("_COPYING_");
+ final boolean isFilterAccepted = pathFilter.accept(status.getPath());
+ if (isCopyInProgress || !isFilterAccepted) {
+ return false;
+ }
+ // If the file's modification time equals the latest timestamp from the previous iteration, we have to check whether it was already listed
+ if (status.getModificationTime() == previousLatestTimestamp) {
+ return !previousLatestFiles.contains(status.getPath().toString());
+ }
+
+ final long fileAge = currentTimeMillis - status.getModificationTime();
+ if (minimumAge > fileAge || fileAge > maximumAge) {
+ return false;
+ }
+
+ return status.getModificationTime() > previousLatestTimestamp;
+ }
+
+ String getAbsolutePath(final Path path) {
+ final Path parent = path.getParent();
+ final String prefix = (parent == null || parent.getName().equals("")) ? "" : getAbsolutePath(parent);
+ return prefix + "/" + path.getName();
+ }
+
+ String getPerms(final FsAction action) {
+ final StringBuilder sb = new StringBuilder();
+
+ sb.append(action.implies(FsAction.READ) ? "r" : "-");
+ sb.append(action.implies(FsAction.WRITE) ? "w" : "-");
+ sb.append(action.implies(FsAction.EXECUTE) ? "x" : "-");
+
+ return sb.toString();
+ }
+
+ String getPermissionsString(final FsPermission permission) {
+ return String.format("%s%s%s", getPerms(permission.getUserAction()),
+ getPerms(permission.getGroupAction()), getPerms(permission.getOtherAction()));
+ }
+}
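The permission helpers above render an FsPermission as the familiar rwx string. A small self-contained illustration of the same formatting follows; it is not part of the PR, and the class name is hypothetical.

import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;

public class PermissionStringDemo {

    // Same per-action formatting as HadoopFileStatusWriter#getPerms.
    static String perms(final FsAction action) {
        return (action.implies(FsAction.READ) ? "r" : "-")
                + (action.implies(FsAction.WRITE) ? "w" : "-")
                + (action.implies(FsAction.EXECUTE) ? "x" : "-");
    }

    public static void main(final String[] args) {
        final FsPermission permission = new FsPermission((short) 0x1A4); // 0644 in octal
        System.out.println(perms(permission.getUserAction())
                + perms(permission.getGroupAction())
                + perms(permission.getOtherAction())); // prints rw-r--r--
    }
}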
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/RecordObjectWriter.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/RecordObjectWriter.java
new file mode 100644
index 0000000000..066460a022
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/RecordObjectWriter.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util.writer;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
+import org.apache.nifi.processors.hadoop.util.FileStatusManager;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.OutputStream;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.nifi.processors.hadoop.ListHDFS.REL_SUCCESS;
+
+public class RecordObjectWriter extends HadoopFileStatusWriter {
+
+ private static final RecordSchema RECORD_SCHEMA;
+
+ private static final String FILENAME = "filename";
+ private static final String PATH = "path";
+ private static final String IS_DIRECTORY = "directory";
+ private static final String SIZE = "size";
+ private static final String LAST_MODIFIED = "lastModified";
+ private static final String PERMISSIONS = "permissions";
+ private static final String OWNER = "owner";
+ private static final String GROUP = "group";
+ private static final String REPLICATION = "replication";
+ private static final String IS_SYM_LINK = "symLink";
+ private static final String IS_ENCRYPTED = "encrypted";
+ private static final String IS_ERASURE_CODED = "erasureCoded";
+
+ static {
+ final List<RecordField> recordFields = new ArrayList<>();
+ recordFields.add(new RecordField(FILENAME, RecordFieldType.STRING.getDataType(), false));
+ recordFields.add(new RecordField(PATH, RecordFieldType.STRING.getDataType(), false));
+ recordFields.add(new RecordField(IS_DIRECTORY, RecordFieldType.BOOLEAN.getDataType(), false));
+ recordFields.add(new RecordField(SIZE, RecordFieldType.LONG.getDataType(), false));
+ recordFields.add(new RecordField(LAST_MODIFIED, RecordFieldType.TIMESTAMP.getDataType(), false));
+ recordFields.add(new RecordField(PERMISSIONS, RecordFieldType.STRING.getDataType()));
+ recordFields.add(new RecordField(OWNER, RecordFieldType.STRING.getDataType()));
+ recordFields.add(new RecordField(GROUP, RecordFieldType.STRING.getDataType()));
+ recordFields.add(new RecordField(REPLICATION, RecordFieldType.INT.getDataType()));
+ recordFields.add(new RecordField(IS_SYM_LINK, RecordFieldType.BOOLEAN.getDataType()));
+ recordFields.add(new RecordField(IS_ENCRYPTED, RecordFieldType.BOOLEAN.getDataType()));
+ recordFields.add(new RecordField(IS_ERASURE_CODED, RecordFieldType.BOOLEAN.getDataType()));
+ RECORD_SCHEMA = new SimpleRecordSchema(recordFields);
+ }
+
+
+ private final RecordSetWriterFactory writerFactory;
+ private final ComponentLog logger;
+
+ public RecordObjectWriter(final ProcessSession session,
+ final FileStatusIterable fileStatuses,
+ final long minimumAge,
+ final long maximumAge,
+ final PathFilter pathFilter,
+ final FileStatusManager fileStatusManager,
+ final long previousLatestModificationTime,
+ final List<String> previousLatestFiles,
+ final RecordSetWriterFactory writerFactory,
+ final ComponentLog logger) {
+ super(session, fileStatuses, minimumAge, maximumAge, pathFilter, fileStatusManager, previousLatestModificationTime, previousLatestFiles);
+ this.writerFactory = writerFactory;
+ this.logger = logger;
+ }
+
+ @Override
+ public void write() {
+ FlowFile flowFile = session.create();
+
+ final WriteResult writeResult;
+ try (
+ final OutputStream out = session.write(flowFile);
+ final RecordSetWriter recordWriter = writerFactory.createWriter(logger, RECORD_SCHEMA, out, flowFile)
+ ) {
+ recordWriter.beginRecordSet();
+ for (FileStatus status : fileStatusIterable) {
+ if (determineListable(status)) {
+ recordWriter.write(createRecordForListing(status));
+ fileStatusManager.update(status);
+ }
+ }
+ writeResult = recordWriter.finishRecordSet();
+ } catch (Exception e) {
+ throw new ProcessException("An error occurred while writing results", e);
+ }
+
+ fileCount = writeResult.getRecordCount();
+ if (fileCount == 0) {
+ session.remove(flowFile);
+ } else {
+ final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
+ attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ session.transfer(flowFile, REL_SUCCESS);
+ }
+ }
+
+ private Record createRecordForListing(final FileStatus fileStatus) {
+ final Map<String, Object> values = new HashMap<>();
+ values.put(FILENAME, fileStatus.getPath().getName());
+ values.put(PATH, getAbsolutePath(fileStatus.getPath().getParent()));
+ values.put(OWNER, fileStatus.getOwner());
+ values.put(GROUP, fileStatus.getGroup());
+ values.put(LAST_MODIFIED, new Timestamp(fileStatus.getModificationTime()));
+ values.put(SIZE, fileStatus.getLen());
+ values.put(REPLICATION, fileStatus.getReplication());
+
+ final FsPermission permission = fileStatus.getPermission();
+ final String perms = getPermissionsString(permission);
+ values.put(PERMISSIONS, perms);
+
+ values.put(IS_DIRECTORY, fileStatus.isDirectory());
+ values.put(IS_SYM_LINK, fileStatus.isSymlink());
+ values.put(IS_ENCRYPTED, fileStatus.isEncrypted());
+ values.put(IS_ERASURE_CODED, fileStatus.isErasureCoded());
+
+ return new MapRecord(RECORD_SCHEMA, values);
+ }
+}
+
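Both concrete writers share the HadoopFileStatusWriter contract, so the processor only has to decide which one to instantiate. A minimal selection sketch follows; it is not part of the PR, the class and method names are illustrative, and using a configured RecordSetWriterFactory as the decision criterion is an assumption based on RecordObjectWriter's extra constructor parameters.

import org.apache.hadoop.fs.PathFilter;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
import org.apache.nifi.processors.hadoop.util.FileStatusManager;
import org.apache.nifi.processors.hadoop.util.writer.FlowFileObjectWriter;
import org.apache.nifi.processors.hadoop.util.writer.HadoopFileStatusWriter;
import org.apache.nifi.processors.hadoop.util.writer.RecordObjectWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;

import java.util.List;

class HadoopWriterSelectionSketch {

    // Record output collapses the whole listing into a single FlowFile, while the
    // attribute-based writer emits one FlowFile per listed entry.
    static HadoopFileStatusWriter selectWriter(final ProcessSession session,
                                               final FileStatusIterable fileStatuses,
                                               final long minimumAge,
                                               final long maximumAge,
                                               final PathFilter pathFilter,
                                               final FileStatusManager fileStatusManager,
                                               final long latestTimestamp,
                                               final List<String> latestFiles,
                                               final RecordSetWriterFactory writerFactory,
                                               final ComponentLog logger) {
        if (writerFactory == null) {
            return new FlowFileObjectWriter(session, fileStatuses, minimumAge, maximumAge,
                    pathFilter, fileStatusManager, latestTimestamp, latestFiles);
        }
        return new RecordObjectWriter(session, fileStatuses, minimumAge, maximumAge,
                pathFilter, fileStatusManager, latestTimestamp, latestFiles, writerFactory, logger);
    }
}

Keeping the choice behind the common write()/getListedFileCount() contract leaves the listing loop and state updates identical for both output modes.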
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
index 6f3ca245de..eb84fd33cf 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
@@ -26,10 +26,11 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.util.MockComponentLog;
import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
@@ -45,39 +46,35 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
-import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_DIRECTORIES_AND_FILES_VALUE;
-import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_FILES_ONLY_VALUE;
-import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_FULL_PATH_VALUE;
+import static org.apache.nifi.processors.hadoop.ListHDFS.LATEST_TIMESTAMP_KEY;
+import static org.apache.nifi.processors.hadoop.ListHDFS.REL_SUCCESS;
+import static org.apache.nifi.processors.hadoop.util.FilterMode.FILTER_DIRECTORIES_AND_FILES;
+import static org.apache.nifi.processors.hadoop.util.FilterMode.FILTER_MODE_FILES_ONLY;
+import static org.apache.nifi.processors.hadoop.util.FilterMode.FILTER_MODE_FULL_PATH;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-public class TestListHDFS {
+class TestListHDFS {
private TestRunner runner;
private ListHDFSWithMockedFileSystem proc;
- private NiFiProperties mockNiFiProperties;
- private KerberosProperties kerberosProperties;
private MockComponentLog mockLogger;
@BeforeEach
public void setup() throws InitializationException {
- mockNiFiProperties = mock(NiFiProperties.class);
- when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
- kerberosProperties = new KerberosProperties(null);
+ final KerberosProperties kerberosProperties = new KerberosProperties(null);
proc = new ListHDFSWithMockedFileSystem(kerberosProperties);
mockLogger = spy(new MockComponentLog(UUID.randomUUID().toString(), proc));
@@ -88,16 +85,11 @@ public class TestListHDFS {
}
@Test
- public void testListingWithValidELFunction() throws InterruptedException {
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+ void testListingWithValidELFunction() {
+ addFileStatus("/test", "testFile.txt", false);
runner.setProperty(ListHDFS.DIRECTORY, "${literal('/test'):substring(0,5)}");
- // first iteration will not pick up files because it has to instead check timestamps.
- // We must then wait long enough to ensure that the listing can be performed safely and
- // run the Processor again.
- runner.run();
- Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
@@ -107,53 +99,39 @@ public class TestListHDFS {
}
@Test
- public void testListingWithFilter() throws InterruptedException {
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+ void testListingWithFilter() {
+ addFileStatus("/test", "testFile.txt", false);
runner.setProperty(ListHDFS.DIRECTORY, "${literal('/test'):substring(0,5)}");
runner.setProperty(ListHDFS.FILE_FILTER, "[^test].*");
- // first iteration will not pick up files because it has to instead check timestamps.
- // We must then wait long enough to ensure that the listing can be performed safely and
- // run the Processor again.
- runner.run();
- Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
}
@Test
- public void testListingWithInvalidELFunction() throws InterruptedException {
+ void testListingWithInvalidELFunction() {
runner.setProperty(ListHDFS.DIRECTORY, "${literal('/test'):foo()}");
runner.assertNotValid();
}
@Test
- public void testListingWithUnrecognizedELFunction() throws InterruptedException {
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+ void testListingWithUnrecognizedELFunction() {
+ addFileStatus("/test", "testFile.txt", false);
runner.setProperty(ListHDFS.DIRECTORY, "data_${literal('testing'):substring(0,4)%7D");
- // first iteration will not pick up files because it has to instead check timestamps.
- // We must then wait long enough to ensure that the listing can be performed safely and
- // run the Processor again.
- runner.run();
- Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
- runner.run();
+ final AssertionError assertionError = assertThrows(AssertionError.class, () -> runner.run());
+ assertEquals(IllegalArgumentException.class, assertionError.getCause().getClass());
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
}
@Test
- public void testListingHasCorrectAttributes() throws InterruptedException {
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+ void testListingHasCorrectAttributes() {
+ addFileStatus("/test", "testFile.txt", false);
- // first iteration will not pick up files because it has to instead check timestamps.
- // We must then wait long enough to ensure that the listing can be performed safely and
- // run the Processor again.
- runner.run();
- Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
@@ -164,30 +142,24 @@ public class TestListHDFS {
@Test
- public void testRecursiveWithDefaultFilterAndFilterMode() throws InterruptedException {
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/.testFile.txt")));
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+ void testRecursiveWithDefaultFilterAndFilterMode() {
+ addFileStatus("/test", ".testFile.txt", false);
+ addFileStatus("/test", "testFile.txt", false);
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
- proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
-
- // first iteration will not pick up files because it has to instead check timestamps.
- // We must then wait long enough to ensure that the listing can be performed safely and
- // run the Processor again.
- runner.run();
- Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+ addFileStatus("/test", "testDir", true);
+ addFileStatus("/test/testDir", "1.txt", false);
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS);
- for (int i=0; i < 2; i++) {
+ for (int i = 0; i < 2; i++) {
final MockFlowFile ff = flowFiles.get(i);
final String filename = ff.getAttribute("filename");
if (filename.equals("testFile.txt")) {
ff.assertAttributeEquals("path", "/test");
- } else if ( filename.equals("1.txt")) {
+ } else if (filename.equals("1.txt")) {
ff.assertAttributeEquals("path", "/test/testDir");
} else {
fail("filename was " + filename);
@@ -196,30 +168,22 @@ public class TestListHDFS {
}
@Test
- public void testRecursiveWithCustomFilterDirectoriesAndFiles() throws InterruptedException, IOException {
+ void testRecursiveWithCustomFilterDirectoriesAndFiles() {
// set custom regex filter and filter mode
runner.setProperty(ListHDFS.FILE_FILTER, ".*txt.*");
- runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_DIRECTORIES_AND_FILES_VALUE.getValue());
+ runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_DIRECTORIES_AND_FILES.getValue());
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.out")));
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+ addFileStatus("/test", "testFile.out", false);
+ addFileStatus("/test", "testFile.txt", false);
+ addFileStatus("/test", "testDir", true);
+ addFileStatus("/test/testDir", "1.txt", false);
+ addFileStatus("/test/testDir", "anotherDir", true);
+ addFileStatus("/test/testDir/anotherDir", "2.out", false);
+ addFileStatus("/test/testDir/anotherDir", "2.txt", false);
+ addFileStatus("/test", "txtDir", true);
+ addFileStatus("/test/txtDir", "3.out", false);
+ addFileStatus("/test/txtDir", "3.txt", false);
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
- proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
-
- proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir")));
- proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.out")));
- proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.txt")));
-
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/txtDir")));
- proc.fileSystem.addFileStatus(new Path("/test/txtDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/txtDir/3.out")));
- proc.fileSystem.addFileStatus(new Path("/test/txtDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/txtDir/3.txt")));
-
- // first iteration will not pick up files because it has to instead check timestamps.
- // We must then wait long enough to ensure that the listing can be performed safely and
- // run the Processor again.
- runner.run();
- Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
@@ -240,28 +204,21 @@ public class TestListHDFS {
}
@Test
- public void testRecursiveWithCustomFilterFilesOnly() throws InterruptedException, IOException {
+ void testRecursiveWithCustomFilterFilesOnly() {
// set custom regex filter and filter mode
runner.setProperty(ListHDFS.FILE_FILTER, "[^\\.].*\\.txt");
- runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_FILES_ONLY_VALUE.getValue());
+ runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_MODE_FILES_ONLY.getValue());
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.out")));
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/.partfile.txt")));
+ addFileStatus("/test", "testFile.out", false);
+ addFileStatus("/test", "testFile.txt", false);
+ addFileStatus("/test", ".partfile.txt", false);
+ addFileStatus("/test", "testDir", true);
+ addFileStatus("/test/testDir", "1.txt", false);
+ addFileStatus("/test/testDir", "anotherDir", true);
+ addFileStatus("/test/testDir/anotherDir", ".txt", false);
+ addFileStatus("/test/testDir/anotherDir", "2.out", false);
+ addFileStatus("/test/testDir/anotherDir", "2.txt", false);
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
- proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
-
- proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir")));
- proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/.txt")));
- proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.out")));
- proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.txt")));
-
- // first iteration will not pick up files because it has to instead check timestamps.
- // We must then wait long enough to ensure that the listing can be performed safely and
- // run the Processor again.
- runner.run();
- Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 3);
@@ -271,55 +228,41 @@ public class TestListHDFS {
final MockFlowFile ff = flowFiles.get(i);
final String filename = ff.getAttribute("filename");
- if (filename.equals("testFile.txt")) {
- ff.assertAttributeEquals("path", "/test");
- } else if (filename.equals("1.txt")) {
- ff.assertAttributeEquals("path", "/test/testDir");
- } else if (filename.equals("2.txt")) {
- ff.assertAttributeEquals("path", "/test/testDir/anotherDir");
- } else {
- fail("filename was " + filename);
+ switch (filename) {
+ case "testFile.txt":
+ ff.assertAttributeEquals("path", "/test");
+ break;
+ case "1.txt":
+ ff.assertAttributeEquals("path", "/test/testDir");
+ break;
+ case "2.txt":
+ ff.assertAttributeEquals("path", "/test/testDir/anotherDir");
+ break;
+ default:
+ fail("filename was " + filename);
+ break;
}
}
}
@Test
- public void testRecursiveWithCustomFilterFullPathWithoutSchemeAndAuthority() throws InterruptedException, IOException {
+ void testRecursiveWithCustomFilterFullPathWithoutSchemeAndAuthority() {
// set custom regex filter and filter mode
runner.setProperty(ListHDFS.FILE_FILTER, "(/.*/)*anotherDir/1\\..*");
- runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_FULL_PATH_VALUE.getValue());
+ runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_MODE_FULL_PATH.getValue());
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.out")));
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.txt")));
+ addFileStatus("/test", "testFile.out", false);
+ addFileStatus("/test", "testFile.txt", false);
+ addFileStatus("/test", "testDir", true);
+ addFileStatus("/test/testDir", "1.txt", false);
+ addFileStatus("/test/testDir", "anotherDir", true);
+ addFileStatus("/test/testDir/anotherDir", "1.out", false);
+ addFileStatus("/test/testDir/anotherDir", "1.txt", false);
+ addFileStatus("/test/testDir/anotherDir", "2.out", false);
+ addFileStatus("/test/testDir/anotherDir", "2.txt", false);
+ addFileStatus("/test/testDir", "someDir", true);
+ addFileStatus("/test/testDir/someDir", "1.out", false);
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
- new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir")));
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/1.txt")));
-
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
- new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir")));
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.out")));
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.txt")));
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.out")));
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.txt")));
-
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
- new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir")));
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir/1.out")));
-
- // first iteration will not pick up files because it has to instead check timestamps.
- // We must then wait long enough to ensure that the listing can be performed safely and
- // run the Processor again.
- runner.run();
- Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
@@ -340,42 +283,23 @@ public class TestListHDFS {
}
@Test
- public void testRecursiveWithCustomFilterFullPathWithSchemeAndAuthority() throws InterruptedException, IOException {
+ void testRecursiveWithCustomFilterFullPathWithSchemeAndAuthority() {
// set custom regex filter and filter mode
runner.setProperty(ListHDFS.FILE_FILTER, "hdfs://hdfscluster:8020(/.*/)*anotherDir/1\\..*");
- runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_FULL_PATH_VALUE.getValue());
+ runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_MODE_FULL_PATH.getValue());
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.out")));
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.txt")));
+ addFileStatus("/test", "testFile.out", false);
+ addFileStatus("/test", "testFile.txt", false);
+ addFileStatus("/test", "testDir", true);
+ addFileStatus("/test/testDir", "1.txt", false);
+ addFileStatus("/test/testDir", "anotherDir", true);
+ addFileStatus("/test/testDir/anotherDir", "1.out", false);
+ addFileStatus("/test/testDir/anotherDir", "1.txt", false);
+ addFileStatus("/test/testDir/anotherDir", "2.out", false);
+ addFileStatus("/test/testDir/anotherDir", "2.txt", false);
+ addFileStatus("/test/testDir", "someDir", true);
+ addFileStatus("/test/testDir/someDir", "1.out", false);
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
- new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir")));
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/1.txt")));
-
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
- new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir")));
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.out")));
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.txt")));
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.out")));
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.txt")));
-
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
- new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir")));
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir/1.out")));
-
- // first iteration will not pick up files because it has to instead check timestamps.
- // We must then wait long enough to ensure that the listing can be performed safely and
- // run the Processor again.
- runner.run();
- Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
@@ -396,18 +320,11 @@ public class TestListHDFS {
}
@Test
- public void testNotRecursive() throws InterruptedException {
+ void testNotRecursive() {
runner.setProperty(ListHDFS.RECURSE_SUBDIRS, "false");
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
-
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
- proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
-
- // first iteration will not pick up files because it has to instead check timestamps.
- // We must then wait long enough to ensure that the listing can be performed safely and
- // run the Processor again.
- runner.run();
- Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+ addFileStatus("/test", "testFile.txt", false);
+ addFileStatus("/test", "testDir", true);
+ addFileStatus("/test/testDir", "1.txt", false);
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
@@ -419,14 +336,10 @@ public class TestListHDFS {
@Test
- public void testNoListUntilUpdateFromRemoteOnPrimaryNodeChange() throws IOException, InterruptedException {
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 1999L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+ void testNoListUntilUpdateFromRemoteOnPrimaryNodeChange() throws IOException {
+ addFileStatus("/test", "testFile.txt", false, 1999L, 0L);
+
- // first iteration will not pick up files because it has to instead check timestamps.
- // We must then wait long enough to ensure that the listing can be performed safely and
- // run the Processor again.
- runner.run();
- Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
@@ -438,7 +351,7 @@ public class TestListHDFS {
runner.clearTransferState();
// add new file to pull
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 2000L, 0L, create777(), "owner", "group", new Path("/test/testFile2.txt")));
+ addFileStatus("/test", "testFile2.txt", false, 2000L, 0L);
runner.getStateManager().setFailOnStateGet(Scope.CLUSTER, true);
@@ -453,66 +366,43 @@ public class TestListHDFS {
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
runner.getStateManager().setFailOnStateGet(Scope.CLUSTER, false);
- Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
- runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
- Map<String, String> newState = runner.getStateManager().getState(Scope.CLUSTER).toMap();
- assertEquals("2000", newState.get(ListHDFS.LISTING_TIMESTAMP_KEY));
- assertEquals("1999", newState.get(ListHDFS.EMITTED_TIMESTAMP_KEY));
-
- Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
- runner.run();
-
- newState = runner.getStateManager().getState(Scope.CLUSTER).toMap();
- assertEquals("2000", newState.get(ListHDFS.LISTING_TIMESTAMP_KEY));
- assertEquals("2000", newState.get(ListHDFS.EMITTED_TIMESTAMP_KEY));
-
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+ Map<String, String> newState = runner.getStateManager().getState(Scope.CLUSTER).toMap();
+ assertEquals("2000", newState.get(LATEST_TIMESTAMP_KEY));
}
@Test
- public void testOnlyNewestEntriesHeldBack() throws InterruptedException {
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 8L, 0L, create777(), "owner", "group", new Path("/test/testFile2.txt")));
+ void testEntriesWithSameTimestampOnlyAddedOnce() {
+ addFileStatus("/test", "testFile.txt", false, 1L, 0L);
+ addFileStatus("/test", "testFile2.txt", false, 1L, 8L);
// this is a directory, so it won't be counted toward the entries
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 8L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
- proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 100L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
- proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 100L, 0L, create777(), "owner", "group", new Path("/test/testDir/2.txt")));
+ addFileStatus("/test", "testDir", true, 1L, 8L);
+ addFileStatus("/test/testDir", "1.txt", false, 1L, 100L);
- // The first iteration should pick up 2 files with the smaller timestamps.
+ // The first run should pick up all 3 files added so far.
runner.run();
- runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
+ runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 3);
+
+ addFileStatus("/test/testDir", "2.txt", false, 1L, 100L);
- Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
-
- // Next iteration should pick up the other 2 files, since nothing else was added.
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 4);
-
- proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 110L, 0L, create777(), "owner", "group", new Path("/test/testDir/3.txt")));
- Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
- runner.run();
-
- runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 4);
-
- Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
- runner.run();
-
- runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 5);
}
@Test
- public void testMinAgeMaxAge() throws IOException, InterruptedException {
- long now = new Date().getTime();
- long oneHourAgo = now - 3600000;
- long twoHoursAgo = now - 2*3600000;
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, now, now, create777(), "owner", "group", new Path("/test/willBeIgnored.txt")));
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, now-5, now-5, create777(), "owner", "group", new Path("/test/testFile.txt")));
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, oneHourAgo, oneHourAgo, create777(), "owner", "group", new Path("/test/testFile1.txt")));
- proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, twoHoursAgo, twoHoursAgo, create777(), "owner", "group", new Path("/test/testFile2.txt")));
+ void testMinAgeMaxAge() throws IOException {
+
+ final long now = new Date().getTime();
+ final long oneHourAgo = now - 3600000;
+ final long twoHoursAgo = now - 2 * 3600000;
+
+ addFileStatus("/test", "testFile.txt", false, now - 5, now - 5);
+ addFileStatus("/test", "testFile1.txt", false, oneHourAgo, oneHourAgo);
+ addFileStatus("/test", "testFile2.txt", false, twoHoursAgo, twoHoursAgo);
// all files
runner.run();
@@ -522,38 +412,27 @@ public class TestListHDFS {
runner.getStateManager().clear(Scope.CLUSTER);
// invalid min_age > max_age
- runner.setProperty(ListHDFS.MIN_AGE, "30 sec");
- runner.setProperty(ListHDFS.MAX_AGE, "1 sec");
+ runner.setProperty(ListHDFS.MINIMUM_FILE_AGE, "30 sec");
+ runner.setProperty(ListHDFS.MAXIMUM_FILE_AGE, "1 sec");
runner.assertNotValid();
// only one file (one hour ago)
- runner.setProperty(ListHDFS.MIN_AGE, "30 sec");
- runner.setProperty(ListHDFS.MAX_AGE, "90 min");
+ runner.setProperty(ListHDFS.MINIMUM_FILE_AGE, "30 sec");
+ runner.setProperty(ListHDFS.MAXIMUM_FILE_AGE, "90 min");
runner.assertValid();
- Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
- runner.run(); // will ignore the file for this cycle
- runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
- Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
- // Next iteration should pick up the file, since nothing else was added.
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0).assertAttributeEquals("filename", "testFile1.txt");
runner.clearTransferState();
runner.getStateManager().clear(Scope.CLUSTER);
// two files (one hour ago and two hours ago)
- runner.setProperty(ListHDFS.MIN_AGE, "30 sec");
- runner.removeProperty(ListHDFS.MAX_AGE);
+ runner.setProperty(ListHDFS.MINIMUM_FILE_AGE, "30 sec");
+ runner.removeProperty(ListHDFS.MAXIMUM_FILE_AGE);
runner.assertValid();
- Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
- runner.run();
-
- runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
-
- Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
@@ -561,79 +440,47 @@ public class TestListHDFS {
runner.getStateManager().clear(Scope.CLUSTER);
// two files (now and one hour ago)
- runner.setProperty(ListHDFS.MIN_AGE, "0 sec");
- runner.setProperty(ListHDFS.MAX_AGE, "90 min");
+ runner.setProperty(ListHDFS.MINIMUM_FILE_AGE, "0 sec");
+ runner.setProperty(ListHDFS.MAXIMUM_FILE_AGE, "90 min");
runner.assertValid();
- Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
}
@Test
- public void testListAfterDirectoryChange() throws InterruptedException {
- proc.fileSystem.addFileStatus(new Path("/test1"), new FileStatus(1L, false, 1, 1L, 100L,0L, create777(), "owner", "group", new Path("/test1/testFile-1_1.txt")));
- proc.fileSystem.addFileStatus(new Path("/test2"), new FileStatus(1L, false, 1, 1L, 150L,0L, create777(), "owner", "group", new Path("/test2/testFile-2_1.txt")));
- proc.fileSystem.addFileStatus(new Path("/test1"), new FileStatus(1L, false, 1, 1L, 200L,0L, create777(), "owner", "group", new Path("/test1/testFile-1_2.txt")));
+ void testListAfterDirectoryChange() {
+ addFileStatus("/test1", "testFile-1_1.txt", false, 100L, 0L);
+ addFileStatus("/test1", "testFile-1_2.txt", false, 200L, 0L);
+ addFileStatus("/test2", "testFile-2_1.txt", false, 150L, 0L);
runner.setProperty(ListHDFS.DIRECTORY, "/test1");
runner.run(); // Initial run, latest file from /test1 will be ignored
- Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
- runner.run(); // Latest file i.e. testFile-1_2.txt from /test1 should also be picked up now
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
runner.setProperty(ListHDFS.DIRECTORY, "/test2"); // Changing directory should reset the state
- Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
- runner.run(); // Will ignore the files for this cycle
- runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
-
- Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run(); // Since state has been reset, testFile-2_1.txt from /test2 should be picked up
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 3);
}
@Test
- public void testListingEmptyDir() throws InterruptedException, IOException {
+ void testListingEmptyDir() {
runner.setProperty(ListHDFS.DIRECTORY, "/test/emptyDir");
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.out")));
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.txt")));
+ addFileStatus("/test", "testFile.out", false);
+ addFileStatus("/test", "testFile.txt", false);
+ addFileStatus("/test", "emptyDir", true);
+ addFileStatus("/test/testDir", "1.txt", false);
+ addFileStatus("/test/testDir/anotherDir", "1.out", false);
+ addFileStatus("/test/testDir/anotherDir", "1.txt", false);
+ addFileStatus("/test/testDir/anotherDir", "2.out", false);
+ addFileStatus("/test/testDir/anotherDir", "2.txt", false);
+ addFileStatus("/test/testDir/someDir", "1.out", false);
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
- new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/emptyDir")));
-
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
- new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir")));
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/1.txt")));
-
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
- new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir")));
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.out")));
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.txt")));
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.out")));
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.txt")));
-
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
- new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir")));
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir/1.out")));
-
- // first iteration will not pick up files because it has to instead check timestamps.
- // We must then wait long enough to ensure that the listing can be performed safely and
- // run the Processor again.
- runner.run();
- Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
// verify that no messages were logged at the error level
@@ -641,9 +488,9 @@ public class TestListHDFS {
final ArgumentCaptor<Throwable> throwableArgumentCaptor = ArgumentCaptor.forClass(Throwable.class);
verify(mockLogger, atLeast(0)).error(anyString(), throwableArgumentCaptor.capture());
// if error.(message, throwable) was called, ignore JobConf CNFEs since mapreduce libs are not included as dependencies
- assertTrue(throwableArgumentCaptor.getAllValues().stream().flatMap(Stream::of)
+ assertTrue(throwableArgumentCaptor.getAllValues().stream()
// check that there are no throwables that are not of JobConf CNFE exceptions
- .noneMatch(throwable -> !(throwable instanceof ClassNotFoundException && throwable.getMessage().contains("JobConf"))));
+ .allMatch(throwable -> throwable instanceof ClassNotFoundException && throwable.getMessage().contains("JobConf")));
verify(mockLogger, never()).error(anyString(), any(Object[].class));
verify(mockLogger, never()).error(anyString(), any(Object[].class), any(Throwable.class));
@@ -654,45 +501,22 @@ public class TestListHDFS {
}
@Test
- public void testListingNonExistingDir() throws InterruptedException, IOException {
- String nonExistingPath = "/test/nonExistingDir";
+ void testListingNonExistingDir() {
+ final String nonExistingPath = "/test/nonExistingDir";
runner.setProperty(ListHDFS.DIRECTORY, nonExistingPath);
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.out")));
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.txt")));
+ addFileStatus("/test", "testFile.out", false);
+ addFileStatus("/test", "testFile.txt", false);
+ addFileStatus("/test", "emptyDir", true);
+ addFileStatus("/test/testDir", "1.txt", false);
+ addFileStatus("/test/testDir/anotherDir", "1.out", false);
+ addFileStatus("/test/testDir/anotherDir", "1.txt", false);
+ addFileStatus("/test/testDir/anotherDir", "2.out", false);
+ addFileStatus("/test/testDir/anotherDir", "2.txt", false);
+ addFileStatus("/test/testDir/someDir", "1.out", false);
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
- new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/emptyDir")));
-
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
- new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir")));
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/1.txt")));
-
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
- new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir")));
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.out")));
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.txt")));
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.out")));
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.txt")));
-
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
- new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir")));
- proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir"),
- new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir/1.out")));
-
- // first iteration will not pick up files because it has to instead check timestamps.
- // We must then wait long enough to ensure that the listing can be performed safely and
- // run the Processor again.
- runner.run();
- Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
- runner.run();
+ final AssertionError assertionError = assertThrows(AssertionError.class, () -> runner.run());
+ assertEquals(ProcessException.class, assertionError.getCause().getClass());
// assert that no files were listed
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
@@ -700,12 +524,41 @@ public class TestListHDFS {
runner.assertPenalizeCount(0);
}
+ @Test
+ void testRecordWriter() throws InitializationException {
+ runner.setProperty(ListHDFS.DIRECTORY, "/test");
+
+ final MockRecordWriter recordWriter = new MockRecordWriter(null, false);
+ runner.addControllerService("record-writer", recordWriter);
+ runner.enableControllerService(recordWriter);
+ runner.setProperty(ListHDFS.RECORD_WRITER, "record-writer");
+
+ addFileStatus("/test", "testFile.out", false);
+ addFileStatus("/test", "testFile.txt", false);
+ addFileStatus("/test", "testDir", true);
+ addFileStatus("/test/testDir", "1.txt", false);
+ addFileStatus("/test/testDir", "anotherDir", true);
+ addFileStatus("/test/testDir/anotherDir", "1.out", false);
+ addFileStatus("/test/testDir/anotherDir", "1.txt", false);
+ addFileStatus("/test/testDir/anotherDir", "2.out", false);
+ addFileStatus("/test/testDir/anotherDir", "2.txt", false);
+ addFileStatus("/test/testDir", "someDir", true);
+ addFileStatus("/test/testDir/someDir", "1.out", false);
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+ final List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(REL_SUCCESS);
+ final MockFlowFile flowFile = flowFilesForRelationship.get(0);
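+ // 11 statuses are registered above, 3 of them directories, so the default recursive listing should yield 8 file records in a single FlowFile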
+ flowFile.assertAttributeEquals("record.count", "8");
+ }
+
private FsPermission create777() {
- return new FsPermission((short) 0777);
+ return new FsPermission((short) 0x777);
}
- private class ListHDFSWithMockedFileSystem extends ListHDFS {
+ private static class ListHDFSWithMockedFileSystem extends ListHDFS {
private final MockFileSystem fileSystem = new MockFileSystem();
private final KerberosProperties testKerberosProps;
@@ -724,25 +577,16 @@ public class TestListHDFS {
}
@Override
- protected File getPersistenceFile() {
- return new File("target/conf/state-file");
- }
-
- @Override
- protected FileSystem getFileSystem(final Configuration config) throws IOException {
+ protected FileSystem getFileSystem(final Configuration config) {
return fileSystem;
}
}
- private class MockFileSystem extends FileSystem {
+ private static class MockFileSystem extends FileSystem {
private final Map<Path, Set<FileStatus>> fileStatuses = new HashMap<>();
public void addFileStatus(final Path parent, final FileStatus child) {
- Set<FileStatus> children = fileStatuses.get(parent);
- if (children == null) {
- children = new HashSet<>();
- fileStatuses.put(parent, children);
- }
+ final Set<FileStatus> children = fileStatuses.computeIfAbsent(parent, k -> new HashSet<>());
children.add(child);
@@ -770,33 +614,33 @@ public class TestListHDFS {
}
@Override
- public FSDataInputStream open(final Path f, final int bufferSize) throws IOException {
+ public FSDataInputStream open(final Path f, final int bufferSize) {
return null;
}
@Override
public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication,
- final long blockSize, final Progressable progress) throws IOException {
+ final long blockSize, final Progressable progress) {
return null;
}
@Override
- public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) throws IOException {
+ public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) {
return null;
}
@Override
- public boolean rename(final Path src, final Path dst) throws IOException {
+ public boolean rename(final Path src, final Path dst) {
return false;
}
@Override
- public boolean delete(final Path f, final boolean recursive) throws IOException {
+ public boolean delete(final Path f, final boolean recursive) {
return false;
}
@Override
- public FileStatus[] listStatus(final Path f) throws FileNotFoundException, IOException {
+ public FileStatus[] listStatus(final Path f) throws IOException {
return fileStatuses.keySet().stream()
// find the key in fileStatuses that matches the given Path f
.filter(pathKey -> f.isAbsoluteAndSchemeAuthorityNull()
@@ -825,14 +669,31 @@ public class TestListHDFS {
}
@Override
- public boolean mkdirs(final Path f, final FsPermission permission) throws IOException {
+ public boolean mkdirs(final Path f, final FsPermission permission) {
return false;
}
@Override
- public FileStatus getFileStatus(final Path f) throws IOException {
- return null;
+ public FileStatus getFileStatus(final Path path) {
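+ // The mock keeps statuses per parent path, so look through every registered child status for an exact path match.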
+ final Optional<FileStatus> fileStatus = fileStatuses.values().stream()
+ .flatMap(Set::stream)
+ .filter(fs -> fs.getPath().equals(path))
+ .findFirst();
+ if (fileStatus.isEmpty()) {
+ throw new IllegalArgumentException("Could not find FileStatus");
+ }
+ return fileStatus.get();
}
+ }
+ private void addFileStatus(final String path, final String filename, final boolean isDirectory, final long modificationTime, final long accessTime) {
+ final Path fullPath = new Path("hdfs", "hdfscluster:8020", path);
+ final Path filePath = new Path(fullPath, filename);
+ final FileStatus fileStatus = new FileStatus(1L, isDirectory, 1, 1L, modificationTime, accessTime, create777(), "owner", "group", filePath);
+ proc.fileSystem.addFileStatus(fullPath, fileStatus);
+ }
+
+ private void addFileStatus(final String path, final String filename, final boolean isDirectory) {
+ addFileStatus(path, filename, isDirectory, 0L, 0L);
}
}
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFSPerformanceIT.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFSPerformanceIT.java
new file mode 100644
index 0000000000..64ed58cea2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFSPerformanceIT.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.processors.hadoop.util.FilterMode;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockComponentLog;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.UUID;
+
+import static org.mockito.Mockito.spy;
+
+/**
+ * To test a different ListHDFS implementation, change the superclass of ListHDFSWithMockedFileSystem to the implementation in question.
+ * Provide HADOOP_RESOURCE_CONFIG and ROOT_DIR, and set the depth of the HDFS tree structure (a k-ary complete tree) together with the number of files per directory.
+ * First create the structure by running the createHdfsNaryCompleteTree() test case, then run the testListHdfsTimeElapsed() test case with
+ * the implementation to test.
+ */
+@Disabled("This is a performance test and should be run manually")
+class TestListHDFSPerformanceIT {
+
+ private static final long BYTE_TO_MB = 1024 * 1024;
+ private static final String HADOOP_RESOURCE_CONFIG = "???";
+ private static final FileSystem FILE_SYSTEM = getFileSystem();
+ private static final String ROOT_DIR = "???";
+ private static final int NUM_CHILDREN = 3;
+ private static final int NUM_OF_FILES = 1000;
+
+
+ private TestRunner runner;
+ private MockComponentLog mockLogger;
+ private ListHDFSWithMockedFileSystem proc;
+
+
+ @BeforeEach
+ public void setup() throws InitializationException {
+ final KerberosProperties kerberosProperties = new KerberosProperties(null);
+
+ proc = new ListHDFSWithMockedFileSystem(kerberosProperties);
+ mockLogger = spy(new MockComponentLog(UUID.randomUUID().toString(), proc));
+ runner = TestRunners.newTestRunner(proc, mockLogger);
+
+ runner.setProperty(ListHDFS.HADOOP_CONFIGURATION_RESOURCES, HADOOP_RESOURCE_CONFIG);
+ runner.setProperty(ListHDFS.DIRECTORY, ROOT_DIR);
+ runner.setProperty(ListHDFS.FILE_FILTER_MODE, FilterMode.FILTER_DIRECTORIES_AND_FILES.getValue());
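+ // Only list .txt files whose names do not start with a dot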
+ runner.setProperty(ListHDFS.FILE_FILTER, "[^\\.].*\\.txt");
+ }
+
+ @Test
+ @Disabled("Enable this test to create an HDFS file tree")
+ void createHdfsNaryCompleteTree() throws IOException {
+ createTree(FILE_SYSTEM, new Path(ROOT_DIR), 0);
+ }
+
+ /**
+ * This only provides a rough estimate of memory usage.
+ */
+ @Test
+ void testListHdfsTimeElapsed() {
+ final Runtime runtime = Runtime.getRuntime();
+ long usedMemoryBefore = getCurrentlyUsedMemory(runtime);
+ Instant start = Instant.now();
+
+ runner.run();
+
+ Instant finish = Instant.now();
+ long timeElapsed = Duration.between(start, finish).toMillis();
+ System.out.println("TIME ELAPSED: " + timeElapsed);
+
+ long usedMemoryAfter = getCurrentlyUsedMemory(runtime);
+ System.out.println("Memory increased (MB):" + (usedMemoryAfter - usedMemoryBefore));
+ }
+
+
+ private long getCurrentlyUsedMemory(final Runtime runtime) {
+ return (runtime.totalMemory() - runtime.freeMemory()) / BYTE_TO_MB;
+ }
+
+ private void createTree(FileSystem fileSystem, Path currentPath, int depth) throws IOException {
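+ // NUM_CHILDREN doubles as both the branching factor and the depth limit of the generated tree.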
+ if (depth >= NUM_CHILDREN) {
+ for (int j = 0; j < NUM_OF_FILES; j++) {
+ fileSystem.createNewFile(new Path(currentPath + "/file_" + j));
+ }
+ return;
+ }
+
+ for (int i = 0; i < NUM_CHILDREN; i++) {
+ String childPath = currentPath.toString() + "/dir_" + i;
+ Path newPath = new Path(childPath);
+ fileSystem.mkdirs(newPath);
+ for (int j = 0; j < NUM_OF_FILES; j++) {
+ fileSystem.createNewFile(new Path(newPath + "/file_" + j));
+ System.out.println(i + " | " + j + " | File: " + newPath + "/file_" + j);
+ }
+ System.out.println(i + " | Directory: " + newPath);
+ createTree(fileSystem, newPath, depth + 1);
+ }
+ }
+
+
+ private static FileSystem getFileSystem() {
+ String[] locations = HADOOP_RESOURCE_CONFIG.split(",");
+ Configuration config = new Configuration();
+ for (String resource : locations) {
+ config.addResource(new Path(resource.trim()));
+ }
+ try {
+ return FileSystem.get(config);
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to get FileSystem", e);
+ }
+ }
+
+ private static class ListHDFSWithMockedFileSystem extends ListHDFS {
+ private final KerberosProperties testKerberosProps;
+
+ public ListHDFSWithMockedFileSystem(KerberosProperties kerberosProperties) {
+ this.testKerberosProps = kerberosProperties;
+ }
+
+ @Override
+ protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
+ return testKerberosProps;
+ }
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/TestFileStatusIterator.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/TestFileStatusIterator.java
new file mode 100644
index 0000000000..7b0bf3da81
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/TestFileStatusIterator.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class TestFileStatusIterator {
+ @Mock
+ private FileSystem mockHdfs;
+
+ @Mock
+ private UserGroupInformation mockUserGroupInformation;
+
+ @Mock
+ private FileStatus mockFileStatus1;
+
+ @Mock
+ private FileStatus mockFileStatus2;
+
+ @Mock
+ private FileStatus mockFileStatus3;
+
+ private FileStatusIterable fileStatusIterable;
+
+ @BeforeEach
+ void setup() {
+ fileStatusIterable = new FileStatusIterable(new Path("/path/to/files"), false, mockHdfs, mockUserGroupInformation);
+ }
+
+ @Test
+ void pathWithNoFilesShouldReturnEmptyIterator() throws Exception {
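+ // Stub doAs to hand back an empty remote iterator, so the iterable should yield no file statuses.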
+ when(mockUserGroupInformation.doAs(any(PrivilegedExceptionAction.class))).thenReturn(new MockRemoteIterator());
+
+ final Iterator<FileStatus> iterator = fileStatusIterable.iterator();
+
+ assertFalse(iterator.hasNext());
+ assertThrows(NoSuchElementException.class, iterator::next);
+ }
+
+ @Test
+ void pathWithMultipleFilesShouldReturnIteratorWithCorrectFiles() throws Exception {
+ final FileStatus[] fileStatuses = {mockFileStatus1, mockFileStatus2, mockFileStatus3};
+ setupFileStatusMocks(fileStatuses);
+
+ final Iterator<FileStatus> iterator = fileStatusIterable.iterator();
+ final Set<FileStatus> expectedFileStatuses = new HashSet<>(Arrays.asList(fileStatuses));
+ final Set<FileStatus> actualFileStatuses = new HashSet<>();
+
+ assertTrue(iterator.hasNext());
+ actualFileStatuses.add(iterator.next());
+ assertTrue(iterator.hasNext());
+ actualFileStatuses.add(iterator.next());
+ assertTrue(iterator.hasNext());
+ actualFileStatuses.add(iterator.next());
+
+ assertEquals(expectedFileStatuses, actualFileStatuses);
+
+ assertFalse(iterator.hasNext());
+ assertThrows(NoSuchElementException.class, iterator::next);
+ }
+
+ @Test
+ void getTotalFileCountWithMultipleFilesShouldReturnCorrectCount() throws Exception {
+ final FileStatus[] fileStatuses = {mockFileStatus1, mockFileStatus2, mockFileStatus3};
+ setupFileStatusMocks(fileStatuses);
+
+ assertEquals(0, fileStatusIterable.getTotalFileCount());
+
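+ // The total file count is accumulated lazily, only while the statuses are iterated.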
+ for (FileStatus ignored : fileStatusIterable) {
+ // count files
+ }
+
+ assertEquals(3, fileStatusIterable.getTotalFileCount());
+ }
+
+ private void setupFileStatusMocks(FileStatus[] fileStatuses) throws IOException, InterruptedException {
+ when(mockHdfs.listStatusIterator(any(Path.class))).thenReturn(new MockRemoteIterator(fileStatuses));
+
+ when(mockUserGroupInformation.doAs(any(PrivilegedExceptionAction.class))).thenAnswer(invocation -> {
+ // Get the provided lambda expression
+ PrivilegedExceptionAction<?> action = invocation.getArgument(0);
+
+ // Invoke the lambda expression and return the result
+ return action.run();
+ });
+ }
+
+ private static class MockRemoteIterator implements RemoteIterator<FileStatus> {
+
+ final Deque<FileStatus> deque;
+
+ public MockRemoteIterator(FileStatus... fileStatuses) {
+ deque = new ArrayDeque<>();
+ Collections.addAll(deque, fileStatuses);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return !deque.isEmpty();
+ }
+
+ @Override
+ public FileStatus next() {
+ return deque.pop();
+ }
+ }
+}
+