diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java index fd27f2b4d0..321ea4c87a 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java @@ -36,6 +36,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateMap; @@ -53,6 +54,8 @@ import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.S3ObjectSummary; import com.amazonaws.services.s3.model.S3VersionSummary; import com.amazonaws.services.s3.model.VersionListing; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; @TriggerSerially @TriggerWhenEmpty @@ -108,10 +111,23 @@ public class ListS3 extends AbstractS3Processor { .description("Specifies whether to use S3 versions, if applicable. If false, only the latest version of each object will be returned.") .build(); + public static final PropertyDescriptor LIST_TYPE = new PropertyDescriptor.Builder() + .name("list-type") + .displayName("List Type") + .expressionLanguageSupported(false) + .required(true) + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .allowableValues( + new AllowableValue("1", "List Objects V1"), + new AllowableValue("2", "List Objects V2")) + .defaultValue("1") + .description("Specifies whether to use the original List Objects or the newer List Objects Version 2 endpoint.") + .build(); + public static final List properties = Collections.unmodifiableList( Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, - SIGNER_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT, DELIMITER, PREFIX, USE_VERSIONS)); + SIGNER_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT, DELIMITER, PREFIX, USE_VERSIONS, LIST_TYPE)); public static final Set relationships = Collections.unmodifiableSet( new HashSet<>(Collections.singletonList(REL_SUCCESS))); @@ -189,10 +205,12 @@ public class ListS3 extends AbstractS3Processor { String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue(); boolean useVersions = context.getProperty(USE_VERSIONS).asBoolean(); - + int listType = context.getProperty(LIST_TYPE).asInteger(); S3BucketLister bucketLister = useVersions ? new S3VersionBucketLister(client) - : new S3ObjectBucketLister(client); + : listType == 2 + ? new S3ObjectBucketListerVersion2(client) + : new S3ObjectBucketLister(client); bucketLister.setBucketName(bucket); @@ -340,6 +358,62 @@ public class ListS3 extends AbstractS3Processor { } } + public class S3ObjectBucketListerVersion2 implements S3BucketLister { + private AmazonS3 client; + private ListObjectsV2Request listObjectsRequest; + private ListObjectsV2Result objectListing; + + public S3ObjectBucketListerVersion2(AmazonS3 client) { + this.client = client; + } + + @Override + public void setBucketName(String bucketName) { + listObjectsRequest = new ListObjectsV2Request().withBucketName(bucketName); + } + + @Override + public void setPrefix(String prefix) { + listObjectsRequest.setPrefix(prefix); + } + + @Override + public void setDelimiter(String delimiter) { + listObjectsRequest.setDelimiter(delimiter); + } + + @Override + public VersionListing listVersions() { + VersionListing versionListing = new VersionListing(); + this.objectListing = client.listObjectsV2(listObjectsRequest); + for(S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) { + S3VersionSummary versionSummary = new S3VersionSummary(); + versionSummary.setBucketName(objectSummary.getBucketName()); + versionSummary.setETag(objectSummary.getETag()); + versionSummary.setKey(objectSummary.getKey()); + versionSummary.setLastModified(objectSummary.getLastModified()); + versionSummary.setOwner(objectSummary.getOwner()); + versionSummary.setSize(objectSummary.getSize()); + versionSummary.setStorageClass(objectSummary.getStorageClass()); + versionSummary.setIsLatest(true); + + versionListing.getVersionSummaries().add(versionSummary); + } + + return versionListing; + } + + @Override + public void setNextMarker() { + listObjectsRequest.setContinuationToken(objectListing.getNextContinuationToken()); + } + + @Override + public boolean isTruncated() { + return (objectListing == null) ? false : objectListing.isTruncated(); + } + } + public class S3VersionBucketLister implements S3BucketLister { private AmazonS3 client; private ListVersionsRequest listVersionsRequest; diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java index 06b3683c81..5ccdb4974a 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java @@ -37,6 +37,8 @@ import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.S3ObjectSummary; import com.amazonaws.services.s3.model.S3VersionSummary; import com.amazonaws.services.s3.model.VersionListing; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; import org.junit.Before; import org.junit.Test; @@ -111,6 +113,51 @@ public class TestListS3 { runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, lastModifiedTimestamp, Scope.CLUSTER); } + @Test + public void testListVersion2() { + runner.setProperty(ListS3.REGION, "eu-west-1"); + runner.setProperty(ListS3.BUCKET, "test-bucket"); + runner.setProperty(ListS3.LIST_TYPE, "2"); + + Date lastModified = new Date(); + ListObjectsV2Result objectListing = new ListObjectsV2Result(); + S3ObjectSummary objectSummary1 = new S3ObjectSummary(); + objectSummary1.setBucketName("test-bucket"); + objectSummary1.setKey("a"); + objectSummary1.setLastModified(lastModified); + objectListing.getObjectSummaries().add(objectSummary1); + S3ObjectSummary objectSummary2 = new S3ObjectSummary(); + objectSummary2.setBucketName("test-bucket"); + objectSummary2.setKey("b/c"); + objectSummary2.setLastModified(lastModified); + objectListing.getObjectSummaries().add(objectSummary2); + S3ObjectSummary objectSummary3 = new S3ObjectSummary(); + objectSummary3.setBucketName("test-bucket"); + objectSummary3.setKey("d/e"); + objectSummary3.setLastModified(lastModified); + objectListing.getObjectSummaries().add(objectSummary3); + Mockito.when(mockS3Client.listObjectsV2(Mockito.any(ListObjectsV2Request.class))).thenReturn(objectListing); + + runner.run(); + + ArgumentCaptor captureRequest = ArgumentCaptor.forClass(ListObjectsV2Request.class); + Mockito.verify(mockS3Client, Mockito.times(1)).listObjectsV2(captureRequest.capture()); + ListObjectsV2Request request = captureRequest.getValue(); + assertEquals("test-bucket", request.getBucketName()); + Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any()); + + runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3); + List flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS); + MockFlowFile ff0 = flowFiles.get(0); + ff0.assertAttributeEquals("filename", "a"); + ff0.assertAttributeEquals("s3.bucket", "test-bucket"); + String lastModifiedTimestamp = String.valueOf(lastModified.getTime()); + ff0.assertAttributeEquals("s3.lastModified", lastModifiedTimestamp); + flowFiles.get(1).assertAttributeEquals("filename", "b/c"); + flowFiles.get(2).assertAttributeEquals("filename", "d/e"); + runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, lastModifiedTimestamp, Scope.CLUSTER); + } + @Test public void testListVersions() { runner.setProperty(ListS3.REGION, "eu-west-1"); @@ -194,7 +241,7 @@ public class TestListS3 { public void testGetPropertyDescriptors() throws Exception { ListS3 processor = new ListS3(); List pd = processor.getSupportedPropertyDescriptors(); - assertEquals("size should be eq", 15, pd.size()); + assertEquals("size should be eq", 16, pd.size()); assertTrue(pd.contains(ListS3.ACCESS_KEY)); assertTrue(pd.contains(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE)); assertTrue(pd.contains(ListS3.BUCKET));