diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java index bc149252bb..29f771bd8a 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java @@ -16,10 +16,16 @@ */ package org.apache.nifi.processors.aws.s3; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.ListObjectsRequest; -import com.amazonaws.services.s3.model.ObjectListing; -import com.amazonaws.services.s3.model.S3ObjectSummary; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.Stateful; @@ -40,15 +46,13 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.util.StandardValidators; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ListVersionsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.services.s3.model.S3VersionSummary; +import com.amazonaws.services.s3.model.VersionListing; @TriggerSerially @TriggerWhenEmpty @@ -66,9 +70,11 @@ import java.util.concurrent.TimeUnit; @WritesAttribute(attribute = "s3.bucket", description = "The name of the S3 bucket"), @WritesAttribute(attribute = "filename", description = "The name of the file"), @WritesAttribute(attribute = "s3.etag", description = "The ETag that can be used to see if the file has changed"), + @WritesAttribute(attribute = "s3.isLatest", description = "A boolean indicating if this is the latest version of the object"), @WritesAttribute(attribute = "s3.lastModified", description = "The last modified time in milliseconds since epoch in UTC time"), @WritesAttribute(attribute = "s3.length", description = "The size of the object in bytes"), - @WritesAttribute(attribute = "s3.storeClass", description = "The storage class of the object"),}) + @WritesAttribute(attribute = "s3.storeClass", description = "The storage class of the object"), + @WritesAttribute(attribute = "s3.version", description = "The version of the object, if applicable")}) @SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class}) public class ListS3 extends AbstractS3Processor { @@ -91,10 +97,21 @@ public class ListS3 extends AbstractS3Processor { .description("The prefix used to filter the object list. In most cases, it should end with a forward slash ('/').") .build(); + public static final PropertyDescriptor USE_VERSIONS = new PropertyDescriptor.Builder() + .name("use-versions") + .displayName("Use Versions") + .expressionLanguageSupported(false) + .required(true) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .allowableValues("true", "false") + .defaultValue("false") + .description("Specifies whether to use S3 versions, if applicable. If false, only the latest version of each object will be returned.") + .build(); + public static final List properties = Collections.unmodifiableList( Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, - PROXY_HOST, PROXY_HOST_PORT, DELIMITER, PREFIX)); + PROXY_HOST, PROXY_HOST_PORT, DELIMITER, PREFIX, USE_VERSIONS)); public static final Set relationships = Collections.unmodifiableSet( new HashSet<>(Collections.singletonList(REL_SUCCESS))); @@ -171,35 +188,46 @@ public class ListS3 extends AbstractS3Processor { String delimiter = context.getProperty(DELIMITER).getValue(); String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue(); - ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(bucket); + boolean useVersions = context.getProperty(USE_VERSIONS).asBoolean(); + + S3BucketLister bucketLister = useVersions + ? new S3VersionBucketLister(client) + : new S3ObjectBucketLister(client); + + bucketLister.setBucketName(bucket); + if (delimiter != null && !delimiter.isEmpty()) { - listObjectsRequest.setDelimiter(delimiter); + bucketLister.setDelimiter(delimiter); } if (prefix != null && !prefix.isEmpty()) { - listObjectsRequest.setPrefix(prefix); + bucketLister.setPrefix(prefix); } - ObjectListing objectListing; + VersionListing versionListing; do { - objectListing = client.listObjects(listObjectsRequest); - for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) { - long lastModified = objectSummary.getLastModified().getTime(); + versionListing = bucketLister.listVersions(); + for (S3VersionSummary versionSummary : versionListing.getVersionSummaries()) { + long lastModified = versionSummary.getLastModified().getTime(); if (lastModified < currentTimestamp - || lastModified == currentTimestamp && currentKeys.contains(objectSummary.getKey())) { + || lastModified == currentTimestamp && currentKeys.contains(versionSummary.getKey())) { continue; } // Create the attributes final Map attributes = new HashMap<>(); - attributes.put(CoreAttributes.FILENAME.key(), objectSummary.getKey()); - attributes.put("s3.bucket", objectSummary.getBucketName()); - if (objectSummary.getOwner() != null) { // We may not have permission to read the owner - attributes.put("s3.owner", objectSummary.getOwner().getId()); + attributes.put(CoreAttributes.FILENAME.key(), versionSummary.getKey()); + attributes.put("s3.bucket", versionSummary.getBucketName()); + if (versionSummary.getOwner() != null) { // We may not have permission to read the owner + attributes.put("s3.owner", versionSummary.getOwner().getId()); } - attributes.put("s3.etag", objectSummary.getETag()); + attributes.put("s3.etag", versionSummary.getETag()); attributes.put("s3.lastModified", String.valueOf(lastModified)); - attributes.put("s3.length", String.valueOf(objectSummary.getSize())); - attributes.put("s3.storeClass", objectSummary.getStorageClass()); + attributes.put("s3.length", String.valueOf(versionSummary.getSize())); + attributes.put("s3.storeClass", versionSummary.getStorageClass()); + attributes.put("s3.isLatest", String.valueOf(versionSummary.isLatest())); + if (versionSummary.getVersionId() != null) { + attributes.put("s3.version", versionSummary.getVersionId()); + } // Create the flowfile FlowFile flowFile = session.create(); @@ -212,24 +240,145 @@ public class ListS3 extends AbstractS3Processor { currentKeys.clear(); } if (lastModified == maxTimestamp) { - currentKeys.add(objectSummary.getKey()); + currentKeys.add(versionSummary.getKey()); } listCount++; } - listObjectsRequest.setMarker(objectListing.getNextMarker()); - } while (objectListing.isTruncated()); + bucketLister.setNextMarker(); + + commit(context, session, listCount); + listCount = 0; + } while (bucketLister.isTruncated()); currentTimestamp = maxTimestamp; final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis}); - if (listCount > 0) { - getLogger().info("Successfully listed {} new files from S3; routing to success", new Object[] {listCount}); - session.commit(); - persistState(context); - } else { + if (!commit(context, session, listCount)) { + if (currentTimestamp > 0) { + persistState(context); + } getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket}); context.yield(); } } + + private boolean commit(final ProcessContext context, final ProcessSession session, int listCount) { + boolean willCommit = listCount > 0; + if (willCommit) { + getLogger().info("Successfully listed {} new files from S3; routing to success", new Object[] {listCount}); + session.commit(); + persistState(context); + } + return willCommit; + } + + private interface S3BucketLister { + public void setBucketName(String bucketName); + public void setPrefix(String prefix); + public void setDelimiter(String delimiter); + // Versions have a superset of the fields that Objects have, so we'll use + // them as a common interface + public VersionListing listVersions(); + public void setNextMarker(); + public boolean isTruncated(); + } + + public class S3ObjectBucketLister implements S3BucketLister { + private AmazonS3 client; + private ListObjectsRequest listObjectsRequest; + private ObjectListing objectListing; + + public S3ObjectBucketLister(AmazonS3 client) { + this.client = client; + } + + @Override + public void setBucketName(String bucketName) { + listObjectsRequest = new ListObjectsRequest().withBucketName(bucketName); + } + + @Override + public void setPrefix(String prefix) { + listObjectsRequest.setPrefix(prefix); + } + + @Override + public void setDelimiter(String delimiter) { + listObjectsRequest.setDelimiter(delimiter); + } + + @Override + public VersionListing listVersions() { + VersionListing versionListing = new VersionListing(); + this.objectListing = client.listObjects(listObjectsRequest); + for(S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) { + S3VersionSummary versionSummary = new S3VersionSummary(); + versionSummary.setBucketName(objectSummary.getBucketName()); + versionSummary.setETag(objectSummary.getETag()); + versionSummary.setKey(objectSummary.getKey()); + versionSummary.setLastModified(objectSummary.getLastModified()); + versionSummary.setOwner(objectSummary.getOwner()); + versionSummary.setSize(objectSummary.getSize()); + versionSummary.setStorageClass(objectSummary.getStorageClass()); + versionSummary.setIsLatest(true); + + versionListing.getVersionSummaries().add(versionSummary); + } + + return versionListing; + } + + @Override + public void setNextMarker() { + listObjectsRequest.setMarker(objectListing.getNextMarker()); + } + + @Override + public boolean isTruncated() { + return (objectListing == null) ? false : objectListing.isTruncated(); + } + } + + public class S3VersionBucketLister implements S3BucketLister { + private AmazonS3 client; + private ListVersionsRequest listVersionsRequest; + private VersionListing versionListing; + + public S3VersionBucketLister(AmazonS3 client) { + this.client = client; + } + + @Override + public void setBucketName(String bucketName) { + listVersionsRequest = new ListVersionsRequest().withBucketName(bucketName); + } + + @Override + public void setPrefix(String prefix) { + listVersionsRequest.setPrefix(prefix); + } + + @Override + public void setDelimiter(String delimiter) { + listVersionsRequest.setDelimiter(delimiter); + } + + @Override + public VersionListing listVersions() { + versionListing = client.listVersions(listVersionsRequest); + return versionListing; + } + + @Override + public void setNextMarker() { + listVersionsRequest.setKeyMarker(versionListing.getNextKeyMarker()); + listVersionsRequest.setVersionIdMarker(versionListing.getNextVersionIdMarker()); + } + + @Override + public boolean isTruncated() { + return (versionListing == null) ? false : versionListing.isTruncated(); + } + } } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java index 95b4ba8c55..0ecc33e946 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java @@ -52,7 +52,7 @@ import static org.junit.Assert.fail; public abstract class AbstractS3IT { protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; protected final static String SAMPLE_FILE_RESOURCE_NAME = "/hello.txt"; - protected final static String REGION = "us-west-1"; + protected final static String REGION = System.getProperty("it.aws.region", "us-west-1"); // Adding REGION to bucket prevents errors of // "A conflicting conditional operation is currently in progress against this resource." // when bucket is rapidly added/deleted and consistency propogation causes this error. @@ -82,7 +82,9 @@ public abstract class AbstractS3IT { fail("Bucket " + BUCKET_NAME + " exists. Choose a different bucket name to continue test"); } - CreateBucketRequest request = new CreateBucketRequest(BUCKET_NAME, REGION); + CreateBucketRequest request = REGION.contains("east") + ? new CreateBucketRequest(BUCKET_NAME) // See https://github.com/boto/boto3/issues/125 + : new CreateBucketRequest(BUCKET_NAME, REGION); client.createBucket(request); } catch (final AmazonS3Exception e) { diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java index 6d77eb6790..9370022bf0 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java @@ -124,11 +124,32 @@ public class ITListS3 extends AbstractS3IT { flowFiles.get(0).assertAttributeEquals("filename", "b/c"); } + @Test + public void testSimpleListWithPrefixAndVersions() throws Throwable { + putTestFile("a", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME)); + putTestFile("b/c", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME)); + putTestFile("d/e", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME)); + + final TestRunner runner = TestRunners.newTestRunner(new ListS3()); + + runner.setProperty(ListS3.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(ListS3.REGION, REGION); + runner.setProperty(ListS3.BUCKET, BUCKET_NAME); + runner.setProperty(ListS3.PREFIX, "b/"); + runner.setProperty(ListS3.USE_VERSIONS, "true"); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS); + flowFiles.get(0).assertAttributeEquals("filename", "b/c"); + } + @Test public void testGetPropertyDescriptors() throws Exception { ListS3 processor = new ListS3(); List pd = processor.getSupportedPropertyDescriptors(); - assertEquals("size should be eq", 13, pd.size()); + assertEquals("size should be eq", 14, pd.size()); assertTrue(pd.contains(ListS3.ACCESS_KEY)); assertTrue(pd.contains(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE)); assertTrue(pd.contains(ListS3.BUCKET)); @@ -142,5 +163,6 @@ public class ITListS3 extends AbstractS3IT { assertTrue(pd.contains(ListS3.PROXY_HOST_PORT)); assertTrue(pd.contains(ListS3.DELIMITER)); assertTrue(pd.contains(ListS3.PREFIX)); + assertTrue(pd.contains(ListS3.USE_VERSIONS)); } }