From 2a6c3c16862d14e5b3a34043e880a60e2c251c43 Mon Sep 17 00:00:00 2001 From: Arpad Boda Date: Thu, 21 Feb 2019 14:37:33 +0100 Subject: [PATCH] NIFI-5977 - Add "Minimum/Maximum File Age" Parameter to ListSFTP Signed-off-by: Pierre Villard This closes #3324. --- .../nifi-standard-processors/pom.xml | 6 + .../nifi/processors/standard/ListSFTP.java | 57 ++++++++ .../processors/standard/TestListSFTP.java | 126 ++++++++++++++++++ 3 files changed, 189 insertions(+) create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index aed143ed35..7c1a13ccc7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -348,6 +348,12 @@ 1.10.0-SNAPSHOT test + + com.github.stefanbirkner + fake-sftp-server-rule + 2.0.1 + test + diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java index 89012b0da1..ccdf1d8bd4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java @@ -17,9 +17,14 @@ package org.apache.nifi.processors.standard; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; +import java.util.stream.Collectors; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; @@ -31,13 +36,16 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.state.Scope; import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.list.ListedEntityTracker; +import org.apache.nifi.processors.standard.util.FileInfo; import org.apache.nifi.processors.standard.util.FTPTransfer; import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.util.SFTPTransfer; @@ -68,6 +76,8 @@ import org.apache.nifi.processors.standard.util.SFTPTransfer; + "a new Primary Node is selected, the new node will not duplicate the data that was listed by the previous Primary Node.") public class ListSFTP extends ListFileTransfer { + private final AtomicReference> fileFilterRef = new AtomicReference(); + @Override protected List getSupportedPropertyDescriptors() { final PropertyDescriptor port = new PropertyDescriptor.Builder().fromPropertyDescriptor(UNDEFAULTED_PORT).defaultValue("22").build(); @@ -102,6 +112,10 @@ public class ListSFTP extends ListFileTransfer { properties.add(ListedEntityTracker.TRACKING_STATE_CACHE); properties.add(ListedEntityTracker.TRACKING_TIME_WINDOW); properties.add(ListedEntityTracker.INITIAL_LISTING_TARGET); + properties.add(ListFile.MIN_AGE); + properties.add(ListFile.MAX_AGE); + properties.add(ListFile.MIN_SIZE); + properties.add(ListFile.MAX_SIZE); return properties; } @@ -126,4 +140,47 @@ public class ListSFTP extends ListFileTransfer { protected void customValidate(ValidationContext validationContext, Collection results) { SFTPTransfer.validateProxySpec(validationContext, results); } + + @Override + protected List performListing(final ProcessContext context, final Long minTimestamp) throws IOException { + final List listing = super.performListing(context, minTimestamp); + + return listing.stream() + .filter(fileFilterRef.get()) + .collect(Collectors.toList()); + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + fileFilterRef.set(createFileFilter(context)); + } + + private Predicate createFileFilter(final ProcessContext context) { + final long minSize = context.getProperty(ListFile.MIN_SIZE).asDataSize(DataUnit.B).longValue(); + final Double maxSize = context.getProperty(ListFile.MAX_SIZE).asDataSize(DataUnit.B); + final long minAge = context.getProperty(ListFile.MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS); + final Long maxAge = context.getProperty(ListFile.MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS); + + return (attributes) -> { + if(attributes.isDirectory()) { + return true; + } + + if (minSize > attributes.getSize()) { + return false; + } + if (maxSize != null && maxSize < attributes.getSize()) { + return false; + } + final long fileAge = System.currentTimeMillis() - attributes.getLastModifiedTime(); + if (minAge > fileAge) { + return false; + } + if (maxAge != null && maxAge < fileAge) { + return false; + } + + return true; + }; + } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java new file mode 100644 index 0000000000..68a217d08e --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import com.github.stefanbirkner.fakesftpserver.rule.FakeSftpServerRule; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.processor.util.list.AbstractListProcessor; +import org.apache.nifi.processors.standard.util.FTPTransfer; +import org.apache.nifi.processors.standard.util.SFTPTransfer; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.Rule; +import java.security.SecureRandom; + +public class TestListSFTP { + @Rule + public final FakeSftpServerRule sftpServer = new FakeSftpServerRule(); + int port; + + final String username = "nifi-sftp-user"; + final String password = "Test test test chocolate"; + + @Before + public void setUp() throws Exception { + sftpServer.addUser(username, password); + port = sftpServer.getPort(); + + + sftpServer.putFile("/directory/smallfile.txt", "byte", StandardCharsets.UTF_8); + + sftpServer.putFile("/directory/file.txt", "a bit more content in this file", StandardCharsets.UTF_8); + + byte[] bytes = new byte[120]; + SecureRandom.getInstanceStrong().nextBytes(bytes); + + sftpServer.putFile("/directory/file.bin", bytes); + } + + @After + public void tearDown() throws Exception { + sftpServer.deleteAllFilesAndDirectories(); + } + + @Test + public void basicFileList() throws InterruptedException { + TestRunner runner = TestRunners.newTestRunner(ListSFTP.class); + runner.setProperty(ListSFTP.HOSTNAME, "localhost"); + runner.setProperty(ListSFTP.USERNAME, username); + runner.setProperty(SFTPTransfer.PASSWORD, password); + runner.setProperty(FTPTransfer.PORT, Integer.toString(port)); + runner.setProperty(ListSFTP.REMOTE_PATH, "/directory/"); + + runner.setProperty(ListFile.TARGET_SYSTEM_TIMESTAMP_PRECISION, ListFile.PRECISION_MILLIS); + runner.assertValid(); + + // Ensure wait for enough lag time. + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.MILLISECONDS) * 2); + + runner.run(); + + runner.assertTransferCount(ListSFTP.REL_SUCCESS, 3); + + runner.assertAllFlowFilesContainAttribute("sftp.remote.host"); + runner.assertAllFlowFilesContainAttribute("sftp.remote.port"); + runner.assertAllFlowFilesContainAttribute("sftp.listing.user"); + runner.assertAllFlowFilesContainAttribute(ListFile.FILE_OWNER_ATTRIBUTE); + runner.assertAllFlowFilesContainAttribute(ListFile.FILE_GROUP_ATTRIBUTE); + runner.assertAllFlowFilesContainAttribute(ListFile.FILE_PERMISSIONS_ATTRIBUTE); + runner.assertAllFlowFilesContainAttribute(ListFile.FILE_SIZE_ATTRIBUTE); + runner.assertAllFlowFilesContainAttribute(ListFile.FILE_LAST_MODIFY_TIME_ATTRIBUTE); + runner.assertAllFlowFilesContainAttribute( "filename"); + + final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(ListSFTP.REL_SUCCESS).get(0); + retrievedFile.assertAttributeEquals("sftp.listing.user", username); + } + + + @Test + public void sizeFilteredFileList() throws InterruptedException { + TestRunner runner = TestRunners.newTestRunner(ListSFTP.class); + runner.setProperty(ListSFTP.HOSTNAME, "localhost"); + runner.setProperty(ListSFTP.USERNAME, username); + runner.setProperty(SFTPTransfer.PASSWORD, password); + runner.setProperty(FTPTransfer.PORT, Integer.toString(port)); + runner.setProperty(ListSFTP.REMOTE_PATH, "/directory/"); + runner.setProperty(ListFile.MIN_SIZE, "8B"); + runner.setProperty(ListFile.MAX_SIZE, "100B"); + + + runner.setProperty(ListFile.TARGET_SYSTEM_TIMESTAMP_PRECISION, ListFile.PRECISION_MILLIS); + runner.assertValid(); + + // Ensure wait for enough lag time. + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.MILLISECONDS) * 2); + + runner.run(); + + runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1); + + final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(ListSFTP.REL_SUCCESS).get(0); + //the only file between the limits + retrievedFile.assertAttributeEquals("filename", "file.txt"); + } +}