NIFI-4631: Use java.nio.file.Files in ListFile to improve performance

This closes #2565.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Marco Gaido 2018-03-19 14:48:27 +01:00 committed by Mark Payne
parent 66590b78f5
commit 758e44682f
1 changed files with 49 additions and 63 deletions

View File

@ -40,7 +40,6 @@ import org.apache.nifi.processor.util.list.AbstractListProcessor;
import org.apache.nifi.processors.standard.util.FileInfo;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
@ -64,7 +63,10 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@TriggerSerially
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@ -189,7 +191,7 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
private final AtomicReference<FileFilter> fileFilterRef = new AtomicReference<>();
private final AtomicReference<BiPredicate<Path, BasicFileAttributes>> fileFilterRef = new AtomicReference<BiPredicate<Path, BasicFileAttributes>>();
public static final String FILE_CREATION_TIME_ATTRIBUTE = "file.creationTime";
public static final String FILE_LAST_MODIFY_TIME_ATTRIBUTE = "file.lastModifiedTime";
@ -311,9 +313,24 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
@Override
protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
final File path = new File(getPath(context));
final Path path = new File(getPath(context)).toPath();
final Boolean recurse = context.getProperty(RECURSE).asBoolean();
return scanDirectory(path, fileFilterRef.get(), recurse, minTimestamp);
final BiPredicate<Path, BasicFileAttributes> fileFilter = fileFilterRef.get();
int maxDepth = recurse? Integer.MAX_VALUE : 1;
Stream<Path> inputStream = Files.find(path, maxDepth, (p, attributes) ->
!attributes.isDirectory() && (minTimestamp == null || attributes.lastModifiedTime().toMillis() >= minTimestamp)
&& fileFilter.test(p, attributes));
Stream<FileInfo> listing = inputStream.map(p -> {
File file = p.toFile();
return new FileInfo.Builder()
.directory(file.isDirectory())
.filename(file.getName())
.fullPathFileName(file.getAbsolutePath())
.lastModifiedTime(file.lastModified())
.build();
});
return listing.collect(Collectors.toList());
}
@Override
@ -329,33 +346,7 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
|| IGNORE_HIDDEN_FILES.equals(property);
}
private List<FileInfo> scanDirectory(final File path, final FileFilter filter, final Boolean recurse,
final Long minTimestamp) throws IOException {
final List<FileInfo> listing = new ArrayList<>();
File[] files = path.listFiles();
if (files != null) {
for (File file : files) {
if (file.isDirectory()) {
if (recurse) {
listing.addAll(scanDirectory(file, filter, true, minTimestamp));
}
} else {
if ((minTimestamp == null || file.lastModified() >= minTimestamp) && filter.accept(file)) {
listing.add(new FileInfo.Builder()
.directory(file.isDirectory())
.filename(file.getName())
.fullPathFileName(file.getAbsolutePath())
.lastModifiedTime(file.lastModified())
.build());
}
}
}
}
return listing;
}
private FileFilter createFileFilter(final ProcessContext context) {
private BiPredicate<Path, BasicFileAttributes> createFileFilter(final ProcessContext context) {
final long minSize = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
final Double maxSize = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
final long minAge = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
@ -366,41 +357,36 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
final boolean recurseDirs = context.getProperty(RECURSE).asBoolean();
final String pathPatternStr = context.getProperty(PATH_FILTER).getValue();
final Pattern pathPattern = (!recurseDirs || pathPatternStr == null) ? null : Pattern.compile(pathPatternStr);
return new FileFilter() {
@Override
public boolean accept(final File file) {
if (minSize > file.length()) {
return false;
}
if (maxSize != null && maxSize < file.length()) {
return false;
}
final long fileAge = System.currentTimeMillis() - file.lastModified();
if (minAge > fileAge) {
return false;
}
if (maxAge != null && maxAge < fileAge) {
return false;
}
if (ignoreHidden && file.isHidden()) {
return false;
}
if (pathPattern != null) {
Path reldir = Paths.get(indir).relativize(file.toPath()).getParent();
if (reldir != null && !reldir.toString().isEmpty()) {
if (!pathPattern.matcher(reldir.toString()).matches()) {
return false;
}
return (path, attributes) -> {
if (minSize > attributes.size()) {
return false;
}
if (maxSize != null && maxSize < attributes.size()) {
return false;
}
final long fileAge = System.currentTimeMillis() - attributes.lastModifiedTime().toMillis();
if (minAge > fileAge) {
return false;
}
if (maxAge != null && maxAge < fileAge) {
return false;
}
if (ignoreHidden && path.toFile().isHidden()) {
return false;
}
if (pathPattern != null) {
Path reldir = Paths.get(indir).relativize(path).getParent();
if (reldir != null && !reldir.toString().isEmpty()) {
if (!pathPattern.matcher(reldir.toString()).matches()) {
return false;
}
}
//Verify that we have at least read permissions on the file we're considering grabbing
if (!Files.isReadable(file.toPath())) {
return false;
}
return filePattern.matcher(file.getName()).matches();
}
// Verify that we have at least read permissions on the file we're considering grabbing
if (!Files.isReadable(path)) {
return false;
}
return filePattern.matcher(path.getFileName().toString()).matches();
};
}
}