NIFI-4144 - added min/max age to ListHDFS processor

This closes #1966.

Signed-off-by: Tony Kurc <tkurc@apache.org>
This commit is contained in:
Pierre Villard 2017-06-30 18:55:37 +02:00 committed by Tony Kurc
parent 8b5342dec0
commit 7843b885ee
2 changed files with 123 additions and 3 deletions

View File

@ -33,6 +33,8 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
@ -48,6 +50,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -58,7 +61,6 @@ import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@TriggerSerially
@TriggerWhenEmpty
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@ -114,6 +116,24 @@ public class ListHDFS extends AbstractHadoopProcessor {
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.build();
/**
 * Optional lower bound on a file's age, measured from its last modification date.
 * The attached validator is built with a range of 0 milliseconds up to
 * {@code Long.MAX_VALUE} nanoseconds.
 */
public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
        .displayName("Minimum File Age")
        .name("minimum-file-age")
        .required(false)
        .description("The minimum age that a file must be in order to be pulled; any file younger than this "
                + "amount of time (based on last modification date) will be ignored")
        .addValidator(StandardValidators.createTimePeriodValidator(
                0, TimeUnit.MILLISECONDS,
                Long.MAX_VALUE, TimeUnit.NANOSECONDS))
        .build();
/**
 * Optional upper bound on a file's age, measured from its last modification date.
 * The attached validator is built with a range of 100 milliseconds up to
 * {@code Long.MAX_VALUE} nanoseconds, matching the minimum stated in the description.
 */
public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
        .displayName("Maximum File Age")
        .name("maximum-file-age")
        .required(false)
        .description("The maximum age that a file must be in order to be pulled; any file older than this "
                + "amount of time (based on last modification date) will be ignored. Minimum value is 100ms.")
        .addValidator(StandardValidators.createTimePeriodValidator(
                100, TimeUnit.MILLISECONDS,
                Long.MAX_VALUE, TimeUnit.NANOSECONDS))
        .build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles are transferred to this relationship")
@ -144,6 +164,8 @@ public class ListHDFS extends AbstractHadoopProcessor {
props.add(DIRECTORY);
props.add(RECURSE_SUBDIRS);
props.add(FILE_FILTER);
props.add(MIN_AGE);
props.add(MAX_AGE);
return props;
}
@ -154,6 +176,23 @@ public class ListHDFS extends AbstractHadoopProcessor {
return relationships;
}
/**
 * Adds a cross-property check on top of the validation inherited from the parent class:
 * the configured minimum file age must not exceed the configured maximum file age.
 *
 * @param context validation context giving access to the configured property values
 * @return the parent's validation results plus, when the age range is inverted,
 *         one additional invalid result
 */
@Override
protected Collection<ValidationResult> customValidate(ValidationContext context) {
    final List<ValidationResult> problems = new ArrayList<>(super.customValidate(context));

    final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
    final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);

    // An unset bound is treated as "no limit" on that side of the range.
    final long minimumAge = (minAgeProp == null) ? 0L : minAgeProp;
    final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;

    if (minimumAge > maximumAge) {
        // Name this processor in the subject and use the display names so the message
        // matches the labels the user sees rather than the internal property keys.
        problems.add(new ValidationResult.Builder()
                .valid(false)
                .subject("ListHDFS Configuration")
                .explanation(MIN_AGE.getDisplayName() + " cannot be greater than " + MAX_AGE.getDisplayName())
                .build());
    }

    return problems;
}
/**
 * Builds the key string for the given directory by joining this component's
 * identifier, the fixed {@code ".lastListingTime."} infix and the directory.
 *
 * @param directory the directory the key refers to
 * @return the composed key
 */
