NIFI-13750 Abort SFTP listing once maxResults is reached (#9267)

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Lucas 2024-09-19 23:16:10 +02:00 committed by GitHub
parent e125d8cdbb
commit c71c13ce31
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 25 additions and 23 deletions

View File

@ -16,13 +16,13 @@
*/
package org.apache.nifi.processors.standard.util;
import com.hierynomus.sshj.sftp.RemoteResourceSelector;
import net.schmizz.sshj.DefaultConfig;
import net.schmizz.sshj.SSHClient;
import net.schmizz.sshj.common.Factory;
import net.schmizz.sshj.sftp.FileAttributes;
import net.schmizz.sshj.sftp.FileMode;
import net.schmizz.sshj.sftp.RemoteFile;
import net.schmizz.sshj.sftp.RemoteResourceFilter;
import net.schmizz.sshj.sftp.RemoteResourceInfo;
import net.schmizz.sshj.sftp.Response;
import net.schmizz.sshj.sftp.SFTPClient;
@ -287,6 +287,7 @@ public class SFTPTransfer implements FileTransfer {
final List<FileInfo> listing = new ArrayList<>(1000);
getListing(path, depth, maxResults, listing, applyFilters);
return listing;
}
@ -328,45 +329,47 @@ public class SFTPTransfer implements FileTransfer {
final boolean pathMatched = pathFilterMatches;
final boolean filteringDisabled = !applyFilters;
final List<RemoteResourceInfo> subDirs = new ArrayList<>();
final List<RemoteResourceInfo> subDirectoryPaths = new ArrayList<>();
try {
final RemoteResourceFilter filter = (entry) -> {
final RemoteResourceSelector selector = (entry) -> {
final String entryFilename = entry.getName();
// skip over 'this directory' and 'parent directory' special files regardless of ignoring dot files
if (RELATIVE_CURRENT_DIRECTORY.equals(entryFilename) || RELATIVE_PARENT_DIRECTORY.equals(entryFilename)) {
return false;
return RemoteResourceSelector.Result.CONTINUE;
}
// skip files and directories that begin with a dot if we're ignoring them
if (ignoreDottedFiles && entryFilename.startsWith(DOT_PREFIX)) {
return false;
return RemoteResourceSelector.Result.CONTINUE;
}
// remember directory for later recursive listing
if (isIncludedDirectory(entry, recurse, symlink)) {
subDirs.add(entry);
return false;
}
// Since SSHJ does not have the concept of BREAK that JSCH had, we need to move this before the call to listing.add
// below, otherwise we would keep adding to the listings since returning false here doesn't break
if (listing.size() >= maxResults) {
return false;
subDirectoryPaths.add(entry);
return RemoteResourceSelector.Result.CONTINUE;
}
// add regular files matching our filter to the result
if (isIncludedFile(entry, symlink) && (filteringDisabled || pathMatched)) {
if (filteringDisabled || fileFilterPattern == null || fileFilterPattern.matcher(entryFilename).matches()) {
listing.add(newFileInfo(path, entry.getName(), entry.getAttributes()));
// abort further processing once we've reached the configured amount of maxResults
if (listing.size() >= maxResults) {
return RemoteResourceSelector.Result.BREAK;
}
}
}
return false;
// SSHJ does not need to keep track as we collect the results ourselves, continue with next entry instead
return RemoteResourceSelector.Result.CONTINUE;
};
if (path == null || path.trim().isEmpty()) {
sftpClient.ls(RELATIVE_CURRENT_DIRECTORY, filter);
if (path == null || path.isBlank()) {
sftpClient.ls(RELATIVE_CURRENT_DIRECTORY, selector);
} else {
sftpClient.ls(path, filter);
sftpClient.ls(path, selector);
}
} catch (final SFTPException e) {
final String pathDesc = path == null ? "current directory" : path;
@ -381,7 +384,7 @@ public class SFTPTransfer implements FileTransfer {
}
}
for (final RemoteResourceInfo entry : subDirs) {
for (final RemoteResourceInfo entry : subDirectoryPaths) {
final String entryFilename = entry.getName();
final File newFullPath = new File(path, entryFilename);
final String newFullForwardPath = newFullPath.getPath().replace("\\", "/");
@ -392,7 +395,6 @@ public class SFTPTransfer implements FileTransfer {
logger.error("Unable to get listing from {}; skipping", newFullForwardPath, e);
}
}
}
/**

View File

@ -16,10 +16,10 @@
*/
package org.apache.nifi.processors.standard.util;
import com.hierynomus.sshj.sftp.RemoteResourceSelector;
import net.schmizz.sshj.sftp.FileAttributes;
import net.schmizz.sshj.sftp.FileMode;
import net.schmizz.sshj.sftp.PathComponents;
import net.schmizz.sshj.sftp.RemoteResourceFilter;
import net.schmizz.sshj.sftp.RemoteResourceInfo;
import net.schmizz.sshj.sftp.Response;
import net.schmizz.sshj.sftp.SFTPClient;
@ -263,7 +263,7 @@ public class TestSFTPTransfer {
when(processContext.getProperty(eq(FileTransfer.REMOTE_PATH))).thenReturn(new MockPropertyValue("."));
try (SFTPClient sftpClient = mock(SFTPClient.class)) {
when(sftpClient.ls(any(), ArgumentMatchers.<RemoteResourceFilter>any())).then(invocation -> {
when(sftpClient.ls(any(), ArgumentMatchers.<RemoteResourceSelector>any())).then(invocation -> {
final Map<String, String> extended = new LinkedHashMap<>();
final List<RemoteResourceInfo> list = new ArrayList<>();
list.add(new RemoteResourceInfo(
@ -272,8 +272,8 @@ public class TestSFTPTransfer {
list.add(new RemoteResourceInfo(
new PathComponents("./", "regular.txt", "/"),
new FileAttributes(FileAttributes.Flag.MODE.get(), 0, 0, 0, new FileMode(FILE_MASK_REGULAR_777), 0, 0, extended)));
final RemoteResourceFilter filter = invocation.getArgument(1, RemoteResourceFilter.class);
list.forEach(filter::accept);
final RemoteResourceSelector selector = invocation.getArgument(1, RemoteResourceSelector.class);
list.forEach(selector::select);
return list;
});