From 6ce2e3799c7525ccbd1112cfcfc66912640da5f3 Mon Sep 17 00:00:00 2001 From: Marco Carlino Date: Thu, 11 Aug 2022 17:31:21 +0200 Subject: [PATCH] NIFI-10349 add maximum object age property to list s3 This closes #6293. Signed-off-by: Peter Turcsanyi --- .../apache/nifi/processors/aws/s3/ListS3.java | 33 ++++++++++++- .../nifi/processors/aws/s3/TestListS3.java | 48 +++++++++++++++++++ 2 files changed, 80 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java index 282cd0c354..d492708606 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java @@ -81,6 +81,7 @@ import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.util.FormatUtils; import java.io.IOException; import java.io.OutputStream; @@ -220,6 +221,15 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor { .defaultValue("0 sec") .build(); + public static final PropertyDescriptor MAX_AGE = new Builder() + .name("max-age") + .displayName("Maximum Object Age") + .description("The maximum age that an S3 object can be in order to be considered; any object older than this amount of time (according to last modification date) will be ignored") + .required(false) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .addValidator(createMaxAgeValidator()) + .build(); + public static final PropertyDescriptor WRITE_OBJECT_TAGS = new Builder() .name("write-s3-object-tags") .displayName("Write Object Tags") @@ -284,6 +294,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor { SECRET_KEY, RECORD_WRITER, MIN_AGE, + MAX_AGE, BATCH_SIZE, WRITE_OBJECT_TAGS, WRITE_USER_METADATA, @@ -362,6 +373,22 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor { } }; } + private static Validator createMaxAgeValidator() { + return new Validator() { + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + Double maxAge = input != null ? FormatUtils.getPreciseTimeDuration(input, TimeUnit.MILLISECONDS) : null; + long minAge = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS); + boolean valid = input != null && maxAge > minAge; + return new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(valid) + .explanation(valid ? null : "'Maximum Age' must be greater than 'Minimum Age' ") + .build(); + } + }; + } @Override protected List getSupportedPropertyDescriptors() { @@ -449,6 +476,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor { final long startNanos = System.nanoTime(); final long minAgeMilliseconds = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS); + final Long maxAgeMilliseconds = context.getProperty(MAX_AGE) != null ? context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS) : null; final long listingTimestamp = System.currentTimeMillis(); final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue(); @@ -481,6 +509,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor { long lastModified = versionSummary.getLastModified().getTime(); if (lastModified < currentTimestamp || lastModified == currentTimestamp && currentKeys.contains(versionSummary.getKey()) + || (maxAgeMilliseconds != null && (lastModified < (listingTimestamp - maxAgeMilliseconds))) || lastModified > (listingTimestamp - minAgeMilliseconds)) { continue; } @@ -1103,6 +1132,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor { final List results = new ArrayList<>(super.verify(context, logger, attributes)); final String bucketName = context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue(); final long minAgeMilliseconds = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS); + final Long maxAgeMilliseconds = context.getProperty(MAX_AGE) != null ? context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS) : null; if (bucketName == null || bucketName.trim().isEmpty()) { results.add(new ConfigVerificationResult.Builder() @@ -1126,7 +1156,8 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor { versionListing = bucketLister.listVersions(); for (final S3VersionSummary versionSummary : versionListing.getVersionSummaries()) { long lastModified = versionSummary.getLastModified().getTime(); - if (lastModified > (listingTimestamp - minAgeMilliseconds)) { + if ((maxAgeMilliseconds != null && (lastModified < (listingTimestamp - maxAgeMilliseconds))) + || lastModified > (listingTimestamp - minAgeMilliseconds)) { continue; } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java index 410cf9370f..01e259513c 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java @@ -463,6 +463,54 @@ public class TestListS3 { runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, lastModifiedTimestamp, Scope.CLUSTER); } + @Test + public void testListIgnoreByMaxAge() throws IOException { + runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(ListS3.BUCKET, "test-bucket"); + runner.setProperty(ListS3.MAX_AGE, "30 sec"); + Date lastModifiedNow = new Date(); + Date lastModifiedMinus1Hour = DateUtils.addHours(lastModifiedNow, -1); + Date lastModifiedMinus3Hour = DateUtils.addHours(lastModifiedNow, -3); + ObjectListing objectListing = new ObjectListing(); + S3ObjectSummary objectSummary1 = new S3ObjectSummary(); + objectSummary1.setBucketName("test-bucket"); + objectSummary1.setKey("minus-3hour"); + objectSummary1.setLastModified(lastModifiedMinus3Hour); + objectListing.getObjectSummaries().add(objectSummary1); + S3ObjectSummary objectSummary2 = new S3ObjectSummary(); + objectSummary2.setBucketName("test-bucket"); + objectSummary2.setKey("minus-1hour"); + objectSummary2.setLastModified(lastModifiedMinus1Hour); + objectListing.getObjectSummaries().add(objectSummary2); + S3ObjectSummary objectSummary3 = new S3ObjectSummary(); + objectSummary3.setBucketName("test-bucket"); + objectSummary3.setKey("now"); + objectSummary3.setLastModified(lastModifiedNow); + objectListing.getObjectSummaries().add(objectSummary3); + Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing); + + Map stateMap = new HashMap<>(); + String previousTimestamp = String.valueOf(lastModifiedMinus3Hour.getTime()); + stateMap.put(ListS3.CURRENT_TIMESTAMP, previousTimestamp); + stateMap.put(ListS3.CURRENT_KEY_PREFIX + "0", "minus-3hour"); + runner.getStateManager().setState(stateMap, Scope.CLUSTER); + runner.run(); + ArgumentCaptor captureRequest = ArgumentCaptor.forClass(ListObjectsRequest.class); + Mockito.verify(mockS3Client, Mockito.times(1)).listObjects(captureRequest.capture()); + ListObjectsRequest request = captureRequest.getValue(); + assertEquals("test-bucket", request.getBucketName()); + Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any()); + + runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS); + MockFlowFile ff0 = flowFiles.get(0); + ff0.assertAttributeEquals("filename", "now"); + ff0.assertAttributeEquals("s3.bucket", "test-bucket"); + String lastModifiedTimestamp = String.valueOf(lastModifiedNow.getTime()); + ff0.assertAttributeEquals("s3.lastModified", lastModifiedTimestamp); + runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, lastModifiedTimestamp, Scope.CLUSTER); + } + @Test public void testWriteObjectTags() { runner.setProperty(ListS3.REGION, "eu-west-1");