NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Lehel 2023-05-10 13:33:46 +02:00 committed by Tamas Palfy
parent 063ab5cba8
commit 5bd4bc5190
11 changed files with 1303 additions and 867 deletions

View File

@@ -154,6 +154,11 @@
<groupId>org.glassfish.jaxb</groupId>
<artifactId>jaxb-runtime</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock-record-utils</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@@ -17,12 +17,9 @@
package org.apache.nifi.processors.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
@@ -36,50 +33,40 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.deprecation.log.DeprecationLogger;
import org.apache.nifi.deprecation.log.DeprecationLoggerFactory;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
import org.apache.nifi.processors.hadoop.util.FileStatusManager;
import org.apache.nifi.processors.hadoop.util.FilterMode;
import org.apache.nifi.processors.hadoop.util.writer.FlowFileObjectWriter;
import org.apache.nifi.processors.hadoop.util.writer.HadoopFileStatusWriter;
import org.apache.nifi.processors.hadoop.util.writer.RecordObjectWriter;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.security.PrivilegedExceptionAction;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.nifi.processors.hadoop.util.FilterMode.FILTER_DIRECTORIES_AND_FILES;
@PrimaryNodeOnly
@TriggerSerially
@@ -87,189 +74,116 @@ import java.util.regex.Pattern;
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@Tags({"hadoop", "HCFS", "HDFS", "get", "list", "ingest", "source", "filesystem"})
@SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class})
@CapabilityDescription("Retrieves a listing of files from HDFS. Each time a listing is performed, the files with the latest timestamp will be excluded "
+ "and picked up during the next execution of the processor. This is done to ensure that we do not miss any files, or produce duplicates, in the "
+ "cases where files with the same timestamp are written immediately before and after a single execution of the processor. For each file that is "
+ "listed in HDFS, this processor creates a FlowFile that represents the HDFS file to be fetched in conjunction with FetchHDFS. This Processor is "
+ "designed to run on Primary Node only in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left "
+ "off without duplicating all of the data. Unlike GetHDFS, this Processor does not delete any data from HDFS.")
@CapabilityDescription("Retrieves a listing of files from HDFS. For each file that is listed in HDFS, this processor creates a FlowFile that represents "
+ "the HDFS file to be fetched in conjunction with FetchHDFS. This Processor is designed to run on Primary Node only in a cluster. If the primary "
+ "node changes, the new Primary Node will pick up where the previous node left off without duplicating all of the data. Unlike GetHDFS, this "
+ "Processor does not delete any data from HDFS.")
@WritesAttributes({
@WritesAttribute(attribute="filename", description="The name of the file that was read from HDFS."),
@WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory on HDFS. For example, if the Directory property is set to /tmp, "
+ "then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to true and a file is picked up "
+ "from /tmp/abc/1/2/3, then the path attribute will be set to \"/tmp/abc/1/2/3\"."),
@WritesAttribute(attribute="hdfs.owner", description="The user that owns the file in HDFS"),
@WritesAttribute(attribute="hdfs.group", description="The group that owns the file in HDFS"),
@WritesAttribute(attribute="hdfs.lastModified", description="The timestamp of when the file in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
@WritesAttribute(attribute="hdfs.length", description="The number of bytes in the file in HDFS"),
@WritesAttribute(attribute="hdfs.replication", description="The number of HDFS replicas for hte file"),
@WritesAttribute(attribute="hdfs.permissions", description="The permissions for the file in HDFS. This is formatted as 3 characters for the owner, "
+ "3 for the group, and 3 for other users. For example rw-rw-r--")
@WritesAttribute(attribute = "filename", description = "The name of the file that was read from HDFS."),
@WritesAttribute(attribute = "path", description = "The path is set to the absolute path of the file's directory on HDFS. For example, if the Directory property is set to /tmp, "
+ "then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to true and a file is picked up "
+ "from /tmp/abc/1/2/3, then the path attribute will be set to \"/tmp/abc/1/2/3\"."),
@WritesAttribute(attribute = "hdfs.owner", description = "The user that owns the file in HDFS"),
@WritesAttribute(attribute = "hdfs.group", description = "The group that owns the file in HDFS"),
@WritesAttribute(attribute = "hdfs.lastModified", description = "The timestamp of when the file in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
@WritesAttribute(attribute = "hdfs.length", description = "The number of bytes in the file in HDFS"),
@WritesAttribute(attribute = "hdfs.replication", description = "The number of HDFS replicas for hte file"),
@WritesAttribute(attribute = "hdfs.permissions", description = "The permissions for the file in HDFS. This is formatted as 3 characters for the owner, "
+ "3 for the group, and 3 for other users. For example rw-rw-r--")
})
@Stateful(scopes = Scope.CLUSTER, description = "After performing a listing of HDFS files, the latest timestamp of all the files listed and the latest "
+ "timestamp of all the files transferred are both stored. This allows the Processor to list only files that have been added or modified after "
+ "this date the next time that the Processor is run, without having to store all of the actual filenames/paths which could lead to performance "
+ "problems. State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary "
+ "Node is selected, the new node can pick up where the previous node left off, without duplicating the data.")
@Stateful(scopes = Scope.CLUSTER, description = "After performing a listing of HDFS files, the latest timestamp of all the files listed is stored. "
+ "This allows the Processor to list only files that have been added or modified after this date the next time that the Processor is run, "
+ "without having to store all of the actual filenames/paths which could lead to performance problems. State is stored across the cluster "
+ "so that this Processor can be run on Primary Node only and if a new Primary Node is selected, the new node can pick up where the previous "
+ "node left off, without duplicating the data.")
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
public class ListHDFS extends AbstractHadoopProcessor {
private static final RecordSchema RECORD_SCHEMA;
private static final String FILENAME = "filename";
private static final String PATH = "path";
private static final String IS_DIRECTORY = "directory";
private static final String SIZE = "size";
private static final String LAST_MODIFIED = "lastModified";
private static final String PERMISSIONS = "permissions";
private static final String OWNER = "owner";
private static final String GROUP = "group";
private static final String REPLICATION = "replication";
private static final String IS_SYM_LINK = "symLink";
private static final String IS_ENCRYPTED = "encrypted";
private static final String IS_ERASURE_CODED = "erasureCoded";
static {
final List<RecordField> recordFields = new ArrayList<>();
recordFields.add(new RecordField(FILENAME, RecordFieldType.STRING.getDataType(), false));
recordFields.add(new RecordField(PATH, RecordFieldType.STRING.getDataType(), false));
recordFields.add(new RecordField(IS_DIRECTORY, RecordFieldType.BOOLEAN.getDataType(), false));
recordFields.add(new RecordField(SIZE, RecordFieldType.LONG.getDataType(), false));
recordFields.add(new RecordField(LAST_MODIFIED, RecordFieldType.TIMESTAMP.getDataType(), false));
recordFields.add(new RecordField(PERMISSIONS, RecordFieldType.STRING.getDataType()));
recordFields.add(new RecordField(OWNER, RecordFieldType.STRING.getDataType()));
recordFields.add(new RecordField(GROUP, RecordFieldType.STRING.getDataType()));
recordFields.add(new RecordField(REPLICATION, RecordFieldType.INT.getDataType()));
recordFields.add(new RecordField(IS_SYM_LINK, RecordFieldType.BOOLEAN.getDataType()));
recordFields.add(new RecordField(IS_ENCRYPTED, RecordFieldType.BOOLEAN.getDataType()));
recordFields.add(new RecordField(IS_ERASURE_CODED, RecordFieldType.BOOLEAN.getDataType()));
RECORD_SCHEMA = new SimpleRecordSchema(recordFields);
}
private static final String NON_HIDDEN_FILES_REGEX = "[^\\.].*";
public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
.name("Recurse Subdirectories")
.description("Indicates whether to list files from subdirectories of the HDFS directory")
.required(true)
.allowableValues("true", "false")
.defaultValue("true")
.build();
.name("Recurse Subdirectories")
.description("Indicates whether to list files from subdirectories of the HDFS directory")
.required(true)
.allowableValues("true", "false")
.defaultValue("true")
.build();
public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
.name("record-writer")
.displayName("Record Writer")
.description("Specifies the Record Writer to use for creating the listing. If not specified, one FlowFile will be created for each entity that is listed. If the Record Writer is specified, " +
"all entities will be written to a single FlowFile.")
.required(false)
.identifiesControllerService(RecordSetWriterFactory.class)
.build();
.name("record-writer")
.displayName("Record Writer")
.description("Specifies the Record Writer to use for creating the listing. If not specified, one FlowFile will be created for each "
+ "entity that is listed. If the Record Writer is specified, all entities will be written to a single FlowFile.")
.required(false)
.identifiesControllerService(RecordSetWriterFactory.class)
.build();
public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
.name("File Filter")
.description("Only files whose names match the given regular expression will be picked up")
.required(true)
.defaultValue("[^\\.].*")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.build();
private static final String FILTER_MODE_DIRECTORIES_AND_FILES = "filter-mode-directories-and-files";
private static final String FILTER_MODE_FILES_ONLY = "filter-mode-files-only";
private static final String FILTER_MODE_FULL_PATH = "filter-mode-full-path";
static final AllowableValue FILTER_DIRECTORIES_AND_FILES_VALUE = new AllowableValue(FILTER_MODE_DIRECTORIES_AND_FILES,
"Directories and Files",
"Filtering will be applied to the names of directories and files. If " + RECURSE_SUBDIRS.getDisplayName()
+ " is set to true, only subdirectories with a matching name will be searched for files that match "
+ "the regular expression defined in " + FILE_FILTER.getDisplayName() + ".");
static final AllowableValue FILTER_FILES_ONLY_VALUE = new AllowableValue(FILTER_MODE_FILES_ONLY,
"Files Only",
"Filtering will only be applied to the names of files. If " + RECURSE_SUBDIRS.getDisplayName()
+ " is set to true, the entire subdirectory tree will be searched for files that match "
+ "the regular expression defined in " + FILE_FILTER.getDisplayName() + ".");
static final AllowableValue FILTER_FULL_PATH_VALUE = new AllowableValue(FILTER_MODE_FULL_PATH,
"Full Path",
"Filtering will be applied by evaluating the regular expression defined in " + FILE_FILTER.getDisplayName()
+ " against the full path of files with and without the scheme and authority. If "
+ RECURSE_SUBDIRS.getDisplayName() + " is set to true, the entire subdirectory tree will be searched for files in which the full path of "
+ "the file matches the regular expression defined in " + FILE_FILTER.getDisplayName() + ". See 'Additional Details' for more information.");
.name("File Filter")
.description("Only files whose names match the given regular expression will be picked up")
.required(true)
.defaultValue(NON_HIDDEN_FILES_REGEX)
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.build();
public static final PropertyDescriptor FILE_FILTER_MODE = new PropertyDescriptor.Builder()
.name("file-filter-mode")
.displayName("File Filter Mode")
.description("Determines how the regular expression in " + FILE_FILTER.getDisplayName() + " will be used when retrieving listings.")
.required(true)
.allowableValues(FILTER_DIRECTORIES_AND_FILES_VALUE, FILTER_FILES_ONLY_VALUE, FILTER_FULL_PATH_VALUE)
.defaultValue(FILTER_DIRECTORIES_AND_FILES_VALUE.getValue())
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.build();
.name("file-filter-mode")
.displayName("File Filter Mode")
.description("Determines how the regular expression in " + FILE_FILTER.getDisplayName() + " will be used when retrieving listings.")
.required(true)
.allowableValues(FilterMode.class)
.defaultValue(FILTER_DIRECTORIES_AND_FILES.getValue())
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.build();
public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
.name("minimum-file-age")
.displayName("Minimum File Age")
.description("The minimum age that a file must be in order to be pulled; any file younger than this "
+ "amount of time (based on last modification date) will be ignored")
.required(false)
.addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
.build();
public static final PropertyDescriptor MINIMUM_FILE_AGE = new PropertyDescriptor.Builder()
.name("minimum-file-age")
.displayName("Minimum File Age")
.description("The minimum age that a file must be in order to be pulled; any file younger than this "
+ "amount of time (based on last modification date) will be ignored")
.required(false)
.addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
.build();
public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
.name("maximum-file-age")
.displayName("Maximum File Age")
.description("The maximum age that a file must be in order to be pulled; any file older than this "
+ "amount of time (based on last modification date) will be ignored. Minimum value is 100ms.")
.required(false)
.addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
.build();
public static final PropertyDescriptor MAXIMUM_FILE_AGE = new PropertyDescriptor.Builder()
.name("maximum-file-age")
.displayName("Maximum File Age")
.description("The maximum age that a file must be in order to be pulled; any file older than this "
+ "amount of time (based on last modification date) will be ignored. Minimum value is 100ms.")
.required(false)
.addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles are transferred to this relationship")
.build();
.name("success")
.description("All FlowFiles are transferred to this relationship")
.build();
public static final String LEGACY_EMITTED_TIMESTAMP_KEY = "emitted.timestamp";
public static final String LEGACY_LISTING_TIMESTAMP_KEY = "listing.timestamp";
public static final String LATEST_TIMESTAMP_KEY = "latest.timestamp";
public static final String LATEST_FILES_KEY = "latest.file.%d";
private static final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(ListHDFS.class);
private volatile long latestTimestampListed = -1L;
private volatile long latestTimestampEmitted = -1L;
private volatile long lastRunTimestamp = -1L;
private volatile boolean resetState = false;
static final String LISTING_TIMESTAMP_KEY = "listing.timestamp";
static final String EMITTED_TIMESTAMP_KEY = "emitted.timestamp";
static final long LISTING_LAG_NANOS = TimeUnit.MILLISECONDS.toNanos(100L);
private static final Set<Relationship> RELATIONSHIPS = Collections.singleton(REL_SUCCESS);
private Pattern fileFilterRegexPattern;
@Override
protected void init(final ProcessorInitializationContext context) {
super.init(context);
}
private volatile boolean resetState = false;
@Override
protected void preProcessConfiguration(Configuration config, ProcessContext context) {
super.preProcessConfiguration(config, context);
// Since this processor is marked as INPUT_FORBIDDEN, the FILE_FILTER regex can be compiled here rather than during onTrigger processing
fileFilterRegexPattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue());
}
protected File getPersistenceFile() {
return new File("conf/state/" + getIdentifier());
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> props = new ArrayList<>(properties);
props.add(DIRECTORY);
props.add(RECURSE_SUBDIRS);
props.add(RECORD_WRITER);
props.add(FILE_FILTER);
props.add(FILE_FILTER_MODE);
props.add(MIN_AGE);
props.add(MAX_AGE);
props.addAll(Arrays.asList(DIRECTORY, RECURSE_SUBDIRS, RECORD_WRITER, FILE_FILTER, FILE_FILTER_MODE, MINIMUM_FILE_AGE, MAXIMUM_FILE_AGE));
return props;
}
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
return relationships;
return RELATIONSHIPS;
}
@Override
@@ -277,401 +191,148 @@ public class ListHDFS extends AbstractHadoopProcessor {
final List<ValidationResult> problems = new ArrayList<>(super.customValidate(context));
final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final Long minAgeProp = context.getProperty(MINIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final Long maxAgeProp = context.getProperty(MAXIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final long minimumAge = (minAgeProp == null) ? 0L : minAgeProp;
final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
if (minimumAge > maximumAge) {
problems.add(new ValidationResult.Builder().valid(false).subject("GetHDFS Configuration")
.explanation(MIN_AGE.getDisplayName() + " cannot be greater than " + MAX_AGE.getDisplayName()).build());
problems.add(new ValidationResult.Builder().valid(false).subject("ListHDFS Configuration")
.explanation(MINIMUM_FILE_AGE.getDisplayName() + " cannot be greater than " + MAXIMUM_FILE_AGE.getDisplayName()).build());
}
return problems;
}
protected String getKey(final String directory) {
return getIdentifier() + ".lastListingTime." + directory;
}
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
super.onPropertyModified(descriptor, oldValue, newValue);
if (isConfigurationRestored() && (descriptor.equals(DIRECTORY) || descriptor.equals(FILE_FILTER))) {
this.resetState = true;
resetState = true;
}
}
/**
* Determines which of the given FileStatus's describes a File that should be listed.
*
* @param statuses the eligible FileStatus objects that we could potentially list
* @param context processor context with properties values
* @return a Set containing only those FileStatus objects that we want to list
*/
Set<FileStatus> determineListable(final Set<FileStatus> statuses, ProcessContext context) {
final long minTimestamp = this.latestTimestampListed;
final TreeMap<Long, List<FileStatus>> orderedEntries = new TreeMap<>();
final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
// NIFI-4144 - setting to MIN_VALUE so that in case the file modification time is in
// the future relative to the nifi instance, files are not skipped.
final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
// Build a sorted map to determine the latest possible entries
for (final FileStatus status : statuses) {
if (status.getPath().getName().endsWith("_COPYING_")) {
continue;
}
final long fileAge = System.currentTimeMillis() - status.getModificationTime();
if (minimumAge > fileAge || fileAge > maximumAge) {
continue;
}
final long entityTimestamp = status.getModificationTime();
if (entityTimestamp > latestTimestampListed) {
latestTimestampListed = entityTimestamp;
}
// New entries are all those that occur at or after the associated timestamp
final boolean newEntry = entityTimestamp >= minTimestamp && entityTimestamp > latestTimestampEmitted;
if (newEntry) {
List<FileStatus> entitiesForTimestamp = orderedEntries.get(status.getModificationTime());
if (entitiesForTimestamp == null) {
entitiesForTimestamp = new ArrayList<FileStatus>();
orderedEntries.put(status.getModificationTime(), entitiesForTimestamp);
}
entitiesForTimestamp.add(status);
}
}
final Set<FileStatus> toList = new HashSet<>();
if (orderedEntries.size() > 0) {
long latestListingTimestamp = orderedEntries.lastKey();
// If the last listing time is equal to the newest entries previously seen,
// another iteration has occurred without new files and special handling is needed to avoid starvation
if (latestListingTimestamp == minTimestamp) {
// We are done if the latest listing timestamp is equal to the last processed time,
// meaning we handled those items originally passed over
if (latestListingTimestamp == latestTimestampEmitted) {
return Collections.emptySet();
}
} else {
// Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data
orderedEntries.remove(latestListingTimestamp);
}
for (List<FileStatus> timestampEntities : orderedEntries.values()) {
for (FileStatus status : timestampEntities) {
toList.add(status);
}
}
}
return toList;
}
@OnScheduled
public void resetStateIfNecessary(final ProcessContext context) throws IOException {
if (resetState) {
getLogger().debug("Property has been modified. Resetting the state values - listing.timestamp and emitted.timestamp to -1L");
getLogger().debug("Property has been modified. Resetting the state values.");
context.getStateManager().clear(Scope.CLUSTER);
this.resetState = false;
resetState = false;
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
// We have to ensure that we don't continually perform listings, because if we perform two listings within
// the same millisecond, our algorithm for comparing timestamps will not work. So we ensure here that we do
// not let that happen.
final long now = System.nanoTime();
if (now - lastRunTimestamp < LISTING_LAG_NANOS) {
lastRunTimestamp = now;
context.yield();
return;
}
lastRunTimestamp = now;
// Ensure that we are using the latest listing information before we try to perform a listing of HDFS files.
final long latestTimestamp;
final List<String> latestFiles;
try {
final StateMap stateMap = session.getState(Scope.CLUSTER);
if (!stateMap.getStateVersion().isPresent()) {
latestTimestampEmitted = -1L;
latestTimestampListed = -1L;
getLogger().debug("Found no state stored");
} else {
// Determine if state is stored in the 'new' format or the 'old' format
final String emittedString = stateMap.get(EMITTED_TIMESTAMP_KEY);
if (emittedString == null) {
latestTimestampEmitted = -1L;
latestTimestampListed = -1L;
getLogger().debug("Found no recognized state keys; assuming no relevant state and resetting listing/emitted time to -1");
} else {
// state is stored in the new format, using just two timestamps
latestTimestampEmitted = Long.parseLong(emittedString);
final String listingTimestmapString = stateMap.get(LISTING_TIMESTAMP_KEY);
if (listingTimestmapString != null) {
latestTimestampListed = Long.parseLong(listingTimestmapString);
}
final String latestTimestampString = stateMap.get(LATEST_TIMESTAMP_KEY);
getLogger().debug("Found new-style state stored, latesting timestamp emitted = {}, latest listed = {}",
new Object[] {latestTimestampEmitted, latestTimestampListed});
}
final String legacyLatestListingTimestampString = stateMap.get(LEGACY_LISTING_TIMESTAMP_KEY);
final String legacyLatestEmittedTimestampString = stateMap.get(LEGACY_EMITTED_TIMESTAMP_KEY);
if (legacyLatestListingTimestampString != null) {
final long legacyLatestListingTimestamp = Long.parseLong(legacyLatestListingTimestampString);
final long legacyLatestEmittedTimestamp = Long.parseLong(legacyLatestEmittedTimestampString);
latestTimestamp = legacyLatestListingTimestamp == legacyLatestEmittedTimestamp ? legacyLatestListingTimestamp + 1 : legacyLatestListingTimestamp;
latestFiles = new ArrayList<>();
getLogger().debug("Transitioned from legacy state to new state. 'legacyLatestListingTimestamp': {}, 'legacyLatestEmittedTimeStamp': {}'," +
"'latestTimestamp': {}", legacyLatestListingTimestamp, legacyLatestEmittedTimestamp, latestTimestamp);
} else if (latestTimestampString != null) {
latestTimestamp = Long.parseLong(latestTimestampString);
latestFiles = stateMap.toMap().entrySet().stream()
.filter(entry -> entry.getKey().startsWith("latest.file"))
.map(Map.Entry::getValue)
.collect(Collectors.toList());
} else {
latestTimestamp = 0L;
latestFiles = new ArrayList<>();
}
} catch (final IOException ioe) {
} catch (IOException e) {
getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished.");
context.yield();
return;
}
// Pull in any file that is newer than the timestamp that we have.
final FileSystem hdfs = getFileSystem();
final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
String fileFilterMode = context.getProperty(FILE_FILTER_MODE).getValue();
try (final FileSystem hdfs = getFileSystem()) {
final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
final PathFilter pathFilter = createPathFilter(context);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final Set<FileStatus> statuses;
try {
final FileStatusManager fileStatusManager = new FileStatusManager(latestTimestamp, latestFiles);
final Path rootPath = getNormalizedPath(context, DIRECTORY);
statuses = getStatuses(rootPath, recursive, hdfs, createPathFilter(context), fileFilterMode);
getLogger().debug("Found a total of {} files in HDFS", new Object[] {statuses.size()});
} catch (final IOException | IllegalArgumentException e) {
getLogger().error("Failed to perform listing of HDFS", e);
return;
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
getLogger().error("Interrupted while performing listing of HDFS", e);
return;
}
final FileStatusIterable fileStatuses = new FileStatusIterable(rootPath, recursive, hdfs, getUserGroupInformation());
final Set<FileStatus> listable = determineListable(statuses, context);
getLogger().debug("Of the {} files found in HDFS, {} are listable", new Object[] {statuses.size(), listable.size()});
final Long minAgeProp = context.getProperty(MINIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
final Long maxAgeProp = context.getProperty(MAXIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
// Create FlowFile(s) for the listing, if there are any
if (!listable.isEmpty()) {
if (context.getProperty(RECORD_WRITER).isSet()) {
try {
createRecords(listable, context, session);
} catch (final IOException | SchemaNotFoundException e) {
getLogger().error("Failed to write listing of HDFS", e);
return;
}
final HadoopFileStatusWriter writer;
if (writerFactory == null) {
writer = new FlowFileObjectWriter(session, fileStatuses, minimumAge, maximumAge, pathFilter, fileStatusManager, latestTimestamp, latestFiles);
} else {
createFlowFiles(listable, session);
}
}
for (final FileStatus status : listable) {
final long fileModTime = status.getModificationTime();
if (fileModTime > latestTimestampEmitted) {
latestTimestampEmitted = fileModTime;
}
}
final Map<String, String> updatedState = new HashMap<>(1);
updatedState.put(LISTING_TIMESTAMP_KEY, String.valueOf(latestTimestampListed));
updatedState.put(EMITTED_TIMESTAMP_KEY, String.valueOf(latestTimestampEmitted));
getLogger().debug("New state map: {}", new Object[] {updatedState});
try {
session.setState(updatedState, Scope.CLUSTER);
} catch (final IOException ioe) {
getLogger().warn("Failed to save cluster-wide state. If NiFi is restarted, data duplication may occur", ioe);
}
final int listCount = listable.size();
if ( listCount > 0 ) {
getLogger().info("Successfully created listing with {} new files from HDFS", new Object[] {listCount});
session.commitAsync();
} else {
getLogger().debug("There is no data to list. Yielding.");
context.yield();
}
}
private void createFlowFiles(final Set<FileStatus> fileStatuses, final ProcessSession session) {
for (final FileStatus status : fileStatuses) {
final Map<String, String> attributes = createAttributes(status);
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, getSuccessRelationship());
}
}
private void createRecords(final Set<FileStatus> fileStatuses, final ProcessContext context, final ProcessSession session) throws IOException, SchemaNotFoundException {
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
FlowFile flowFile = session.create();
final WriteResult writeResult;
try (final OutputStream out = session.write(flowFile);
final RecordSetWriter recordSetWriter = writerFactory.createWriter(getLogger(), getRecordSchema(), out, Collections.emptyMap())) {
recordSetWriter.beginRecordSet();
for (final FileStatus fileStatus : fileStatuses) {
final Record record = createRecord(fileStatus);
recordSetWriter.write(record);
writer = new RecordObjectWriter(session, fileStatuses, minimumAge, maximumAge, pathFilter, fileStatusManager, latestTimestamp,
latestFiles, writerFactory, getLogger());
}
writeResult = recordSetWriter.finishRecordSet();
}
writer.write();
final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
flowFile = session.putAllAttributes(flowFile, attributes);
getLogger().debug("Found a total of {} files in HDFS, {} are listed", fileStatuses.getTotalFileCount(), writer.getListedFileCount());
session.transfer(flowFile, getSuccessRelationship());
}
private Record createRecord(final FileStatus fileStatus) {
final Map<String, Object> values = new HashMap<>();
values.put(FILENAME, fileStatus.getPath().getName());
values.put(PATH, getAbsolutePath(fileStatus.getPath().getParent()));
values.put(OWNER, fileStatus.getOwner());
values.put(GROUP, fileStatus.getGroup());
values.put(LAST_MODIFIED, new Timestamp(fileStatus.getModificationTime()));
values.put(SIZE, fileStatus.getLen());
values.put(REPLICATION, fileStatus.getReplication());
final FsPermission permission = fileStatus.getPermission();
final String perms = getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
values.put(PERMISSIONS, perms);
values.put(IS_DIRECTORY, fileStatus.isDirectory());
values.put(IS_SYM_LINK, fileStatus.isSymlink());
values.put(IS_ENCRYPTED, fileStatus.isEncrypted());
values.put(IS_ERASURE_CODED, fileStatus.isErasureCoded());
return new MapRecord(getRecordSchema(), values);
}
private RecordSchema getRecordSchema() {
return RECORD_SCHEMA;
}
private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs, final PathFilter filter, String filterMode) throws IOException, InterruptedException {
final Set<FileStatus> statusSet = new HashSet<>();
getLogger().debug("Fetching listing for {}", new Object[] {path});
final FileStatus[] statuses;
if (isPostListingFilterNeeded(filterMode)) {
// For this filter mode, the filter is not passed to listStatus, so that directory names will not be
// filtered out when the listing is recursive.
statuses = getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(path));
} else {
statuses = getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(path, filter));
}
for ( final FileStatus status : statuses ) {
if ( status.isDirectory() ) {
if ( recursive ) {
try {
statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs, filter, filterMode));
} catch (final IOException ioe) {
getLogger().error("Failed to retrieve HDFS listing for subdirectory {} due to {}; will continue listing others", new Object[] {status.getPath(), ioe});
}
if (writer.getListedFileCount() > 0) {
final Map<String, String> updatedState = new HashMap<>();
updatedState.put(LATEST_TIMESTAMP_KEY, String.valueOf(fileStatusManager.getCurrentLatestTimestamp()));
final List<String> files = fileStatusManager.getCurrentLatestFiles();
for (int i = 0; i < files.size(); i++) {
final String currentFilePath = files.get(i);
updatedState.put(String.format(LATEST_FILES_KEY, i), currentFilePath);
}
getLogger().debug("New state map: {}", updatedState);
updateState(session, updatedState);
getLogger().info("Successfully created listing with {} new files from HDFS", writer.getListedFileCount());
} else {
if (isPostListingFilterNeeded(filterMode)) {
// Filtering explicitly performed here, since it was not able to be done when calling listStatus.
if (filter.accept(status.getPath())) {
statusSet.add(status);
}
} else {
statusSet.add(status);
}
getLogger().debug("There is no data to list. Yielding.");
context.yield();
}
} catch (IOException e) {
throw new ProcessException("IO error occurred when closing HDFS file system", e);
}
return statusSet;
}
/**
* Determines if filtering needs to be applied, after calling {@link FileSystem#listStatus(Path)}, based on the
* given filter mode.
* Filter modes that need to be able to search directories regardless of the given filter should return true.
* FILTER_MODE_FILES_ONLY and FILTER_MODE_FULL_PATH require that {@link FileSystem#listStatus(Path)} be invoked
* without a filter so that all directories can be traversed when filtering with these modes.
* FILTER_MODE_DIRECTORIES_AND_FILES should return false, since filtering can be applied directly with
* {@link FileSystem#listStatus(Path, PathFilter)} regardless of a recursive listing.
* @param filterMode the value of one of the defined AllowableValues representing filter modes
* @return true if results need to be filtered, false otherwise
*/
private boolean isPostListingFilterNeeded(String filterMode) {
return filterMode.equals(FILTER_MODE_FILES_ONLY) || filterMode.equals(FILTER_MODE_FULL_PATH);
}
private String getAbsolutePath(final Path path) {
final Path parent = path.getParent();
final String prefix = (parent == null || parent.getName().equals("")) ? "" : getAbsolutePath(parent);
return prefix + "/" + path.getName();
}
private Map<String, String> createAttributes(final FileStatus status) {
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), status.getPath().getName());
attributes.put(CoreAttributes.PATH.key(), getAbsolutePath(status.getPath().getParent()));
attributes.put(getAttributePrefix() + ".owner", status.getOwner());
attributes.put(getAttributePrefix() + ".group", status.getGroup());
attributes.put(getAttributePrefix() + ".lastModified", String.valueOf(status.getModificationTime()));
attributes.put(getAttributePrefix() + ".length", String.valueOf(status.getLen()));
attributes.put(getAttributePrefix() + ".replication", String.valueOf(status.getReplication()));
final FsPermission permission = status.getPermission();
final String perms = getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
attributes.put(getAttributePrefix() + ".permissions", perms);
return attributes;
}
private String getPerms(final FsAction action) {
final StringBuilder sb = new StringBuilder();
if (action.implies(FsAction.READ)) {
sb.append("r");
} else {
sb.append("-");
}
if (action.implies(FsAction.WRITE)) {
sb.append("w");
} else {
sb.append("-");
}
if (action.implies(FsAction.EXECUTE)) {
sb.append("x");
} else {
sb.append("-");
}
return sb.toString();
}
private PathFilter createPathFilter(final ProcessContext context) {
final String filterMode = context.getProperty(FILE_FILTER_MODE).getValue();
return path -> {
final boolean accepted;
if (FILTER_FULL_PATH_VALUE.getValue().equals(filterMode)) {
accepted = fileFilterRegexPattern.matcher(path.toString()).matches()
final FilterMode filterMode = FilterMode.forName(context.getProperty(FILE_FILTER_MODE).getValue());
final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
switch (filterMode) {
case FILTER_MODE_FILES_ONLY:
return path -> fileFilterRegexPattern.matcher(path.getName()).matches();
case FILTER_MODE_FULL_PATH:
return path -> fileFilterRegexPattern.matcher(path.toString()).matches()
|| fileFilterRegexPattern.matcher(Path.getPathWithoutSchemeAndAuthority(path).toString()).matches();
} else {
accepted = fileFilterRegexPattern.matcher(path.getName()).matches();
}
return accepted;
};
// FILTER_DIRECTORIES_AND_FILES
default:
return path -> Stream.of(Path.getPathWithoutSchemeAndAuthority(path).toString().split("/"))
.skip(getPathSegmentsToSkip(recursive))
.allMatch(v -> fileFilterRegexPattern.matcher(v).matches());
}
}
protected Relationship getSuccessRelationship() {
return REL_SUCCESS;
private int getPathSegmentsToSkip(final boolean recursive) {
// Skip the empty segment created by the path's leading '/'; if the traversal is recursive,
// also skip the base directory so the filter applies only to subdirectory and file names.
return recursive ? 2 : 1;
}
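// Worked example with illustrative paths: for /tmp/abc/1/data.csv,
// Path.getPathWithoutSchemeAndAuthority(path).toString().split("/") yields ["", "tmp", "abc", "1", "data.csv"].
//   recursive = true  -> skip(2) drops "" and the base directory "tmp"; the regex must match "abc", "1" and "data.csv".
//   recursive = false -> skip(1) drops only ""; the regex must match every remaining segment, including "tmp".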
protected String getAttributePrefix() {
return "hdfs";
private void updateState(final ProcessSession session, final Map<String, String> newState) {
// In case of legacy state we update the state even if there are no listable files.
try {
session.setState(newState, Scope.CLUSTER);
} catch (IOException e) {
getLogger().warn("Failed to save cluster-wide state. If NiFi is restarted, data duplication may occur", e);
}
}
}
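For orientation, a minimal sketch of the cluster state written after a successful listing with the new keys; legacy 'listing.timestamp'/'emitted.timestamp' entries are migrated to this format on the first run. The key names mirror LATEST_TIMESTAMP_KEY and LATEST_FILES_KEY above, while the timestamp and file paths are hypothetical:

import java.util.HashMap;
import java.util.Map;

class ListHdfsStateSketch {
    // builds a map equivalent to what ListHDFS persists via session.setState(..., Scope.CLUSTER)
    static Map<String, String> exampleClusterState() {
        final Map<String, String> state = new HashMap<>();
        state.put("latest.timestamp", "1683722026000");             // newest modification time seen so far
        state.put("latest.file.0", "hdfs://nn:8020/data/in/a.csv"); // every file carrying that timestamp
        state.put("latest.file.1", "hdfs://nn:8020/data/in/b.csv");
        return state;
    }
}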

View File

@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hadoop.util;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.processor.exception.ProcessException;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.NoSuchElementException;
public class FileStatusIterable implements Iterable<FileStatus> {
private final Path path;
private final boolean recursive;
private final FileSystem fileSystem;
private final UserGroupInformation userGroupInformation;
private long totalFileCount;
public FileStatusIterable(final Path path, final boolean recursive, final FileSystem fileSystem, final UserGroupInformation userGroupInformation) {
this.path = path;
this.recursive = recursive;
this.fileSystem = fileSystem;
this.userGroupInformation = userGroupInformation;
}
@Override
public Iterator<FileStatus> iterator() {
return new FileStatusIterator();
}
public long getTotalFileCount() {
return totalFileCount;
}
class FileStatusIterator implements Iterator<FileStatus> {
private static final String IO_ERROR_MESSAGE = "IO error occurred while iterating Hadoop File System";
private static final String THREAD_INTERRUPT_ERROR_MESSAGE = "Thread was interrupted while iterating Hadoop File System";
private final Deque<Path> dirPaths;
private FileStatus nextFileStatus;
private RemoteIterator<FileStatus> remoteIterator;
public FileStatusIterator() {
dirPaths = new ArrayDeque<>();
remoteIterator = getRemoteIterator(path);
}
@Override
public boolean hasNext() {
if (nextFileStatus != null) {
return true;
}
try {
while (remoteIterator.hasNext() || !dirPaths.isEmpty()) {
if (remoteIterator.hasNext()) {
FileStatus fs = remoteIterator.next();
if (fs.isDirectory()) {
if (recursive) {
dirPaths.push(fs.getPath());
}
// if not recursive, continue
} else {
nextFileStatus = fs;
return true;
}
} else {
remoteIterator = getRemoteIterator(dirPaths.pop());
}
}
return false;
} catch (IOException e) {
throw new ProcessException(IO_ERROR_MESSAGE, e);
}
}
@Override
public FileStatus next() {
if (nextFileStatus == null) {
if (!hasNext()) {
throw new NoSuchElementException();
}
}
totalFileCount++;
final FileStatus nextFileStatus = this.nextFileStatus;
this.nextFileStatus = null;
return nextFileStatus;
}
private RemoteIterator<FileStatus> getRemoteIterator(final Path currentPath) {
try {
return userGroupInformation.doAs((PrivilegedExceptionAction<RemoteIterator<FileStatus>>) () -> fileSystem.listStatusIterator(currentPath));
} catch (IOException e) {
throw new ProcessException(IO_ERROR_MESSAGE, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ProcessException(THREAD_INTERRUPT_ERROR_MESSAGE, e);
}
}
}
}
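A small usage sketch of FileStatusIterable, assuming an already-configured FileSystem and UserGroupInformation supplied by the caller; it lazily walks the directory tree with listStatusIterator and yields only files, never directories:

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.processors.hadoop.util.FileStatusIterable;

class FileStatusIterableSketch {
    static long countFiles(final FileSystem fileSystem, final UserGroupInformation ugi) {
        // recursive = true: directories found along the way are traversed but not emitted
        final FileStatusIterable statuses = new FileStatusIterable(new Path("/tmp"), true, fileSystem, ugi);
        long count = 0;
        for (final FileStatus status : statuses) {
            count++;
        }
        // count == statuses.getTotalFileCount() once the iteration has completed
        return count;
    }
}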

View File

@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hadoop.util;
import org.apache.hadoop.fs.FileStatus;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* Keeps a list of the latest modified file paths and the latest modified timestamp of the current run.
*/
public class FileStatusManager {
private final List<String> currentLatestFiles;
private long currentLatestTimestamp;
public FileStatusManager(final long initialLatestTimestamp, final List<String> initialLatestFiles) {
currentLatestTimestamp = initialLatestTimestamp;
currentLatestFiles = new ArrayList<>(initialLatestFiles);
}
public void update(final FileStatus status) {
if (status.getModificationTime() > currentLatestTimestamp) {
currentLatestTimestamp = status.getModificationTime();
currentLatestFiles.clear();
currentLatestFiles.add(status.getPath().toString());
} else if (status.getModificationTime() == currentLatestTimestamp) {
currentLatestFiles.add(status.getPath().toString());
}
}
public List<String> getCurrentLatestFiles() {
return Collections.unmodifiableList(currentLatestFiles);
}
public long getCurrentLatestTimestamp() {
return currentLatestTimestamp;
}
}
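A short sketch of the FileStatusManager contract, built with Hadoop's public FileStatus constructor and made-up sizes and timestamps: a newer modification time replaces the tracked list, an equal one appends to it, an older one is ignored.

import java.util.Collections;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.nifi.processors.hadoop.util.FileStatusManager;

class FileStatusManagerSketch {
    static void demo() {
        // start from "no previous state": timestamp 0 and no previously listed files
        final FileStatusManager manager = new FileStatusManager(0L, Collections.emptyList());
        manager.update(new FileStatus(10L, false, 1, 4096L, 1000L, new Path("/in/a.csv")));  // newer -> tracked files: [a.csv]
        manager.update(new FileStatus(20L, false, 1, 4096L, 1000L, new Path("/in/b.csv")));  // equal -> tracked files: [a.csv, b.csv]
        manager.update(new FileStatus(30L, false, 1, 4096L, 500L, new Path("/in/old.csv"))); // older -> ignored
        // manager.getCurrentLatestTimestamp() == 1000L
        // manager.getCurrentLatestFiles()     == ["/in/a.csv", "/in/b.csv"]
    }
}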

View File

@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hadoop.util;
import org.apache.nifi.components.DescribedValue;
import java.util.stream.Stream;
import static org.apache.nifi.processors.hadoop.ListHDFS.FILE_FILTER;
import static org.apache.nifi.processors.hadoop.ListHDFS.RECURSE_SUBDIRS;
public enum FilterMode implements DescribedValue {
FILTER_DIRECTORIES_AND_FILES(
"filter-mode-directories-and-files",
"Directories and Files",
"Filtering will be applied to the names of directories and files. If " + RECURSE_SUBDIRS.getDisplayName()
+ " is set to true, only subdirectories with a matching name will be searched for files that match "
+ "the regular expression defined in " + FILE_FILTER.getDisplayName() + "."
),
FILTER_MODE_FILES_ONLY(
"filter-mode-files-only",
"Files Only",
"Filtering will only be applied to the names of files. If " + RECURSE_SUBDIRS.getDisplayName()
+ " is set to true, the entire subdirectory tree will be searched for files that match "
+ "the regular expression defined in " + FILE_FILTER.getDisplayName() + "."
),
FILTER_MODE_FULL_PATH(
"filter-mode-full-path",
"Full Path",
"Filtering will be applied by evaluating the regular expression defined in " + FILE_FILTER.getDisplayName()
+ " against the full path of files with and without the scheme and authority. If "
+ RECURSE_SUBDIRS.getDisplayName() + " is set to true, the entire subdirectory tree will be searched for files in which the full path of "
+ "the file matches the regular expression defined in " + FILE_FILTER.getDisplayName() + ". See 'Additional Details' for more information."
);
private final String value;
private final String displayName;
private final String description;
FilterMode(final String value, final String displayName, final String description) {
this.value = value;
this.displayName = displayName;
this.description = description;
}
@Override
public String getValue() {
return value;
}
@Override
public String getDisplayName() {
return displayName;
}
@Override
public String getDescription() {
return description;
}
public static FilterMode forName(String filterMode) {
return Stream.of(values())
.filter(fm -> fm.getValue().equalsIgnoreCase(filterMode))
.findFirst()
.orElseThrow(
() -> new IllegalArgumentException("Invalid filter mode: " + filterMode));
}
}
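A brief sketch of resolving the stored property value back to the enum; the string literals are the ones defined above, and the failing lookup only illustrates the error path:

import org.apache.nifi.processors.hadoop.util.FilterMode;

class FilterModeSketch {
    static void demo() {
        // lookup is by the machine-readable value and is case-insensitive
        final FilterMode mode = FilterMode.forName("filter-mode-files-only");
        assert mode == FilterMode.FILTER_MODE_FILES_ONLY;

        try {
            FilterMode.forName("no-such-mode");
        } catch (final IllegalArgumentException expected) {
            // thrown as "Invalid filter mode: no-such-mode"
        }
    }
}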

View File

@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hadoop.util.writer;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.PathFilter;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.hadoop.ListHDFS;
import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
import org.apache.nifi.processors.hadoop.util.FileStatusManager;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class FlowFileObjectWriter extends HadoopFileStatusWriter {
private static final String HDFS_ATTRIBUTE_PREFIX = "hdfs";
public FlowFileObjectWriter(final ProcessSession session,
final FileStatusIterable fileStatuses,
final long minimumAge,
final long maximumAge,
final PathFilter pathFilter,
final FileStatusManager fileStatusManager,
final long previousLatestModificationTime,
final List<String> previousLatestFiles) {
super(session, fileStatuses, minimumAge, maximumAge, pathFilter, fileStatusManager, previousLatestModificationTime, previousLatestFiles);
}
@Override
public void write() {
for (FileStatus status : fileStatusIterable) {
if (determineListable(status)) {
final Map<String, String> attributes = createAttributes(status);
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, ListHDFS.REL_SUCCESS);
fileStatusManager.update(status);
fileCount++;
}
}
}
private Map<String, String> createAttributes(final FileStatus status) {
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), status.getPath().getName());
attributes.put(CoreAttributes.PATH.key(), getAbsolutePath(status.getPath().getParent()));
attributes.put(HDFS_ATTRIBUTE_PREFIX + ".owner", status.getOwner());
attributes.put(HDFS_ATTRIBUTE_PREFIX + ".group", status.getGroup());
attributes.put(HDFS_ATTRIBUTE_PREFIX + ".lastModified", String.valueOf(status.getModificationTime()));
attributes.put(HDFS_ATTRIBUTE_PREFIX + ".length", String.valueOf(status.getLen()));
attributes.put(HDFS_ATTRIBUTE_PREFIX + ".replication", String.valueOf(status.getReplication()));
attributes.put(HDFS_ATTRIBUTE_PREFIX + ".permissions", getPermissionsString(status.getPermission()));
return attributes;
}
}
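For reference, the attributes this writer would put on a FlowFile for a hypothetical file (all values below are made up):

// filename          = data.csv
// path              = /data/in
// hdfs.owner        = nifi
// hdfs.group        = hadoop
// hdfs.lastModified = 1683722026000
// hdfs.length       = 1024
// hdfs.replication  = 3
// hdfs.permissions  = rw-r--r--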

View File

@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hadoop.util.writer;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
import org.apache.nifi.processors.hadoop.util.FileStatusManager;
import java.util.List;
/**
* Base class for writers that emit HDFS file statuses either as individual FlowFiles or as records.
*/
public abstract class HadoopFileStatusWriter {
protected final ProcessSession session;
protected final FileStatusIterable fileStatusIterable;
protected final long minimumAge;
protected final long maximumAge;
protected final PathFilter pathFilter;
protected final FileStatusManager fileStatusManager;
protected final long previousLatestTimestamp;
protected final List<String> previousLatestFiles;
protected long fileCount;
private final long currentTimeMillis;
HadoopFileStatusWriter(final ProcessSession session,
final FileStatusIterable fileStatusIterable,
final long minimumAge,
final long maximumAge,
final PathFilter pathFilter,
final FileStatusManager fileStatusManager,
final long previousLatestTimestamp,
final List<String> previousLatestFiles) {
this.session = session;
this.fileStatusIterable = fileStatusIterable;
this.minimumAge = minimumAge;
this.maximumAge = maximumAge;
this.pathFilter = pathFilter;
this.fileStatusManager = fileStatusManager;
this.previousLatestTimestamp = previousLatestTimestamp;
this.previousLatestFiles = previousLatestFiles;
currentTimeMillis = System.currentTimeMillis();
fileCount = 0L;
}
public abstract void write();
public long getListedFileCount() {
return fileCount;
}
protected boolean determineListable(final FileStatus status) {
final boolean isCopyInProgress = status.getPath().getName().endsWith("_COPYING_");
final boolean isFilterAccepted = pathFilter.accept(status.getPath());
if (isCopyInProgress || !isFilterAccepted) {
return false;
}
// If the file was created during the processor's last iteration we have to check if it was already listed
if (status.getModificationTime() == previousLatestTimestamp) {
return !previousLatestFiles.contains(status.getPath().toString());
}
final long fileAge = currentTimeMillis - status.getModificationTime();
if (minimumAge > fileAge || fileAge > maximumAge) {
return false;
}
return status.getModificationTime() > previousLatestTimestamp;
}
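// Worked example with illustrative timestamps: previousLatestTimestamp = 1000, previousLatestFiles = ["/in/a.csv"].
//   /in/a.csv modified at 1000 -> equal timestamp and already listed last run -> skipped (no duplicate)
//   /in/b.csv modified at 1000 -> equal timestamp but not listed last run     -> listed
//   /in/c.csv modified at  900 -> older than the stored timestamp             -> skipped
//   /in/d.csv modified at 1500 -> newer and inside the min/max age window     -> listed
// This replaces the old "hold back the newest timestamp" approach: files are no longer delayed by one run,
// and only the paths sharing the latest timestamp need to be kept in state.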
String getAbsolutePath(final Path path) {
final Path parent = path.getParent();
final String prefix = (parent == null || parent.getName().equals("")) ? "" : getAbsolutePath(parent);
return prefix + "/" + path.getName();
}
String getPerms(final FsAction action) {
final StringBuilder sb = new StringBuilder();
sb.append(action.implies(FsAction.READ) ? "r" : "-");
sb.append(action.implies(FsAction.WRITE) ? "w" : "-");
sb.append(action.implies(FsAction.EXECUTE) ? "x" : "-");
return sb.toString();
}
String getPermissionsString(final FsPermission permission) {
return String.format("%s%s%s", getPerms(permission.getUserAction()),
getPerms(permission.getGroupAction()), getPerms(permission.getOtherAction()));
}
}
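
For orientation, a minimal sketch (not part of this commit) of how a concrete writer is expected to extend this base class: iterate the FileStatusIterable, filter each entry through determineListable(), and keep fileStatusManager and fileCount up to date. The class name LoggingObjectWriter and its ComponentLog parameter are hypothetical; everything else relies only on the members defined above. RecordObjectWriter below follows the same pattern.

package org.apache.nifi.processors.hadoop.util.writer;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.PathFilter;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
import org.apache.nifi.processors.hadoop.util.FileStatusManager;

import java.util.List;

public class LoggingObjectWriter extends HadoopFileStatusWriter {

    private final ComponentLog logger;

    public LoggingObjectWriter(final ProcessSession session,
                               final FileStatusIterable fileStatusIterable,
                               final long minimumAge,
                               final long maximumAge,
                               final PathFilter pathFilter,
                               final FileStatusManager fileStatusManager,
                               final long previousLatestTimestamp,
                               final List<String> previousLatestFiles,
                               final ComponentLog logger) {
        super(session, fileStatusIterable, minimumAge, maximumAge, pathFilter, fileStatusManager, previousLatestTimestamp, previousLatestFiles);
        this.logger = logger;
    }

    @Override
    public void write() {
        for (final FileStatus status : fileStatusIterable) {
            if (determineListable(status)) {
                // a real writer would emit the listing here (FlowFile attributes or a Record)
                logger.debug("Listable entry: {}", getAbsolutePath(status.getPath()));
                fileStatusManager.update(status);
                fileCount++;
            }
        }
    }
}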

View File

@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hadoop.util.writer;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
import org.apache.nifi.processors.hadoop.util.FileStatusManager;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.OutputStream;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.nifi.processors.hadoop.ListHDFS.REL_SUCCESS;
public class RecordObjectWriter extends HadoopFileStatusWriter {
private static final RecordSchema RECORD_SCHEMA;
private static final String FILENAME = "filename";
private static final String PATH = "path";
private static final String IS_DIRECTORY = "directory";
private static final String SIZE = "size";
private static final String LAST_MODIFIED = "lastModified";
private static final String PERMISSIONS = "permissions";
private static final String OWNER = "owner";
private static final String GROUP = "group";
private static final String REPLICATION = "replication";
private static final String IS_SYM_LINK = "symLink";
private static final String IS_ENCRYPTED = "encrypted";
private static final String IS_ERASURE_CODED = "erasureCoded";
static {
final List<RecordField> recordFields = new ArrayList<>();
recordFields.add(new RecordField(FILENAME, RecordFieldType.STRING.getDataType(), false));
recordFields.add(new RecordField(PATH, RecordFieldType.STRING.getDataType(), false));
recordFields.add(new RecordField(IS_DIRECTORY, RecordFieldType.BOOLEAN.getDataType(), false));
recordFields.add(new RecordField(SIZE, RecordFieldType.LONG.getDataType(), false));
recordFields.add(new RecordField(LAST_MODIFIED, RecordFieldType.TIMESTAMP.getDataType(), false));
recordFields.add(new RecordField(PERMISSIONS, RecordFieldType.STRING.getDataType()));
recordFields.add(new RecordField(OWNER, RecordFieldType.STRING.getDataType()));
recordFields.add(new RecordField(GROUP, RecordFieldType.STRING.getDataType()));
recordFields.add(new RecordField(REPLICATION, RecordFieldType.INT.getDataType()));
recordFields.add(new RecordField(IS_SYM_LINK, RecordFieldType.BOOLEAN.getDataType()));
recordFields.add(new RecordField(IS_ENCRYPTED, RecordFieldType.BOOLEAN.getDataType()));
recordFields.add(new RecordField(IS_ERASURE_CODED, RecordFieldType.BOOLEAN.getDataType()));
RECORD_SCHEMA = new SimpleRecordSchema(recordFields);
}
private final RecordSetWriterFactory writerFactory;
private final ComponentLog logger;
public RecordObjectWriter(final ProcessSession session,
final FileStatusIterable fileStatuses,
final long minimumAge,
final long maximumAge,
final PathFilter pathFilter,
final FileStatusManager fileStatusManager,
final long previousLatestModificationTime,
final List<String> previousLatestFiles,
final RecordSetWriterFactory writerFactory,
final ComponentLog logger) {
super(session, fileStatuses, minimumAge, maximumAge, pathFilter, fileStatusManager, previousLatestModificationTime, previousLatestFiles);
this.writerFactory = writerFactory;
this.logger = logger;
}
@Override
public void write() {
FlowFile flowFile = session.create();
final WriteResult writeResult;
try (
final OutputStream out = session.write(flowFile);
final RecordSetWriter recordWriter = writerFactory.createWriter(logger, RECORD_SCHEMA, out, flowFile)
) {
recordWriter.beginRecordSet();
for (FileStatus status : fileStatusIterable) {
if (determineListable(status)) {
recordWriter.write(createRecordForListing(status));
fileStatusManager.update(status);
}
}
writeResult = recordWriter.finishRecordSet();
} catch (Exception e) {
throw new ProcessException("An error occurred while writing results", e);
}
fileCount = writeResult.getRecordCount();
if (fileCount == 0) {
session.remove(flowFile);
} else {
final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
}
}
private Record createRecordForListing(final FileStatus fileStatus) {
final Map<String, Object> values = new HashMap<>();
values.put(FILENAME, fileStatus.getPath().getName());
values.put(PATH, getAbsolutePath(fileStatus.getPath().getParent()));
values.put(OWNER, fileStatus.getOwner());
values.put(GROUP, fileStatus.getGroup());
values.put(LAST_MODIFIED, new Timestamp(fileStatus.getModificationTime()));
values.put(SIZE, fileStatus.getLen());
values.put(REPLICATION, fileStatus.getReplication());
final FsPermission permission = fileStatus.getPermission();
final String perms = getPermissionsString(permission);
values.put(PERMISSIONS, perms);
values.put(IS_DIRECTORY, fileStatus.isDirectory());
values.put(IS_SYM_LINK, fileStatus.isSymlink());
values.put(IS_ENCRYPTED, fileStatus.isEncrypted());
values.put(IS_ERASURE_CODED, fileStatus.isErasureCoded());
return new MapRecord(RECORD_SCHEMA, values);
}
}
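
A hedged usage sketch follows, showing how the processor might drive this writer when a Record Writer is configured; it is illustrative rather than code from this commit, and the local variables stand in for values ListHDFS is assumed to have prepared already.

// A hedged usage sketch (illustrative, not taken from this commit). The variables
// fileStatusIterable, minimumAge, maximumAge, pathFilter, fileStatusManager,
// previousLatestTimestamp and previousLatestFiles are hypothetical names for state the
// processor holds at this point in onTrigger().
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER)
        .asControllerService(RecordSetWriterFactory.class);

final HadoopFileStatusWriter writer = new RecordObjectWriter(session, fileStatusIterable,
        minimumAge, maximumAge, pathFilter, fileStatusManager,
        previousLatestTimestamp, previousLatestFiles, writerFactory, getLogger());

// Emits a single FlowFile holding one record per listable FileStatus and transfers it to
// REL_SUCCESS, or removes the FlowFile again when nothing new was listed.
writer.write();
final long listedCount = writer.getListedFileCount();

When no Record Writer is set, the FlowFileObjectWriter counterpart presumably covers the attribute-only case, emitting one FlowFile per listed entry.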

View File

@ -26,10 +26,11 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.util.MockComponentLog;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
@ -45,39 +46,35 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_DIRECTORIES_AND_FILES_VALUE;
import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_FILES_ONLY_VALUE;
import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_FULL_PATH_VALUE;
import static org.apache.nifi.processors.hadoop.ListHDFS.LATEST_TIMESTAMP_KEY;
import static org.apache.nifi.processors.hadoop.ListHDFS.REL_SUCCESS;
import static org.apache.nifi.processors.hadoop.util.FilterMode.FILTER_DIRECTORIES_AND_FILES;
import static org.apache.nifi.processors.hadoop.util.FilterMode.FILTER_MODE_FILES_ONLY;
import static org.apache.nifi.processors.hadoop.util.FilterMode.FILTER_MODE_FULL_PATH;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestListHDFS {
class TestListHDFS {
private TestRunner runner;
private ListHDFSWithMockedFileSystem proc;
private NiFiProperties mockNiFiProperties;
private KerberosProperties kerberosProperties;
private MockComponentLog mockLogger;
@BeforeEach
public void setup() throws InitializationException {
mockNiFiProperties = mock(NiFiProperties.class);
when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
kerberosProperties = new KerberosProperties(null);
final KerberosProperties kerberosProperties = new KerberosProperties(null);
proc = new ListHDFSWithMockedFileSystem(kerberosProperties);
mockLogger = spy(new MockComponentLog(UUID.randomUUID().toString(), proc));
@ -88,16 +85,11 @@ public class TestListHDFS {
}
@Test
public void testListingWithValidELFunction() throws InterruptedException {
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
void testListingWithValidELFunction() {
addFileStatus("/test", "testFile.txt", false);
runner.setProperty(ListHDFS.DIRECTORY, "${literal('/test'):substring(0,5)}");
// first iteration will not pick up files because it has to instead check timestamps.
// We must then wait long enough to ensure that the listing can be performed safely and
// run the Processor again.
runner.run();
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
@ -107,53 +99,39 @@ public class TestListHDFS {
}
@Test
public void testListingWithFilter() throws InterruptedException {
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
void testListingWithFilter() {
addFileStatus("/test", "testFile.txt", false);
runner.setProperty(ListHDFS.DIRECTORY, "${literal('/test'):substring(0,5)}");
runner.setProperty(ListHDFS.FILE_FILTER, "[^test].*");
// first iteration will not pick up files because it has to instead check timestamps.
// We must then wait long enough to ensure that the listing can be performed safely and
// run the Processor again.
runner.run();
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
}
@Test
public void testListingWithInvalidELFunction() throws InterruptedException {
void testListingWithInvalidELFunction() {
runner.setProperty(ListHDFS.DIRECTORY, "${literal('/test'):foo()}");
runner.assertNotValid();
}
@Test
public void testListingWithUnrecognizedELFunction() throws InterruptedException {
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
void testListingWithUnrecognizedELFunction() {
addFileStatus("/test", "testFile.txt", false);
runner.setProperty(ListHDFS.DIRECTORY, "data_${literal('testing'):substring(0,4)%7D");
// first iteration will not pick up files because it has to instead check timestamps.
// We must then wait long enough to ensure that the listing can be performed safely and
// run the Processor again.
runner.run();
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
final AssertionError assertionError = assertThrows(AssertionError.class, () -> runner.run());
assertEquals(IllegalArgumentException.class, assertionError.getCause().getClass());
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
}
@Test
public void testListingHasCorrectAttributes() throws InterruptedException {
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
void testListingHasCorrectAttributes() {
addFileStatus("/test", "testFile.txt", false);
// first iteration will not pick up files because it has to instead check timestamps.
// We must then wait long enough to ensure that the listing can be performed safely and
// run the Processor again.
runner.run();
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
@ -164,30 +142,24 @@ public class TestListHDFS {
@Test
public void testRecursiveWithDefaultFilterAndFilterMode() throws InterruptedException {
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/.testFile.txt")));
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
void testRecursiveWithDefaultFilterAndFilterMode() {
addFileStatus("/test", ".testFile.txt", false);
addFileStatus("/test", "testFile.txt", false);
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
// first iteration will not pick up files because it has to instead check timestamps.
// We must then wait long enough to ensure that the listing can be performed safely and
// run the Processor again.
runner.run();
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
addFileStatus("/test", "testDir", true);
addFileStatus("/test/testDir", "1.txt", false);
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS);
for (int i=0; i < 2; i++) {
for (int i = 0; i < 2; i++) {
final MockFlowFile ff = flowFiles.get(i);
final String filename = ff.getAttribute("filename");
if (filename.equals("testFile.txt")) {
ff.assertAttributeEquals("path", "/test");
} else if ( filename.equals("1.txt")) {
} else if (filename.equals("1.txt")) {
ff.assertAttributeEquals("path", "/test/testDir");
} else {
fail("filename was " + filename);
@ -196,30 +168,22 @@ public class TestListHDFS {
}
@Test
public void testRecursiveWithCustomFilterDirectoriesAndFiles() throws InterruptedException, IOException {
void testRecursiveWithCustomFilterDirectoriesAndFiles() {
// set custom regex filter and filter mode
runner.setProperty(ListHDFS.FILE_FILTER, ".*txt.*");
runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_DIRECTORIES_AND_FILES_VALUE.getValue());
runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_DIRECTORIES_AND_FILES.getValue());
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.out")));
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
addFileStatus("/test", "testFile.out", false);
addFileStatus("/test", "testFile.txt", false);
addFileStatus("/test", "testDir", true);
addFileStatus("/test/testDir", "1.txt", false);
addFileStatus("/test/testDir", "anotherDir", true);
addFileStatus("/test/testDir/anotherDir", "2.out", false);
addFileStatus("/test/testDir/anotherDir", "2.txt", false);
addFileStatus("/test", "txtDir", true);
addFileStatus("/test/txtDir", "3.out", false);
addFileStatus("/test/txtDir", "3.txt", false);
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir")));
proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.out")));
proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.txt")));
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/txtDir")));
proc.fileSystem.addFileStatus(new Path("/test/txtDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/txtDir/3.out")));
proc.fileSystem.addFileStatus(new Path("/test/txtDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/txtDir/3.txt")));
// first iteration will not pick up files because it has to instead check timestamps.
// We must then wait long enough to ensure that the listing can be performed safely and
// run the Processor again.
runner.run();
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
@ -240,28 +204,21 @@ public class TestListHDFS {
}
@Test
public void testRecursiveWithCustomFilterFilesOnly() throws InterruptedException, IOException {
void testRecursiveWithCustomFilterFilesOnly() {
// set custom regex filter and filter mode
runner.setProperty(ListHDFS.FILE_FILTER, "[^\\.].*\\.txt");
runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_FILES_ONLY_VALUE.getValue());
runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_MODE_FILES_ONLY.getValue());
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.out")));
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/.partfile.txt")));
addFileStatus("/test", "testFile.out", false);
addFileStatus("/test", "testFile.txt", false);
addFileStatus("/test", ".partfile.txt", false);
addFileStatus("/test", "testDir", true);
addFileStatus("/test/testDir", "1.txt", false);
addFileStatus("/test/testDir", "anotherDir", true);
addFileStatus("/test/testDir/anotherDir", ".txt", false);
addFileStatus("/test/testDir/anotherDir", "2.out", false);
addFileStatus("/test/testDir/anotherDir", "2.txt", false);
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir")));
proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/.txt")));
proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.out")));
proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.txt")));
// first iteration will not pick up files because it has to instead check timestamps.
// We must then wait long enough to ensure that the listing can be performed safely and
// run the Processor again.
runner.run();
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 3);
@ -271,55 +228,41 @@ public class TestListHDFS {
final MockFlowFile ff = flowFiles.get(i);
final String filename = ff.getAttribute("filename");
if (filename.equals("testFile.txt")) {
ff.assertAttributeEquals("path", "/test");
} else if (filename.equals("1.txt")) {
ff.assertAttributeEquals("path", "/test/testDir");
} else if (filename.equals("2.txt")) {
ff.assertAttributeEquals("path", "/test/testDir/anotherDir");
} else {
fail("filename was " + filename);
switch (filename) {
case "testFile.txt":
ff.assertAttributeEquals("path", "/test");
break;
case "1.txt":
ff.assertAttributeEquals("path", "/test/testDir");
break;
case "2.txt":
ff.assertAttributeEquals("path", "/test/testDir/anotherDir");
break;
default:
fail("filename was " + filename);
break;
}
}
}
@Test
public void testRecursiveWithCustomFilterFullPathWithoutSchemeAndAuthority() throws InterruptedException, IOException {
void testRecursiveWithCustomFilterFullPathWithoutSchemeAndAuthority() {
// set custom regex filter and filter mode
runner.setProperty(ListHDFS.FILE_FILTER, "(/.*/)*anotherDir/1\\..*");
runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_FULL_PATH_VALUE.getValue());
runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_MODE_FULL_PATH.getValue());
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.out")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.txt")));
addFileStatus("/test", "testFile.out", false);
addFileStatus("/test", "testFile.txt", false);
addFileStatus("/test", "testDir", true);
addFileStatus("/test/testDir", "1.txt", false);
addFileStatus("/test/testDir", "anotherDir", true);
addFileStatus("/test/testDir/anotherDir", "1.out", false);
addFileStatus("/test/testDir/anotherDir", "1.txt", false);
addFileStatus("/test/testDir/anotherDir", "2.out", false);
addFileStatus("/test/testDir/anotherDir", "2.txt", false);
addFileStatus("/test/testDir", "someDir", true);
addFileStatus("/test/testDir/someDir", "1.out", false);
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/1.txt")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.out")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.txt")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.out")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.txt")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir/1.out")));
// first iteration will not pick up files because it has to instead check timestamps.
// We must then wait long enough to ensure that the listing can be performed safely and
// run the Processor again.
runner.run();
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
@ -340,42 +283,23 @@ public class TestListHDFS {
}
@Test
public void testRecursiveWithCustomFilterFullPathWithSchemeAndAuthority() throws InterruptedException, IOException {
void testRecursiveWithCustomFilterFullPathWithSchemeAndAuthority() {
// set custom regex filter and filter mode
runner.setProperty(ListHDFS.FILE_FILTER, "hdfs://hdfscluster:8020(/.*/)*anotherDir/1\\..*");
runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_FULL_PATH_VALUE.getValue());
runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_MODE_FULL_PATH.getValue());
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.out")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.txt")));
addFileStatus("/test", "testFile.out", false);
addFileStatus("/test", "testFile.txt", false);
addFileStatus("/test", "testDir", true);
addFileStatus("/test/testDir", "1.txt", false);
addFileStatus("/test/testDir", "anotherDir", true);
addFileStatus("/test/testDir/anotherDir", "1.out", false);
addFileStatus("/test/testDir/anotherDir", "1.txt", false);
addFileStatus("/test/testDir/anotherDir", "2.out", false);
addFileStatus("/test/testDir/anotherDir", "2.txt", false);
addFileStatus("/test/testDir", "someDir", true);
addFileStatus("/test/testDir/someDir", "1.out", false);
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/1.txt")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.out")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.txt")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.out")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.txt")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir/1.out")));
// first iteration will not pick up files because it has to instead check timestamps.
// We must then wait long enough to ensure that the listing can be performed safely and
// run the Processor again.
runner.run();
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
@ -396,18 +320,11 @@ public class TestListHDFS {
}
@Test
public void testNotRecursive() throws InterruptedException {
void testNotRecursive() {
runner.setProperty(ListHDFS.RECURSE_SUBDIRS, "false");
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
// first iteration will not pick up files because it has to instead check timestamps.
// We must then wait long enough to ensure that the listing can be performed safely and
// run the Processor again.
runner.run();
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
addFileStatus("/test", "testFile.txt", false);
addFileStatus("/test", "testDir", true);
addFileStatus("/test/testDir", "1.txt", false);
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
@ -419,14 +336,10 @@ public class TestListHDFS {
@Test
public void testNoListUntilUpdateFromRemoteOnPrimaryNodeChange() throws IOException, InterruptedException {
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 1999L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
void testNoListUntilUpdateFromRemoteOnPrimaryNodeChange() throws IOException {
addFileStatus("/test", "testFile.txt", false, 1999L, 0L);
// first iteration will not pick up files because it has to instead check timestamps.
// We must then wait long enough to ensure that the listing can be performed safely and
// run the Processor again.
runner.run();
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
@ -438,7 +351,7 @@ public class TestListHDFS {
runner.clearTransferState();
// add new file to pull
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 2000L, 0L, create777(), "owner", "group", new Path("/test/testFile2.txt")));
addFileStatus("/test", "testFile2.txt", false, 2000L, 0L);
runner.getStateManager().setFailOnStateGet(Scope.CLUSTER, true);
@ -453,66 +366,43 @@ public class TestListHDFS {
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
runner.getStateManager().setFailOnStateGet(Scope.CLUSTER, false);
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
Map<String, String> newState = runner.getStateManager().getState(Scope.CLUSTER).toMap();
assertEquals("2000", newState.get(ListHDFS.LISTING_TIMESTAMP_KEY));
assertEquals("1999", newState.get(ListHDFS.EMITTED_TIMESTAMP_KEY));
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
newState = runner.getStateManager().getState(Scope.CLUSTER).toMap();
assertEquals("2000", newState.get(ListHDFS.LISTING_TIMESTAMP_KEY));
assertEquals("2000", newState.get(ListHDFS.EMITTED_TIMESTAMP_KEY));
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
Map<String, String> newState = runner.getStateManager().getState(Scope.CLUSTER).toMap();
assertEquals("2000", newState.get(LATEST_TIMESTAMP_KEY));
}
@Test
public void testOnlyNewestEntriesHeldBack() throws InterruptedException {
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 8L, 0L, create777(), "owner", "group", new Path("/test/testFile2.txt")));
void testEntriesWithSameTimestampOnlyAddedOnce() {
addFileStatus("/test", "testFile.txt", false, 1L, 0L);
addFileStatus("/test", "testFile2.txt", false, 1L, 8L);
// this is a directory, so it won't be counted toward the entries
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 8L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 100L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 100L, 0L, create777(), "owner", "group", new Path("/test/testDir/2.txt")));
addFileStatus("/test", "testDir", true, 1L, 8L);
addFileStatus("/test/testDir", "1.txt", false, 1L, 100L);
// The first iteration should pick up 2 files with the smaller timestamps.
// The first iteration should pick up 3 files with the smaller timestamps.
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 3);
addFileStatus("/test/testDir", "2.txt", false, 1L, 100L);
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
// Next iteration should pick up the other 2 files, since nothing else was added.
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 4);
proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 110L, 0L, create777(), "owner", "group", new Path("/test/testDir/3.txt")));
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 4);
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 5);
}
@Test
public void testMinAgeMaxAge() throws IOException, InterruptedException {
long now = new Date().getTime();
long oneHourAgo = now - 3600000;
long twoHoursAgo = now - 2*3600000;
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, now, now, create777(), "owner", "group", new Path("/test/willBeIgnored.txt")));
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, now-5, now-5, create777(), "owner", "group", new Path("/test/testFile.txt")));
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, oneHourAgo, oneHourAgo, create777(), "owner", "group", new Path("/test/testFile1.txt")));
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, twoHoursAgo, twoHoursAgo, create777(), "owner", "group", new Path("/test/testFile2.txt")));
void testMinAgeMaxAge() throws IOException {
final long now = new Date().getTime();
final long oneHourAgo = now - 3600000;
final long twoHoursAgo = now - 2 * 3600000;
addFileStatus("/test", "testFile.txt", false, now - 5, now - 5);
addFileStatus("/test", "testFile1.txt", false, oneHourAgo, oneHourAgo);
addFileStatus("/test", "testFile2.txt", false, twoHoursAgo, twoHoursAgo);
// all files
runner.run();
@ -522,38 +412,27 @@ public class TestListHDFS {
runner.getStateManager().clear(Scope.CLUSTER);
// invalid min_age > max_age
runner.setProperty(ListHDFS.MIN_AGE, "30 sec");
runner.setProperty(ListHDFS.MAX_AGE, "1 sec");
runner.setProperty(ListHDFS.MINIMUM_FILE_AGE, "30 sec");
runner.setProperty(ListHDFS.MAXIMUM_FILE_AGE, "1 sec");
runner.assertNotValid();
// only one file (one hour ago)
runner.setProperty(ListHDFS.MIN_AGE, "30 sec");
runner.setProperty(ListHDFS.MAX_AGE, "90 min");
runner.setProperty(ListHDFS.MINIMUM_FILE_AGE, "30 sec");
runner.setProperty(ListHDFS.MAXIMUM_FILE_AGE, "90 min");
runner.assertValid();
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run(); // will ignore the file for this cycle
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
// Next iteration should pick up the file, since nothing else was added.
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0).assertAttributeEquals("filename", "testFile1.txt");
runner.clearTransferState();
runner.getStateManager().clear(Scope.CLUSTER);
// two files (one hour ago and two hours ago)
runner.setProperty(ListHDFS.MIN_AGE, "30 sec");
runner.removeProperty(ListHDFS.MAX_AGE);
runner.setProperty(ListHDFS.MINIMUM_FILE_AGE, "30 sec");
runner.removeProperty(ListHDFS.MAXIMUM_FILE_AGE);
runner.assertValid();
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
@ -561,79 +440,47 @@ public class TestListHDFS {
runner.getStateManager().clear(Scope.CLUSTER);
// two files (now and one hour ago)
runner.setProperty(ListHDFS.MIN_AGE, "0 sec");
runner.setProperty(ListHDFS.MAX_AGE, "90 min");
runner.setProperty(ListHDFS.MINIMUM_FILE_AGE, "0 sec");
runner.setProperty(ListHDFS.MAXIMUM_FILE_AGE, "90 min");
runner.assertValid();
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
}
@Test
public void testListAfterDirectoryChange() throws InterruptedException {
proc.fileSystem.addFileStatus(new Path("/test1"), new FileStatus(1L, false, 1, 1L, 100L,0L, create777(), "owner", "group", new Path("/test1/testFile-1_1.txt")));
proc.fileSystem.addFileStatus(new Path("/test2"), new FileStatus(1L, false, 1, 1L, 150L,0L, create777(), "owner", "group", new Path("/test2/testFile-2_1.txt")));
proc.fileSystem.addFileStatus(new Path("/test1"), new FileStatus(1L, false, 1, 1L, 200L,0L, create777(), "owner", "group", new Path("/test1/testFile-1_2.txt")));
void testListAfterDirectoryChange() {
addFileStatus("/test1", "testFile-1_1.txt", false, 100L, 0L);
addFileStatus("/test1", "testFile-1_2.txt", false, 200L, 0L);
addFileStatus("/test2", "testFile-2_1.txt", false, 150L, 0L);
runner.setProperty(ListHDFS.DIRECTORY, "/test1");
runner.run(); // Initial run, latest file from /test1 will be ignored
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run(); // Latest file i.e. testFile-1_2.txt from /test1 should also be picked up now
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
runner.setProperty(ListHDFS.DIRECTORY, "/test2"); // Changing directory should reset the state
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run(); // Will ignore the files for this cycle
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run(); // Since state has been reset, testFile-2_1.txt from /test2 should be picked up
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 3);
}
@Test
public void testListingEmptyDir() throws InterruptedException, IOException {
void testListingEmptyDir() {
runner.setProperty(ListHDFS.DIRECTORY, "/test/emptyDir");
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.out")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.txt")));
addFileStatus("/test", "testFile.out", false);
addFileStatus("/test", "testFile.txt", false);
addFileStatus("/test", "emptyDir", true);
addFileStatus("/test/testDir", "1.txt", false);
addFileStatus("/test/testDir/anotherDir", "1.out", false);
addFileStatus("/test/testDir/anotherDir", "1.txt", false);
addFileStatus("/test/testDir/anotherDir", "2.out", false);
addFileStatus("/test/testDir/anotherDir", "2.txt", false);
addFileStatus("/test/testDir/someDir", "1.out", false);
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/emptyDir")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/1.txt")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.out")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.txt")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.out")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.txt")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir/1.out")));
// first iteration will not pick up files because it has to instead check timestamps.
// We must then wait long enough to ensure that the listing can be performed safely and
// run the Processor again.
runner.run();
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
// verify that no messages were logged at the error level
@ -641,9 +488,9 @@ public class TestListHDFS {
final ArgumentCaptor<Throwable> throwableArgumentCaptor = ArgumentCaptor.forClass(Throwable.class);
verify(mockLogger, atLeast(0)).error(anyString(), throwableArgumentCaptor.capture());
// if error.(message, throwable) was called, ignore JobConf CNFEs since mapreduce libs are not included as dependencies
assertTrue(throwableArgumentCaptor.getAllValues().stream().flatMap(Stream::of)
assertTrue(throwableArgumentCaptor.getAllValues().stream()
// check that the only captured throwables are the JobConf ClassNotFoundExceptions
.noneMatch(throwable -> !(throwable instanceof ClassNotFoundException && throwable.getMessage().contains("JobConf"))));
.allMatch(throwable -> throwable instanceof ClassNotFoundException && throwable.getMessage().contains("JobConf")));
verify(mockLogger, never()).error(anyString(), any(Object[].class));
verify(mockLogger, never()).error(anyString(), any(Object[].class), any(Throwable.class));
@ -654,45 +501,22 @@ public class TestListHDFS {
}
@Test
public void testListingNonExistingDir() throws InterruptedException, IOException {
String nonExistingPath = "/test/nonExistingDir";
void testListingNonExistingDir() {
final String nonExistingPath = "/test/nonExistingDir";
runner.setProperty(ListHDFS.DIRECTORY, nonExistingPath);
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.out")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.txt")));
addFileStatus("/test", "testFile.out", false);
addFileStatus("/test", "testFile.txt", false);
addFileStatus("/test", "emptyDir", true);
addFileStatus("/test/testDir", "1.txt", false);
addFileStatus("/test/testDir/anotherDir", "1.out", false);
addFileStatus("/test/testDir/anotherDir", "1.txt", false);
addFileStatus("/test/testDir/anotherDir", "2.out", false);
addFileStatus("/test/testDir/anotherDir", "2.txt", false);
addFileStatus("/test/testDir/someDir", "1.out", false);
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/emptyDir")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/1.txt")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.out")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.txt")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.out")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.txt")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir")));
proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir"),
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir/1.out")));
// first iteration will not pick up files because it has to instead check timestamps.
// We must then wait long enough to ensure that the listing can be performed safely and
// run the Processor again.
runner.run();
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run();
final AssertionError assertionError = assertThrows(AssertionError.class, () -> runner.run());
assertEquals(ProcessException.class, assertionError.getCause().getClass());
// assert that no files were listed
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
@ -700,12 +524,41 @@ public class TestListHDFS {
runner.assertPenalizeCount(0);
}
@Test
void testRecordWriter() throws InitializationException {
runner.setProperty(ListHDFS.DIRECTORY, "/test");
final MockRecordWriter recordWriter = new MockRecordWriter(null, false);
runner.addControllerService("record-writer", recordWriter);
runner.enableControllerService(recordWriter);
runner.setProperty(ListHDFS.RECORD_WRITER, "record-writer");
addFileStatus("/test", "testFile.out", false);
addFileStatus("/test", "testFile.txt", false);
addFileStatus("/test", "testDir", true);
addFileStatus("/test/testDir", "1.txt", false);
addFileStatus("/test/testDir", "anotherDir", true);
addFileStatus("/test/testDir/anotherDir", "1.out", false);
addFileStatus("/test/testDir/anotherDir", "1.txt", false);
addFileStatus("/test/testDir/anotherDir", "2.out", false);
addFileStatus("/test/testDir/anotherDir", "2.txt", false);
addFileStatus("/test/testDir", "someDir", true);
addFileStatus("/test/testDir/someDir", "1.out", false);
runner.run();
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
final List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(REL_SUCCESS);
final MockFlowFile flowFile = flowFilesForRelationship.get(0);
flowFile.assertAttributeEquals("record.count", "8");
}
private FsPermission create777() {
return new FsPermission((short) 0777);
return new FsPermission((short) 0x1FF);
}
private class ListHDFSWithMockedFileSystem extends ListHDFS {
private static class ListHDFSWithMockedFileSystem extends ListHDFS {
private final MockFileSystem fileSystem = new MockFileSystem();
private final KerberosProperties testKerberosProps;
@ -724,25 +577,16 @@ public class TestListHDFS {
}
@Override
protected File getPersistenceFile() {
return new File("target/conf/state-file");
}
@Override
protected FileSystem getFileSystem(final Configuration config) throws IOException {
protected FileSystem getFileSystem(final Configuration config) {
return fileSystem;
}
}
private class MockFileSystem extends FileSystem {
private static class MockFileSystem extends FileSystem {
private final Map<Path, Set<FileStatus>> fileStatuses = new HashMap<>();
public void addFileStatus(final Path parent, final FileStatus child) {
Set<FileStatus> children = fileStatuses.get(parent);
if (children == null) {
children = new HashSet<>();
fileStatuses.put(parent, children);
}
final Set<FileStatus> children = fileStatuses.computeIfAbsent(parent, k -> new HashSet<>());
children.add(child);
@ -770,33 +614,33 @@ public class TestListHDFS {
}
@Override
public FSDataInputStream open(final Path f, final int bufferSize) throws IOException {
public FSDataInputStream open(final Path f, final int bufferSize) {
return null;
}
@Override
public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication,
final long blockSize, final Progressable progress) throws IOException {
final long blockSize, final Progressable progress) {
return null;
}
@Override
public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) throws IOException {
public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) {
return null;
}
@Override
public boolean rename(final Path src, final Path dst) throws IOException {
public boolean rename(final Path src, final Path dst) {
return false;
}
@Override
public boolean delete(final Path f, final boolean recursive) throws IOException {
public boolean delete(final Path f, final boolean recursive) {
return false;
}
@Override
public FileStatus[] listStatus(final Path f) throws FileNotFoundException, IOException {
public FileStatus[] listStatus(final Path f) throws IOException {
return fileStatuses.keySet().stream()
// find the key in fileStatuses that matches the given Path f
.filter(pathKey -> f.isAbsoluteAndSchemeAuthorityNull()
@ -825,14 +669,31 @@ public class TestListHDFS {
}
@Override
public boolean mkdirs(final Path f, final FsPermission permission) throws IOException {
public boolean mkdirs(final Path f, final FsPermission permission) {
return false;
}
@Override
public FileStatus getFileStatus(final Path f) throws IOException {
return null;
public FileStatus getFileStatus(final Path path) {
final Optional<FileStatus> fileStatus = fileStatuses.values().stream()
.flatMap(Set::stream)
.filter(fs -> fs.getPath().equals(path))
.findFirst();
if (fileStatus.isEmpty()) {
throw new IllegalArgumentException("Could not find FileStatus");
}
return fileStatus.get();
}
}
private void addFileStatus(final String path, final String filename, final boolean isDirectory, final long modificationTime, final long accessTime) {
final Path fullPath = new Path("hdfs", "hdfscluster:8020", path);
final Path filePath = new Path(fullPath, filename);
final FileStatus fileStatus = new FileStatus(1L, isDirectory, 1, 1L, modificationTime, accessTime, create777(), "owner", "group", filePath);
proc.fileSystem.addFileStatus(fullPath, fileStatus);
}
private void addFileStatus(final String path, final String filename, final boolean isDirectory) {
addFileStatus(path, filename, isDirectory, 0L, 0L);
}
}

View File

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.processors.hadoop.util.FilterMode;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockComponentLog;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.time.Instant;
import java.util.UUID;
import static org.mockito.Mockito.spy;
/**
 * In order to test different ListHDFS implementations, change the ancestor class of ListHDFSWithMockedFileSystem to the implementation in question.
 * Provide HADOOP_RESOURCE_CONFIG and ROOT_DIR, and set the depth of the HDFS tree structure (a complete k-ary tree) together with the number of files.
 * First create the structure by running the createHdfsNaryCompleteTree() test case, then run the testListHdfsTimeElapsed() test case against
 * the implementation under test.
*/
@Disabled("This is a performance test and should be run manually")
class TestListHDFSPerformanceIT {
private static final long BYTE_TO_MB = 1024 * 1024;
private static final String HADOOP_RESOURCE_CONFIG = "???";
private static final FileSystem FILE_SYSTEM = getFileSystem();
private static final String ROOT_DIR = "???";
private static final int NUM_CHILDREN = 3;
private static final int NUM_OF_FILES = 1000;
private TestRunner runner;
private MockComponentLog mockLogger;
private ListHDFSWithMockedFileSystem proc;
@BeforeEach
public void setup() throws InitializationException {
final KerberosProperties kerberosProperties = new KerberosProperties(null);
proc = new ListHDFSWithMockedFileSystem(kerberosProperties);
mockLogger = spy(new MockComponentLog(UUID.randomUUID().toString(), proc));
runner = TestRunners.newTestRunner(proc, mockLogger);
runner.setProperty(ListHDFS.HADOOP_CONFIGURATION_RESOURCES, HADOOP_RESOURCE_CONFIG);
runner.setProperty(ListHDFS.DIRECTORY, ROOT_DIR);
runner.setProperty(ListHDFS.FILE_FILTER_MODE, FilterMode.FILTER_DIRECTORIES_AND_FILES.getValue());
runner.setProperty(ListHDFS.FILE_FILTER, "[^\\.].*\\.txt");
}
@Test
@Disabled("Enable this test to create an HDFS file tree")
void createHdfsNaryCompleteTree() throws IOException {
createTree(FILE_SYSTEM, new Path(ROOT_DIR), 0);
}
/**
* This test measures elapsed time and gives only a rough estimate of memory usage.
*/
@Test
void testListHdfsTimeElapsed() {
final Runtime runtime = Runtime.getRuntime();
long usedMemoryBefore = getCurrentlyUsedMemory(runtime);
Instant start = Instant.now();
runner.run();
Instant finish = Instant.now();
long timeElapsed = Duration.between(start, finish).toMillis();
System.out.println("TIME ELAPSED: " + timeElapsed);
long usedMemoryAfter = getCurrentlyUsedMemory(runtime);
System.out.println("Memory increased (MB):" + (usedMemoryAfter - usedMemoryBefore));
}
private long getCurrentlyUsedMemory(final Runtime runtime) {
return (runtime.totalMemory() - runtime.freeMemory()) / BYTE_TO_MB;
}
private void createTree(FileSystem fileSystem, Path currentPath, int depth) throws IOException {
if (depth >= NUM_CHILDREN) {
for (int j = 0; j < NUM_OF_FILES; j++) {
fileSystem.createNewFile(new Path(currentPath + "/file_" + j));
}
return;
}
for (int i = 0; i < NUM_CHILDREN; i++) {
String childPath = currentPath.toString() + "/dir_" + i;
Path newPath = new Path(childPath);
fileSystem.mkdirs(newPath);
for (int j = 0; j < NUM_OF_FILES; j++) {
final Path filePath = new Path(newPath + "/file_" + j);
fileSystem.createNewFile(filePath);
System.out.println(i + " | " + j + " | File: " + filePath);
}
System.out.println(i + " | Directory: " + newPath);
createTree(fileSystem, newPath, depth + 1);
}
}
private static FileSystem getFileSystem() {
String[] locations = HADOOP_RESOURCE_CONFIG.split(",");
Configuration config = new Configuration();
for (String resource : locations) {
config.addResource(new Path(resource.trim()));
}
try {
return FileSystem.get(config);
} catch (IOException e) {
throw new UncheckedIOException("Failed to get FileSystem", e);
}
}
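// Thin subclass that only swaps in the test KerberosProperties; to benchmark a different ListHDFS
// implementation, change the class it extends (see the class-level Javadoc above).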
private static class ListHDFSWithMockedFileSystem extends ListHDFS {
private final KerberosProperties testKerberosProps;
public ListHDFSWithMockedFileSystem(KerberosProperties kerberosProperties) {
this.testKerberosProps = kerberosProperties;
}
@Override
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
return testKerberosProps;
}
}
}
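The HADOOP_RESOURCE_CONFIG and ROOT_DIR constants above are deliberately left as "???" and must be filled in before the test is run; with NUM_CHILDREN = 3 and NUM_OF_FILES = 1000 the generated tree amounts to roughly 3 + 9 + 27 = 39 directories of about 1000 files each. The snippet below is a minimal sketch of what a filled-in configuration might look like; the class name ListHdfsPerfConfigExample and the resource and root paths are hypothetical, not part of the commit.

// Hypothetical example values; the committed test intentionally leaves these as "???".
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public class ListHdfsPerfConfigExample {
    private static final String HADOOP_RESOURCE_CONFIG =
            "/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml";
    private static final String ROOT_DIR = "/tmp/list-hdfs-perf";

    public static void main(final String[] args) throws IOException {
        final Configuration config = new Configuration();
        // Same resource-loading step the test's getFileSystem() performs.
        for (final String resource : HADOOP_RESOURCE_CONFIG.split(",")) {
            config.addResource(new Path(resource.trim()));
        }
        try (final FileSystem fileSystem = FileSystem.get(config)) {
            // Run createHdfsNaryCompleteTree() against this root first, then testListHdfsTimeElapsed().
            System.out.println("Tree root resolves to: " + fileSystem.makeQualified(new Path(ROOT_DIR)));
        }
    }
}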

View File

@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hadoop.util;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class TestFileStatusIterator {
@Mock
private FileSystem mockHdfs;
@Mock
private UserGroupInformation mockUserGroupInformation;
@Mock
private FileStatus mockFileStatus1;
@Mock
private FileStatus mockFileStatus2;
@Mock
private FileStatus mockFileStatus3;
private FileStatusIterable fileStatusIterable;
@BeforeEach
void setup() {
fileStatusIterable = new FileStatusIterable(new Path("/path/to/files"), false, mockHdfs, mockUserGroupInformation);
}
@Test
void pathWithNoFilesShouldReturnEmptyIterator() throws Exception {
when(mockUserGroupInformation.doAs(any(PrivilegedExceptionAction.class))).thenReturn(new MockRemoteIterator());
final Iterator<FileStatus> iterator = fileStatusIterable.iterator();
assertFalse(iterator.hasNext());
assertThrows(NoSuchElementException.class, iterator::next);
}
@Test
void pathWithMultipleFilesShouldReturnIteratorWithCorrectFiles() throws Exception {
final FileStatus[] fileStatuses = {mockFileStatus1, mockFileStatus2, mockFileStatus3};
setupFileStatusMocks(fileStatuses);
final Iterator<FileStatus> iterator = fileStatusIterable.iterator();
final Set<FileStatus> expectedFileStatuses = new HashSet<>(Arrays.asList(fileStatuses));
final Set<FileStatus> actualFileStatuses = new HashSet<>();
assertTrue(iterator.hasNext());
actualFileStatuses.add(iterator.next());
assertTrue(iterator.hasNext());
actualFileStatuses.add(iterator.next());
assertTrue(iterator.hasNext());
actualFileStatuses.add(iterator.next());
assertEquals(expectedFileStatuses, actualFileStatuses);
assertFalse(iterator.hasNext());
assertThrows(NoSuchElementException.class, iterator::next);
}
@Test
void getTotalFileCountWithMultipleFilesShouldReturnCorrectCount() throws Exception {
final FileStatus[] fileStatuses = {mockFileStatus1, mockFileStatus2, mockFileStatus3};
setupFileStatusMocks(fileStatuses);
assertEquals(0, fileStatusIterable.getTotalFileCount());
for (FileStatus ignored : fileStatusIterable) {
// iterate over every file so the iterable updates its total count
}
assertEquals(3, fileStatusIterable.getTotalFileCount());
}
private void setupFileStatusMocks(FileStatus[] fileStatuses) throws IOException, InterruptedException {
when(mockHdfs.listStatusIterator(any(Path.class))).thenReturn(new MockRemoteIterator(fileStatuses));
when(mockUserGroupInformation.doAs(any(PrivilegedExceptionAction.class))).thenAnswer(invocation -> {
// Get the provided lambda expression
PrivilegedExceptionAction action = invocation.getArgument(0);
// Invoke the lambda expression and return the result
return action.run();
});
}
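// Minimal RemoteIterator over a fixed set of FileStatus objects, backed by an ArrayDeque.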
private static class MockRemoteIterator implements RemoteIterator<FileStatus> {
final Deque<FileStatus> deque;
public MockRemoteIterator(FileStatus... fileStatuses) {
deque = new ArrayDeque<>();
Collections.addAll(deque, fileStatuses);
}
@Override
public boolean hasNext() {
return !deque.isEmpty();
}
@Override
public FileStatus next() {
return deque.pop();
}
}
}
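For context, a minimal sketch of the call pattern these tests stub out, assuming (as the doAs(...) answer in setupFileStatusMocks() suggests) that FileStatusIterable wraps FileSystem#listStatusIterator in UserGroupInformation#doAs. The class FileStatusIterationSketch and its listChildren() method are illustrative stand-ins, not the committed implementation.

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.security.PrivilegedExceptionAction;

// Illustrative stand-in for the listing pattern exercised by TestFileStatusIterator.
class FileStatusIterationSketch {
    private final FileSystem hdfs;
    private final UserGroupInformation ugi;

    FileStatusIterationSketch(final FileSystem hdfs, final UserGroupInformation ugi) {
        this.hdfs = hdfs;
        this.ugi = ugi;
    }

    // Lists the direct children of a path, running the HDFS call as the configured user,
    // mirroring the doAs(...) stub in setupFileStatusMocks().
    RemoteIterator<FileStatus> listChildren(final Path path) {
        try {
            return ugi.doAs((PrivilegedExceptionAction<RemoteIterator<FileStatus>>) () -> hdfs.listStatusIterator(path));
        } catch (final IOException e) {
            throw new UncheckedIOException("Listing failed for " + path, e);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while listing " + path, e);
        }
    }
}

Whether the boolean passed to the FileStatusIterable constructor in setup() toggles recursive listing is an assumption here; the sketch only covers a single-level listing.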