From ca530f40d8ef25a7c6834c96de982accb75368ee Mon Sep 17 00:00:00 2001 From: Joe Gresock Date: Wed, 27 Oct 2021 13:47:58 -0400 Subject: [PATCH] NIFI-9317: Updating config verification for ListS3 (#5485) --- .../processors/aws/AbstractAWSProcessor.java | 22 +++-- .../apache/nifi/processors/aws/s3/ListS3.java | 92 +++++++++++++------ .../nifi/processors/aws/s3/TestListS3.java | 15 +++ 3 files changed, 92 insertions(+), 37 deletions(-) diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java index 36e6a59726..6609bebd7d 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java @@ -303,17 +303,24 @@ public abstract class AbstractAWSProcessor currentKeys = currentListing.getKeys(); - - final AmazonS3 client = getClient(); int listCount = 0; int totalListCount = 0; long latestListedTimestampInThisCycle = currentTimestamp; - String delimiter = context.getProperty(DELIMITER).getValue(); - String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue(); - - boolean useVersions = context.getProperty(USE_VERSIONS).asBoolean(); - int listType = context.getProperty(LIST_TYPE).asInteger(); - S3BucketLister bucketLister = useVersions - ? new S3VersionBucketLister(client) - : listType == 2 - ? new S3ObjectBucketListerVersion2(client) - : new S3ObjectBucketLister(client); - - bucketLister.setBucketName(bucket); - bucketLister.setRequesterPays(requesterPays); - - if (delimiter != null && !delimiter.isEmpty()) { - bucketLister.setDelimiter(delimiter); - } - if (prefix != null && !prefix.isEmpty()) { - bucketLister.setPrefix(prefix); - } VersionListing versionListing; final Set listedKeys = new HashSet<>(); @@ -486,6 +469,33 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor { } } + private S3BucketLister getS3BucketLister(final ProcessContext context, final AmazonS3 client) { + final boolean requesterPays = context.getProperty(REQUESTER_PAYS).asBoolean(); + final boolean useVersions = context.getProperty(USE_VERSIONS).asBoolean(); + + final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue(); + final String delimiter = context.getProperty(DELIMITER).getValue(); + final String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue(); + + final int listType = context.getProperty(LIST_TYPE).asInteger(); + + final S3BucketLister bucketLister = useVersions + ? new S3VersionBucketLister(client) + : listType == 2 + ? new S3ObjectBucketListerVersion2(client) + : new S3ObjectBucketLister(client); + + bucketLister.setBucketName(bucket); + bucketLister.setRequesterPays(requesterPays); + + if (delimiter != null && !delimiter.isEmpty()) { + bucketLister.setDelimiter(delimiter); + } + if (prefix != null && !prefix.isEmpty()) { + bucketLister.setPrefix(prefix); + } + return bucketLister; + } private interface S3BucketLister { void setBucketName(String bucketName); @@ -891,11 +901,15 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor { @Override public List verify(final ProcessContext context, final ComponentLog logger, final Map attributes) { - final AmazonS3Client client = createClient(context, getCredentials(context), createConfiguration(context)); - initializeRegionAndEndpoint(context, client); + final ControllerService service = context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE).asControllerService(); + final AmazonS3Client client = service != null ? createClient(context, getCredentialsProvider(context), createConfiguration(context)) + : createClient(context, getCredentials(context), createConfiguration(context)); + + getRegionAndInitializeEndpoint(context, client); final List results = new ArrayList<>(); final String bucketName = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue(); + final long minAgeMilliseconds = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS); if (bucketName == null || bucketName.trim().isEmpty()) { results.add(new ConfigVerificationResult.Builder() @@ -907,17 +921,35 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor { return results; } - final String prefix = context.getProperty(PREFIX).getValue(); + final S3BucketLister bucketLister = getS3BucketLister(context, client); + final long listingTimestamp = System.currentTimeMillis(); // Attempt to perform a listing of objects in the S3 bucket try { - final ObjectListing listing = client.listObjects(bucketName, prefix); - final int count = listing.getObjectSummaries().size(); + int listCount = 0; + int totalListCount = 0; + VersionListing versionListing; + do { + versionListing = bucketLister.listVersions(); + for (final S3VersionSummary versionSummary : versionListing.getVersionSummaries()) { + long lastModified = versionSummary.getLastModified().getTime(); + if (lastModified > (listingTimestamp - minAgeMilliseconds)) { + continue; + } + + listCount++; + } + bucketLister.setNextMarker(); + + totalListCount += listCount; + + listCount = 0; + } while (bucketLister.isTruncated()); results.add(new ConfigVerificationResult.Builder() .verificationStepName("Perform Listing") .outcome(Outcome.SUCCESSFUL) - .explanation("Successfully listed contents of bucket '" + bucketName + "', finding " + count + " objects" + (prefix == null ? "" : " with a prefix of '" + prefix + "'")) + .explanation("Successfully listed contents of bucket '" + bucketName + "', finding " + totalListCount + " objects matching the filter") .build()); logger.info("Successfully verified configuration"); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java index a05c9e3aae..2942f2376a 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.processors.aws.s3; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.model.GetObjectMetadataRequest; import com.amazonaws.services.s3.model.GetObjectTaggingRequest; @@ -28,7 +30,10 @@ import com.amazonaws.services.s3.model.S3ObjectSummary; import com.amazonaws.services.s3.model.S3VersionSummary; import com.amazonaws.services.s3.model.VersionListing; import org.apache.commons.lang3.time.DateUtils; +import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.state.Scope; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.VerifiableProcessor; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.serialization.record.MockRecordWriter; import org.apache.nifi.state.MockStateManager; @@ -44,6 +49,7 @@ import java.io.IOException; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Calendar; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -66,6 +72,11 @@ public class TestListS3 { protected AmazonS3Client getClient() { return mockS3Client; } + + @Override + protected AmazonS3Client createClient(ProcessContext context, AWSCredentials credentials, ClientConfiguration config) { + return mockS3Client; + } }; runner = TestRunners.newTestRunner(mockListS3); } @@ -114,6 +125,10 @@ public class TestListS3 { flowFiles.get(1).assertAttributeEquals("filename", "b/c"); flowFiles.get(2).assertAttributeEquals("filename", "d/e"); runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, lastModifiedTimestamp, Scope.CLUSTER); + + final List results = ((VerifiableProcessor) runner.getProcessor()) + .verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap()); + assertTrue(results.get(0).getExplanation().contains("finding 3 objects")); } @Test