diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java index 321ea4c87a..f2a094d454 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java @@ -124,10 +124,19 @@ public class ListS3 extends AbstractS3Processor { .description("Specifies whether to use the original List Objects or the newer List Objects Version 2 endpoint.") .build(); + public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder() + .name("min-age") + .displayName("Minimum Object Age") + .description("The minimum age that an S3 object must be in order to be considered; any object younger than this amount of time (according to last modification date) will be ignored") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("0 sec") + .build(); + public static final List properties = Collections.unmodifiableList( Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, - SIGNER_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT, DELIMITER, PREFIX, USE_VERSIONS, LIST_TYPE)); + SIGNER_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT, DELIMITER, PREFIX, USE_VERSIONS, LIST_TYPE, MIN_AGE)); public static final Set relationships = Collections.unmodifiableSet( new HashSet<>(Collections.singletonList(REL_SUCCESS))); @@ -197,6 +206,8 @@ public class ListS3 extends AbstractS3Processor { final long startNanos = System.nanoTime(); final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue(); + final long minAgeMilliseconds = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS); + final long listingTimestamp = System.currentTimeMillis(); final AmazonS3 client = getClient(); int listCount = 0; @@ -227,7 +238,8 @@ public class ListS3 extends AbstractS3Processor { for (S3VersionSummary versionSummary : versionListing.getVersionSummaries()) { long lastModified = versionSummary.getLastModified().getTime(); if (lastModified < currentTimestamp - || lastModified == currentTimestamp && currentKeys.contains(versionSummary.getKey())) { + || lastModified == currentTimestamp && currentKeys.contains(versionSummary.getKey()) + || lastModified > (listingTimestamp - minAgeMilliseconds)) { continue; } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java index 5ccdb4974a..f5ce29122f 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.commons.lang3.time.DateUtils; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.state.Scope; import org.apache.nifi.state.MockStateManager; @@ -237,11 +238,63 @@ public class TestListS3 { runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 0); } + + @Test + public void testListIgnoreByMinAge() throws IOException { + runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(ListS3.BUCKET, "test-bucket"); + runner.setProperty(ListS3.MIN_AGE, "30 sec"); + + Date lastModifiedNow = new Date(); + Date lastModifiedMinus1Hour = DateUtils.addHours(lastModifiedNow, -1); + Date lastModifiedMinus3Hour = DateUtils.addHours(lastModifiedNow, -3); + ObjectListing objectListing = new ObjectListing(); + S3ObjectSummary objectSummary1 = new S3ObjectSummary(); + objectSummary1.setBucketName("test-bucket"); + objectSummary1.setKey("minus-3hour"); + objectSummary1.setLastModified(lastModifiedMinus3Hour); + objectListing.getObjectSummaries().add(objectSummary1); + S3ObjectSummary objectSummary2 = new S3ObjectSummary(); + objectSummary2.setBucketName("test-bucket"); + objectSummary2.setKey("minus-1hour"); + objectSummary2.setLastModified(lastModifiedMinus1Hour); + objectListing.getObjectSummaries().add(objectSummary2); + S3ObjectSummary objectSummary3 = new S3ObjectSummary(); + objectSummary3.setBucketName("test-bucket"); + objectSummary3.setKey("now"); + objectSummary3.setLastModified(lastModifiedNow); + objectListing.getObjectSummaries().add(objectSummary3); + Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing); + + Map stateMap = new HashMap<>(); + String previousTimestamp = String.valueOf(lastModifiedMinus3Hour.getTime()); + stateMap.put(ListS3.CURRENT_TIMESTAMP, previousTimestamp); + stateMap.put(ListS3.CURRENT_KEY_PREFIX + "0", "minus-3hour"); + runner.getStateManager().setState(stateMap, Scope.CLUSTER); + + runner.run(); + + ArgumentCaptor captureRequest = ArgumentCaptor.forClass(ListObjectsRequest.class); + Mockito.verify(mockS3Client, Mockito.times(1)).listObjects(captureRequest.capture()); + ListObjectsRequest request = captureRequest.getValue(); + assertEquals("test-bucket", request.getBucketName()); + Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any()); + + runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS); + MockFlowFile ff0 = flowFiles.get(0); + ff0.assertAttributeEquals("filename", "minus-1hour"); + ff0.assertAttributeEquals("s3.bucket", "test-bucket"); + String lastModifiedTimestamp = String.valueOf(lastModifiedMinus1Hour.getTime()); + ff0.assertAttributeEquals("s3.lastModified", lastModifiedTimestamp); + runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, lastModifiedTimestamp, Scope.CLUSTER); + } + @Test public void testGetPropertyDescriptors() throws Exception { ListS3 processor = new ListS3(); List pd = processor.getSupportedPropertyDescriptors(); - assertEquals("size should be eq", 16, pd.size()); + assertEquals("size should be eq", 17, pd.size()); assertTrue(pd.contains(ListS3.ACCESS_KEY)); assertTrue(pd.contains(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE)); assertTrue(pd.contains(ListS3.BUCKET)); @@ -257,5 +310,6 @@ public class TestListS3 { assertTrue(pd.contains(ListS3.DELIMITER)); assertTrue(pd.contains(ListS3.PREFIX)); assertTrue(pd.contains(ListS3.USE_VERSIONS)); + assertTrue(pd.contains(ListS3.MIN_AGE)); } }