NIFI-7292 Preventing file listing from fail because of insufficient privileges

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4195.
This commit is contained in:
Bence Simon 2020-04-08 18:32:19 +02:00 committed by Pierre Villard
parent c263daf20b
commit 83400789f6
No known key found for this signature in database
GPG Key ID: BEE1599F0726E9CD
2 changed files with 145 additions and 85 deletions

View File

@ -48,8 +48,11 @@ import org.apache.nifi.util.Tuple;
import java.io.File;
import java.io.IOException;
import java.nio.file.AccessDeniedException;
import java.nio.file.FileStore;
import java.nio.file.FileVisitOption;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@ -68,6 +71,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@ -80,8 +84,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
import static org.apache.nifi.processor.util.StandardValidators.POSITIVE_INTEGER_VALIDATOR;
@ -547,31 +549,65 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
}
};
final Stream<Path> inputStream = getPathStream(basePath, maxDepth, matcher);
final Stream<FileInfo> listing = inputStream.map(p -> {
File file = p.toFile();
BasicFileAttributes attributes = lastModifiedMap.get(p);
final FileInfo fileInfo = new FileInfo.Builder()
.directory(false)
.filename(file.getName())
.fullPathFileName(file.getAbsolutePath())
.lastModifiedTime(attributes.lastModifiedTime().toMillis())
.size(attributes.size())
.build();
return fileInfo;
});
// Perform the actual listing
try {
final long start = System.currentTimeMillis();
final List<FileInfo> fileInfos = listing.collect(Collectors.toList());
final List<FileInfo> result = new LinkedList<>();
Files.walkFileTree(basePath, Collections.singleton(FileVisitOption.FOLLOW_LINKS), maxDepth, new FileVisitor<Path>() {
@Override
public FileVisitResult preVisitDirectory(final Path dir, final BasicFileAttributes attributes) throws IOException {
if (Files.isReadable(dir)) {
return FileVisitResult.CONTINUE;
} else {
getLogger().debug("The following directory is not readable: {}", new Object[] {dir.toString()});
return FileVisitResult.SKIP_SUBTREE;
}
}
@Override
public FileVisitResult visitFile(final Path path, final BasicFileAttributes attributes) throws IOException {
if (matcher.test(path, attributes)) {
final File file = path.toFile();
final BasicFileAttributes fileAttributes = lastModifiedMap.get(path);
final FileInfo fileInfo = new FileInfo.Builder()
.directory(false)
.filename(file.getName())
.fullPathFileName(file.getAbsolutePath())
.lastModifiedTime(fileAttributes.lastModifiedTime().toMillis())
.size(fileAttributes.size())
.build();
result.add(fileInfo);
}
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult visitFileFailed(final Path path, final IOException e) throws IOException {
if (e instanceof AccessDeniedException) {
getLogger().debug("The following file is not readable: {}", new Object[] {path.toString()});
return FileVisitResult.SKIP_SUBTREE;
} else {
getLogger().error("Error during visiting file {}: {}", new Object[] {path.toString(), e.getMessage()}, e);
return FileVisitResult.TERMINATE;
}
}
@Override
public FileVisitResult postVisitDirectory(final Path dir, final IOException e) throws IOException {
if (e != null) {
getLogger().error("Error during visiting directory {}: {}", new Object[] {dir.toString(), e.getMessage()}, e);
}
return FileVisitResult.CONTINUE;
}
});
final long millis = System.currentTimeMillis() - start;
getLogger().debug("Took {} milliseconds to perform listing and gather {} entries", new Object[] {millis, fileInfos.size()});
return fileInfos;
getLogger().debug("Took {} milliseconds to perform listing and gather {} entries", new Object[] {millis, result.size()});
return result;
} catch (final ProcessorStoppedException pse) {
getLogger().info("Processor was stopped so will not complete listing of Files");
return Collections.emptyList();
@ -580,10 +616,6 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
}
}
protected Stream<Path> getPathStream(final Path basePath, final int maxDepth, final BiPredicate<Path, BasicFileAttributes> matcher) throws IOException {
return Files.find(basePath, maxDepth, matcher, FileVisitOption.FOLLOW_LINKS);
}
@Override
protected boolean isListingResetNecessary(final PropertyDescriptor property) {
return DIRECTORY.equals(property)

View File

@ -43,8 +43,6 @@ import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileTime;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@ -56,10 +54,8 @@ import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@ -187,60 +183,7 @@ public class TestListFile {
}
}
final BasicFileAttributes basicFileAttributes = new BasicFileAttributes() {
@Override
public FileTime lastModifiedTime() {
return FileTime.fromMillis(System.currentTimeMillis());
}
@Override
public FileTime lastAccessTime() {
return FileTime.fromMillis(System.currentTimeMillis());
}
@Override
public FileTime creationTime() {
return FileTime.fromMillis(System.currentTimeMillis());
}
@Override
public boolean isRegularFile() {
return false;
}
@Override
public boolean isDirectory() {
return false;
}
@Override
public boolean isSymbolicLink() {
return false;
}
@Override
public boolean isOther() {
return false;
}
@Override
public long size() {
return 0;
}
@Override
public Object fileKey() {
return null;
}
};
processor = new ListFile() {
@Override
protected Stream<Path> getPathStream(final Path basePath, final int maxDepth, final BiPredicate<Path, BasicFileAttributes> matcher) throws IOException {
return paths.stream()
.filter(path -> matcher.test(path, basicFileAttributes));
}
};
processor = new ListFile();
runner = TestRunners.newTestRunner(processor);
runner.setProperty(AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION, AbstractListProcessor.PRECISION_SECONDS.getValue());
@ -502,6 +445,91 @@ public class TestListFile {
assertEquals(1, successFiles2.size());
}
@Test
public void testListWithUnreadableFiles() throws Exception {
final File file1 = new File(TESTDIR + "/unreadable.txt");
assertTrue(file1.createNewFile());
assertTrue(file1.setReadable(false));
final File file2 = new File(TESTDIR + "/readable.txt");
assertTrue(file2.createNewFile());
final long now = getTestModifiedTime();
assertTrue(file1.setLastModified(now));
assertTrue(file2.setLastModified(now));
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
runner.setProperty(ListFile.FILE_FILTER, ".*");
runNext();
final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(1, successFiles.size());
}
@Test
public void testListWithinUnreadableDirectory() throws Exception {
final File subdir = new File(TESTDIR + "/subdir");
assertTrue(subdir.mkdir());
assertTrue(subdir.setReadable(false));
final File file1 = new File(TESTDIR + "/subdir/unreadable.txt");
assertTrue(file1.createNewFile());
assertTrue(file1.setReadable(false));
final File file2 = new File(TESTDIR + "/subdir/readable.txt");
assertTrue(file2.createNewFile());
final File file3 = new File(TESTDIR + "/secondReadable.txt");
assertTrue(file3.createNewFile());
final long now = getTestModifiedTime();
assertTrue(file1.setLastModified(now));
assertTrue(file2.setLastModified(now));
assertTrue(file3.setLastModified(now));
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
runner.setProperty(ListFile.FILE_FILTER, ".*");
runNext();
final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(1, successFiles.size());
assertEquals("secondReadable.txt", successFiles.get(0).getAttribute("filename"));
subdir.setReadable(true);
}
@Test
public void testListingNeedsSufficientPrivilegesAndFittingFilter() throws Exception {
final File file = new File(TESTDIR + "/file.txt");
assertTrue(file.createNewFile());
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
// Run with privileges but without fitting filter
runner.setProperty(ListFile.FILE_FILTER, "willBeFilteredOut");
assertTrue(file.setLastModified(getTestModifiedTime()));
runNext();
final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(0, successFiles1.size());
// Run with privileges and with fitting filter
runner.setProperty(ListFile.FILE_FILTER, "file.*");
assertTrue(file.setLastModified(getTestModifiedTime()));
runNext();
final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(1, successFiles2.size());
// Run without privileges and with fitting filter
assertTrue(file.setReadable(false));
assertTrue(file.setLastModified(getTestModifiedTime()));
runNext();
final List<MockFlowFile> successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
assertEquals(0, successFiles3.size());
}
@Test
public void testFilterFilePattern() throws Exception {