NIFI-5426: Use NIO.2 API for ListFile

This commit is contained in:
Marco Gaido 2018-07-13 16:00:02 +02:00 committed by Mark Payne
parent ffbff42421
commit 31bb89514c
2 changed files with 62 additions and 102 deletions

View File

@ -39,11 +39,11 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.list.AbstractListProcessor;
import org.apache.nifi.processors.standard.util.FileInfo;
import org.apache.nifi.processors.standard.util.FileInfoFilter;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@ -65,7 +65,10 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@TriggerSerially
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@ -198,9 +201,9 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
private final AtomicReference<FileInfoFilter> fileFilterRef = new AtomicReference<>();
private volatile boolean includeFileAttributes;
private final AtomicReference<BiPredicate<Path, BasicFileAttributes>> fileFilterRef = new AtomicReference<BiPredicate<Path, BasicFileAttributes>>();
public static final String FILE_CREATION_TIME_ATTRIBUTE = "file.creationTime";
public static final String FILE_LAST_MODIFY_TIME_ATTRIBUTE = "file.lastModifiedTime";
@ -328,9 +331,36 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
@Override
protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
final File path = new File(getPath(context));
final Path path = new File(getPath(context)).toPath();
final Boolean recurse = context.getProperty(RECURSE).asBoolean();
return scanDirectory(path, fileFilterRef.get(), recurse, minTimestamp);
final Map<Path, BasicFileAttributes> lastModifiedMap = new HashMap<>();
final BiPredicate<Path, BasicFileAttributes> fileFilter = fileFilterRef.get();
int maxDepth = recurse ? Integer.MAX_VALUE : 1;
BiPredicate<Path, BasicFileAttributes> matcher = (p, attributes) -> {
if (!attributes.isDirectory()
&& (minTimestamp == null || attributes.lastModifiedTime().toMillis() >= minTimestamp)
&& fileFilter.test(p, attributes)) {
// We store the attributes for each Path we are returning in order to avoid to
// retrieve them again later when creating the FileInfo
lastModifiedMap.put(p, attributes);
return true;
}
return false;
};
Stream<Path> inputStream = Files.find(path, maxDepth, matcher, FileVisitOption.FOLLOW_LINKS);
Stream<FileInfo> listing = inputStream.map(p -> {
File file = p.toFile();
BasicFileAttributes attributes = lastModifiedMap.get(p);
return new FileInfo.Builder()
.directory(false)
.filename(file.getName())
.fullPathFileName(file.getAbsolutePath())
.lastModifiedTime(attributes.lastModifiedTime().toMillis())
.size(attributes.size())
.build();
});
return listing.collect(Collectors.toList());
}
@Override
@ -346,36 +376,7 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
|| IGNORE_HIDDEN_FILES.equals(property);
}
private List<FileInfo> scanDirectory(final File path, final FileInfoFilter filter, final Boolean recurse,
final Long minTimestamp) throws IOException {
final List<FileInfo> listing = new ArrayList<>();
File[] files = path.listFiles();
if (files != null) {
for (File file : files) {
if (file.isDirectory()) {
if (recurse) {
listing.addAll(scanDirectory(file, filter, true, minTimestamp));
}
} else {
FileInfo fileInfo = new FileInfo.Builder()
.directory(false)
.filename(file.getName())
.fullPathFileName(file.getAbsolutePath())
.size(file.length())
.lastModifiedTime(file.lastModified())
.build();
if ((minTimestamp == null || fileInfo.getLastModifiedTime() >= minTimestamp)
&& filter.accept(file, fileInfo)) {
listing.add(fileInfo);
}
}
}
}
return listing;
}
private FileInfoFilter createFileFilter(final ProcessContext context) {
private BiPredicate<Path, BasicFileAttributes> createFileFilter(final ProcessContext context) {
final long minSize = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
final Double maxSize = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
final long minAge = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
@ -386,41 +387,36 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
final boolean recurseDirs = context.getProperty(RECURSE).asBoolean();
final String pathPatternStr = context.getProperty(PATH_FILTER).getValue();
final Pattern pathPattern = (!recurseDirs || pathPatternStr == null) ? null : Pattern.compile(pathPatternStr);
return new FileInfoFilter() {
@Override
public boolean accept(final File file, final FileInfo info) {
if (minSize > info.getSize()) {
return false;
}
if (maxSize != null && maxSize < info.getSize()) {
return false;
}
final long fileAge = System.currentTimeMillis() - info.getLastModifiedTime();
if (minAge > fileAge) {
return false;
}
if (maxAge != null && maxAge < fileAge) {
return false;
}
if (ignoreHidden && file.isHidden()) {
return false;
}
if (pathPattern != null) {
Path reldir = Paths.get(indir).relativize(file.toPath()).getParent();
if (reldir != null && !reldir.toString().isEmpty()) {
if (!pathPattern.matcher(reldir.toString()).matches()) {
return false;
}
return (path, attributes) -> {
if (minSize > attributes.size()) {
return false;
}
if (maxSize != null && maxSize < attributes.size()) {
return false;
}
final long fileAge = System.currentTimeMillis() - attributes.lastModifiedTime().toMillis();
if (minAge > fileAge) {
return false;
}
if (maxAge != null && maxAge < fileAge) {
return false;
}
if (ignoreHidden && path.toFile().isHidden()) {
return false;
}
if (pathPattern != null) {
Path reldir = Paths.get(indir).relativize(path).getParent();
if (reldir != null && !reldir.toString().isEmpty()) {
if (!pathPattern.matcher(reldir.toString()).matches()) {
return false;
}
}
//Verify that we have at least read permissions on the file we're considering grabbing
if (!Files.isReadable(file.toPath())) {
return false;
}
return filePattern.matcher(file.getName()).matches();
}
// Verify that we have at least read permissions on the file we're considering grabbing
if (!Files.isReadable(path)) {
return false;
}
return filePattern.matcher(path.getFileName().toString()).matches();
};
}
}

View File

@ -1,36 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.util;
import java.io.File;
/**
* A filter for files.
*/
public interface FileInfoFilter {
/**
* Tests whether or not the specified file with its
* <code>{@link org.apache.nifi.processors.standard.util.FileInfo}</code> satisfies the
* requirements of this filter.
*
* @param file The file to be tested
* @param info Attributes and info about the file.
* @return <code>true</code> if and only if the <code>file</code> should be included
*/
boolean accept(File file, FileInfo info);
}