NIFI-12772 Expose REMOTE_POLL_BATCH_SIZE property for ListSFTP

This closes #8390.

revert max results optimisation + unnecessary import

Signed-off-by: Joseph Witt <joewitt@apache.org>
This commit is contained in:
Tom Brisland 2024-02-09 23:13:35 +00:00 committed by Joseph Witt
parent d35247b7b1
commit 89b618cc05
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
3 changed files with 29 additions and 20 deletions

View File

@ -31,7 +31,6 @@ import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@ -124,13 +123,7 @@ public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> {
return listing;
}
final Iterator<FileInfo> itr = listing.iterator();
while (itr.hasNext()) {
final FileInfo next = itr.next();
if (next.getLastModifiedTime() < minTimestamp) {
itr.remove();
}
}
listing.removeIf(file -> file.getLastModifiedTime() < minTimestamp);
return listing;
}

View File

@ -100,6 +100,7 @@ public class ListSFTP extends ListFileTransfer {
properties.add(SFTPTransfer.FILE_FILTER_REGEX);
properties.add(SFTPTransfer.PATH_FILTER_REGEX);
properties.add(SFTPTransfer.IGNORE_DOTTED_FILES);
properties.add(SFTPTransfer.REMOTE_POLL_BATCH_SIZE);
properties.add(SFTPTransfer.STRICT_HOST_KEY_CHECKING);
properties.add(SFTPTransfer.HOST_KEY_FILE);
properties.add(SFTPTransfer.CONNECTION_TIMEOUT);
@ -176,7 +177,7 @@ public class ListSFTP extends ListFileTransfer {
final Long maxAge = context.getProperty(ListFile.MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
return (attributes) -> {
if(attributes.isDirectory()) {
if (attributes.isDirectory()) {
return true;
}

View File

@ -16,16 +16,6 @@
*/
package org.apache.nifi.processors.standard;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
@ -45,6 +35,16 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class TestListSFTP {
@ -94,7 +94,7 @@ public class TestListSFTP {
runner.assertAllFlowFilesContainAttribute(ListFile.FILE_PERMISSIONS_ATTRIBUTE);
runner.assertAllFlowFilesContainAttribute(ListFile.FILE_SIZE_ATTRIBUTE);
runner.assertAllFlowFilesContainAttribute(ListFile.FILE_LAST_MODIFY_TIME_ATTRIBUTE);
runner.assertAllFlowFilesContainAttribute( "filename");
runner.assertAllFlowFilesContainAttribute("filename");
final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(ListSFTP.REL_SUCCESS).get(0);
retrievedFile.assertAttributeEquals("sftp.listing.user", sshServer.getUsername());
@ -178,6 +178,21 @@ public class TestListSFTP {
runner.assertTransferCount(ListSFTP.REL_SUCCESS, 0);
}
@Test
public void testRemotePollBatchSizeEnforced() {
runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.NO_TRACKING);
runner.setProperty(SFTPTransfer.REMOTE_POLL_BATCH_SIZE, "1");
runner.run();
// Of 3 items only 1 returned due to batch size
runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1);
runner.setProperty(SFTPTransfer.REMOTE_POLL_BATCH_SIZE, "2");
runner.run();
runner.assertTransferCount(ListSFTP.REL_SUCCESS, 3);
}
@Test
public void testVerificationSuccessful() {
final List<ConfigVerificationResult> results = ((VerifiableProcessor) runner.getProcessor())