protected String getKey(final String directory) {
    final StringBuilder key = new StringBuilder();
    key.append(getIdentifier());
    key.append(".lastListingTime.");
    key.append(directory);
    return key.toString();
}
@ -171,18 +210,31 @@ public class ListHDFS extends AbstractHadoopProcessor {
* Determines which of the given FileStatus's describes a File that should be listed.
*
* @param statuses the eligible FileStatus objects that we could potentially list
* @param context processor context with properties values
* @return a Set containing only those FileStatus objects that we want to list
*/
Set<FileStatus> determineListable(final Set<FileStatus> statuses) {
Set<FileStatus> determineListable(final Set<FileStatus> statuses, ProcessContext context) {
final long minTimestamp = this.latestTimestampListed;
final TreeMap<Long, List<FileStatus>> orderedEntries = new TreeMap<>();
final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
// NIFI-4144 - setting to MIN_VALUE so that in case the file modification time is in
// the future relative to the nifi instance, files are not skipped.
final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
// Build a sorted map to determine the latest possible entries
for (final FileStatus status : statuses) {
if (status.getPath().getName().endsWith("_COPYING_")) {
continue;
}
final long fileAge = System.currentTimeMillis() - status.getModificationTime();
if (minimumAge > fileAge || fileAge > maximumAge) {
continue;
}
final long entityTimestamp = status.getModificationTime();
if (entityTimestamp > latestTimestampListed) {
@ -293,7 +345,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
return;
}
final Set<FileStatus> listable = determineListable(statuses);
final Set<FileStatus> listable = determineListable(statuses, context);
getLogger().debug("Of the {} files found in HDFS, {} are listable", new Object[] {statuses.size(), listable.size()});
for (final FileStatus status : listable) {

View File

@ -44,6 +44,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -313,6 +314,73 @@ public class TestListHDFS {
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 5);
}
/**
 * Exercises the Minimum/Maximum File Age properties of ListHDFS against four files
 * whose modification times are: now, 5 ms ago, one hour ago and two hours ago.
 */
@Test
public void testMinAgeMaxAge() throws IOException, InterruptedException {
    final long hourMillis = 3600000;
    final long now = new Date().getTime();
    final long oneHourAgo = now - hourMillis;
    final long twoHoursAgo = now - 2 * hourMillis;

    // Pause used between runs: twice the processor's listing lag, in milliseconds.
    final long lagPauseMillis = TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS);

    final Path dir = new Path("/test");
    proc.fileSystem.addFileStatus(dir,
            new FileStatus(1L, false, 1, 1L, now, now, create777(), "owner", "group", new Path("/test/willBeIgnored.txt")));
    proc.fileSystem.addFileStatus(dir,
            new FileStatus(1L, false, 1, 1L, now - 5, now - 5, create777(), "owner", "group", new Path("/test/testFile.txt")));
    proc.fileSystem.addFileStatus(dir,
            new FileStatus(1L, false, 1, 1L, oneHourAgo, oneHourAgo, create777(), "owner", "group", new Path("/test/testFile1.txt")));
    proc.fileSystem.addFileStatus(dir,
            new FileStatus(1L, false, 1, 1L, twoHoursAgo, twoHoursAgo, create777(), "owner", "group", new Path("/test/testFile2.txt")));

    // Scenario 1: no age limits configured. Three of the four files are expected;
    // presumably the newest one (willBeIgnored.txt) is held back by the listing lag.
    runner.run();
    runner.assertValid();
    runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 3);
    runner.clearTransferState();
    runner.getStateManager().clear(Scope.CLUSTER);

    // Scenario 2: minimum age greater than maximum age must be rejected as invalid.
    runner.setProperty(ListHDFS.MIN_AGE, "30 sec");
    runner.setProperty(ListHDFS.MAX_AGE, "1 sec");
    runner.assertNotValid();

    // Scenario 3: 30 sec <= age <= 90 min, so only the one-hour-old file qualifies.
    runner.setProperty(ListHDFS.MIN_AGE, "30 sec");
    runner.setProperty(ListHDFS.MAX_AGE, "90 min");
    runner.assertValid();

    Thread.sleep(lagPauseMillis);
    runner.run(); // the file is not emitted during this first cycle
    runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);

    Thread.sleep(lagPauseMillis);
    runner.run();
    // With nothing new added, the second cycle emits the file.
    runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
    runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0).assertAttributeEquals("filename", "testFile1.txt");
    runner.clearTransferState();
    runner.getStateManager().clear(Scope.CLUSTER);

    // Scenario 4: minimum age only, so the one-hour-old and two-hour-old files qualify.
    runner.setProperty(ListHDFS.MIN_AGE, "30 sec");
    runner.removeProperty(ListHDFS.MAX_AGE);
    runner.assertValid();

    Thread.sleep(lagPauseMillis);
    runner.run();
    runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);

    Thread.sleep(lagPauseMillis);
    runner.run();
    runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
    runner.clearTransferState();
    runner.getStateManager().clear(Scope.CLUSTER);

    // Scenario 5: 0 sec <= age <= 90 min, so the recent and one-hour-old files qualify.
    runner.setProperty(ListHDFS.MIN_AGE, "0 sec");
    runner.setProperty(ListHDFS.MAX_AGE, "90 min");
    runner.assertValid();

    Thread.sleep(lagPauseMillis);
    runner.run();
    runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
}
private FsPermission create777() {
return new FsPermission((short) 0777);