From 31bb89514cfa0e5df9db578387d566c7b5117c45 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 13 Jul 2018 16:00:02 +0200 Subject: [PATCH] NIFI-5426: Use NIO.2 API for ListFile --- .../nifi/processors/standard/ListFile.java | 128 +++++++++--------- .../standard/util/FileInfoFilter.java | 36 ----- 2 files changed, 62 insertions(+), 102 deletions(-) delete mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileInfoFilter.java diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java index d31fe2a49b..3923350b1e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java @@ -39,11 +39,11 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.list.AbstractListProcessor; import org.apache.nifi.processors.standard.util.FileInfo; -import org.apache.nifi.processors.standard.util.FileInfoFilter; import java.io.File; import java.io.IOException; import java.nio.file.FileStore; +import java.nio.file.FileVisitOption; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -65,7 +65,10 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiPredicate; import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; @TriggerSerially @InputRequirement(Requirement.INPUT_FORBIDDEN) @@ -198,9 +201,9 @@ public class ListFile extends AbstractListProcessor { private List properties; private Set relationships; - private final AtomicReference fileFilterRef = new AtomicReference<>(); private volatile boolean includeFileAttributes; + private final AtomicReference> fileFilterRef = new AtomicReference>(); public static final String FILE_CREATION_TIME_ATTRIBUTE = "file.creationTime"; public static final String FILE_LAST_MODIFY_TIME_ATTRIBUTE = "file.lastModifiedTime"; @@ -328,9 +331,36 @@ public class ListFile extends AbstractListProcessor { @Override protected List performListing(final ProcessContext context, final Long minTimestamp) throws IOException { - final File path = new File(getPath(context)); + final Path path = new File(getPath(context)).toPath(); final Boolean recurse = context.getProperty(RECURSE).asBoolean(); - return scanDirectory(path, fileFilterRef.get(), recurse, minTimestamp); + final Map lastModifiedMap = new HashMap<>(); + + final BiPredicate fileFilter = fileFilterRef.get(); + int maxDepth = recurse ? Integer.MAX_VALUE : 1; + BiPredicate matcher = (p, attributes) -> { + if (!attributes.isDirectory() + && (minTimestamp == null || attributes.lastModifiedTime().toMillis() >= minTimestamp) + && fileFilter.test(p, attributes)) { + // We store the attributes for each Path we are returning in order to avoid to + // retrieve them again later when creating the FileInfo + lastModifiedMap.put(p, attributes); + return true; + } + return false; + }; + Stream inputStream = Files.find(path, maxDepth, matcher, FileVisitOption.FOLLOW_LINKS); + Stream listing = inputStream.map(p -> { + File file = p.toFile(); + BasicFileAttributes attributes = lastModifiedMap.get(p); + return new FileInfo.Builder() + .directory(false) + .filename(file.getName()) + .fullPathFileName(file.getAbsolutePath()) + .lastModifiedTime(attributes.lastModifiedTime().toMillis()) + .size(attributes.size()) + .build(); + }); + return listing.collect(Collectors.toList()); } @Override @@ -346,36 +376,7 @@ public class ListFile extends AbstractListProcessor { || IGNORE_HIDDEN_FILES.equals(property); } - private List scanDirectory(final File path, final FileInfoFilter filter, final Boolean recurse, - final Long minTimestamp) throws IOException { - final List listing = new ArrayList<>(); - File[] files = path.listFiles(); - if (files != null) { - for (File file : files) { - if (file.isDirectory()) { - if (recurse) { - listing.addAll(scanDirectory(file, filter, true, minTimestamp)); - } - } else { - FileInfo fileInfo = new FileInfo.Builder() - .directory(false) - .filename(file.getName()) - .fullPathFileName(file.getAbsolutePath()) - .size(file.length()) - .lastModifiedTime(file.lastModified()) - .build(); - if ((minTimestamp == null || fileInfo.getLastModifiedTime() >= minTimestamp) - && filter.accept(file, fileInfo)) { - listing.add(fileInfo); - } - } - } - } - - return listing; - } - - private FileInfoFilter createFileFilter(final ProcessContext context) { + private BiPredicate createFileFilter(final ProcessContext context) { final long minSize = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue(); final Double maxSize = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B); final long minAge = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS); @@ -386,41 +387,36 @@ public class ListFile extends AbstractListProcessor { final boolean recurseDirs = context.getProperty(RECURSE).asBoolean(); final String pathPatternStr = context.getProperty(PATH_FILTER).getValue(); final Pattern pathPattern = (!recurseDirs || pathPatternStr == null) ? null : Pattern.compile(pathPatternStr); - - return new FileInfoFilter() { - @Override - public boolean accept(final File file, final FileInfo info) { - if (minSize > info.getSize()) { - return false; - } - if (maxSize != null && maxSize < info.getSize()) { - return false; - } - final long fileAge = System.currentTimeMillis() - info.getLastModifiedTime(); - if (minAge > fileAge) { - return false; - } - if (maxAge != null && maxAge < fileAge) { - return false; - } - if (ignoreHidden && file.isHidden()) { - return false; - } - if (pathPattern != null) { - Path reldir = Paths.get(indir).relativize(file.toPath()).getParent(); - if (reldir != null && !reldir.toString().isEmpty()) { - if (!pathPattern.matcher(reldir.toString()).matches()) { - return false; - } + return (path, attributes) -> { + if (minSize > attributes.size()) { + return false; + } + if (maxSize != null && maxSize < attributes.size()) { + return false; + } + final long fileAge = System.currentTimeMillis() - attributes.lastModifiedTime().toMillis(); + if (minAge > fileAge) { + return false; + } + if (maxAge != null && maxAge < fileAge) { + return false; + } + if (ignoreHidden && path.toFile().isHidden()) { + return false; + } + if (pathPattern != null) { + Path reldir = Paths.get(indir).relativize(path).getParent(); + if (reldir != null && !reldir.toString().isEmpty()) { + if (!pathPattern.matcher(reldir.toString()).matches()) { + return false; } } - //Verify that we have at least read permissions on the file we're considering grabbing - if (!Files.isReadable(file.toPath())) { - return false; - } - return filePattern.matcher(file.getName()).matches(); } + // Verify that we have at least read permissions on the file we're considering grabbing + if (!Files.isReadable(path)) { + return false; + } + return filePattern.matcher(path.getFileName().toString()).matches(); }; } - } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileInfoFilter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileInfoFilter.java deleted file mode 100644 index 7ee1430ec3..0000000000 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileInfoFilter.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.standard.util; - -import java.io.File; - -/** - * A filter for files. - */ -public interface FileInfoFilter { - - /** - * Tests whether or not the specified file with its - * {@link org.apache.nifi.processors.standard.util.FileInfo} satisfies the - * requirements of this filter. - * - * @param file The file to be tested - * @param info Attributes and info about the file. - * @return true if and only if the file should be included - */ - boolean accept(File file, FileInfo info); -}