NIFI-2631: Adding page commits and 'Use Versions' to ListS3

This closes #916 and closes #917.

Signed-off-by: James Wing <jvwing@gmail.com>
This commit is contained in:
Joe Gresock 2016-08-23 12:46:12 +00:00 committed by James Wing
parent 97e2f406da
commit 0e64c3ed30
3 changed files with 214 additions and 41 deletions

View File

@ -16,10 +16,16 @@
*/ */
package org.apache.nifi.processors.aws.s3; package org.apache.nifi.processors.aws.s3;
import com.amazonaws.services.s3.AmazonS3; import java.io.IOException;
import com.amazonaws.services.s3.model.ListObjectsRequest; import java.util.Arrays;
import com.amazonaws.services.s3.model.ObjectListing; import java.util.Collections;
import com.amazonaws.services.s3.model.S3ObjectSummary; import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.Stateful;
@ -40,15 +46,13 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import java.io.IOException; import com.amazonaws.services.s3.AmazonS3;
import java.util.Arrays; import com.amazonaws.services.s3.model.ListObjectsRequest;
import java.util.Collections; import com.amazonaws.services.s3.model.ListVersionsRequest;
import java.util.HashMap; import com.amazonaws.services.s3.model.ObjectListing;
import java.util.HashSet; import com.amazonaws.services.s3.model.S3ObjectSummary;
import java.util.List; import com.amazonaws.services.s3.model.S3VersionSummary;
import java.util.Map; import com.amazonaws.services.s3.model.VersionListing;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@TriggerSerially @TriggerSerially
@TriggerWhenEmpty @TriggerWhenEmpty
@ -66,9 +70,11 @@ import java.util.concurrent.TimeUnit;
@WritesAttribute(attribute = "s3.bucket", description = "The name of the S3 bucket"), @WritesAttribute(attribute = "s3.bucket", description = "The name of the S3 bucket"),
@WritesAttribute(attribute = "filename", description = "The name of the file"), @WritesAttribute(attribute = "filename", description = "The name of the file"),
@WritesAttribute(attribute = "s3.etag", description = "The ETag that can be used to see if the file has changed"), @WritesAttribute(attribute = "s3.etag", description = "The ETag that can be used to see if the file has changed"),
@WritesAttribute(attribute = "s3.isLatest", description = "A boolean indicating if this is the latest version of the object"),
@WritesAttribute(attribute = "s3.lastModified", description = "The last modified time in milliseconds since epoch in UTC time"), @WritesAttribute(attribute = "s3.lastModified", description = "The last modified time in milliseconds since epoch in UTC time"),
@WritesAttribute(attribute = "s3.length", description = "The size of the object in bytes"), @WritesAttribute(attribute = "s3.length", description = "The size of the object in bytes"),
@WritesAttribute(attribute = "s3.storeClass", description = "The storage class of the object"),}) @WritesAttribute(attribute = "s3.storeClass", description = "The storage class of the object"),
@WritesAttribute(attribute = "s3.version", description = "The version of the object, if applicable")})
@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class}) @SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
public class ListS3 extends AbstractS3Processor { public class ListS3 extends AbstractS3Processor {
@ -91,10 +97,21 @@ public class ListS3 extends AbstractS3Processor {
.description("The prefix used to filter the object list. In most cases, it should end with a forward slash ('/').") .description("The prefix used to filter the object list. In most cases, it should end with a forward slash ('/').")
.build(); .build();
/**
 * Required boolean property ("true"/"false", default "false") that toggles
 * version-aware listing. Expression language is not supported for this
 * property.
 */
public static final PropertyDescriptor USE_VERSIONS = new PropertyDescriptor.Builder()
.name("use-versions")
.displayName("Use Versions")
.expressionLanguageSupported(false)
.required(true)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues("true", "false")
.defaultValue("false")
.description("Specifies whether to use S3 versions, if applicable. If false, only the latest version of each object will be returned.")
.build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
PROXY_HOST, PROXY_HOST_PORT, DELIMITER, PREFIX)); PROXY_HOST, PROXY_HOST_PORT, DELIMITER, PREFIX, USE_VERSIONS));
public static final Set<Relationship> relationships = Collections.unmodifiableSet( public static final Set<Relationship> relationships = Collections.unmodifiableSet(
new HashSet<>(Collections.singletonList(REL_SUCCESS))); new HashSet<>(Collections.singletonList(REL_SUCCESS)));
@ -171,35 +188,46 @@ public class ListS3 extends AbstractS3Processor {
String delimiter = context.getProperty(DELIMITER).getValue(); String delimiter = context.getProperty(DELIMITER).getValue();
String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue(); String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(bucket); boolean useVersions = context.getProperty(USE_VERSIONS).asBoolean();
S3BucketLister bucketLister = useVersions
? new S3VersionBucketLister(client)
: new S3ObjectBucketLister(client);
bucketLister.setBucketName(bucket);
if (delimiter != null && !delimiter.isEmpty()) { if (delimiter != null && !delimiter.isEmpty()) {
listObjectsRequest.setDelimiter(delimiter); bucketLister.setDelimiter(delimiter);
} }
if (prefix != null && !prefix.isEmpty()) { if (prefix != null && !prefix.isEmpty()) {
listObjectsRequest.setPrefix(prefix); bucketLister.setPrefix(prefix);
} }
ObjectListing objectListing; VersionListing versionListing;
do { do {
objectListing = client.listObjects(listObjectsRequest); versionListing = bucketLister.listVersions();
for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) { for (S3VersionSummary versionSummary : versionListing.getVersionSummaries()) {
long lastModified = objectSummary.getLastModified().getTime(); long lastModified = versionSummary.getLastModified().getTime();
if (lastModified < currentTimestamp if (lastModified < currentTimestamp
|| lastModified == currentTimestamp && currentKeys.contains(objectSummary.getKey())) { || lastModified == currentTimestamp && currentKeys.contains(versionSummary.getKey())) {
continue; continue;
} }
// Create the attributes // Create the attributes
final Map<String, String> attributes = new HashMap<>(); final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), objectSummary.getKey()); attributes.put(CoreAttributes.FILENAME.key(), versionSummary.getKey());
attributes.put("s3.bucket", objectSummary.getBucketName()); attributes.put("s3.bucket", versionSummary.getBucketName());
if (objectSummary.getOwner() != null) { // We may not have permission to read the owner if (versionSummary.getOwner() != null) { // We may not have permission to read the owner
attributes.put("s3.owner", objectSummary.getOwner().getId()); attributes.put("s3.owner", versionSummary.getOwner().getId());
} }
attributes.put("s3.etag", objectSummary.getETag()); attributes.put("s3.etag", versionSummary.getETag());
attributes.put("s3.lastModified", String.valueOf(lastModified)); attributes.put("s3.lastModified", String.valueOf(lastModified));
attributes.put("s3.length", String.valueOf(objectSummary.getSize())); attributes.put("s3.length", String.valueOf(versionSummary.getSize()));
attributes.put("s3.storeClass", objectSummary.getStorageClass()); attributes.put("s3.storeClass", versionSummary.getStorageClass());
attributes.put("s3.isLatest", String.valueOf(versionSummary.isLatest()));
if (versionSummary.getVersionId() != null) {
attributes.put("s3.version", versionSummary.getVersionId());
}
// Create the flowfile // Create the flowfile
FlowFile flowFile = session.create(); FlowFile flowFile = session.create();
@ -212,24 +240,145 @@ public class ListS3 extends AbstractS3Processor {
currentKeys.clear(); currentKeys.clear();
} }
if (lastModified == maxTimestamp) { if (lastModified == maxTimestamp) {
currentKeys.add(objectSummary.getKey()); currentKeys.add(versionSummary.getKey());
} }
listCount++; listCount++;
} }
listObjectsRequest.setMarker(objectListing.getNextMarker()); bucketLister.setNextMarker();
} while (objectListing.isTruncated());
commit(context, session, listCount);
listCount = 0;
} while (bucketLister.isTruncated());
currentTimestamp = maxTimestamp; currentTimestamp = maxTimestamp;
final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis}); getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis});
if (listCount > 0) { if (!commit(context, session, listCount)) {
getLogger().info("Successfully listed {} new files from S3; routing to success", new Object[] {listCount}); if (currentTimestamp > 0) {
session.commit(); persistState(context);
persistState(context); }
} else {
getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket}); getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket});
context.yield(); context.yield();
} }
} }
/**
 * Commits the session and persists state, but only when something was listed.
 *
 * @param context   process context passed through to {@code persistState}
 * @param session   session to commit
 * @param listCount number of listings counted by the caller
 * @return {@code true} if a commit was performed, {@code false} if
 *         {@code listCount} was not positive and nothing was done
 */
private boolean commit(final ProcessContext context, final ProcessSession session, int listCount) {
    // Nothing listed: leave the session and the persisted state untouched.
    if (listCount <= 0) {
        return false;
    }

    getLogger().info("Successfully listed {} new files from S3; routing to success", new Object[] {listCount});
    session.commit();
    persistState(context);
    return true;
}
/**
 * Paging abstraction over a bucket listing. Results are always exposed as a
 * {@link VersionListing}, whichever request type an implementation uses.
 */
private interface S3BucketLister {
    /** Sets the bucket whose contents will be listed. */
    void setBucketName(String bucketName);

    /** Restricts the listing to keys that start with {@code prefix}. */
    void setPrefix(String prefix);

    /** Sets the delimiter applied to the listing request. */
    void setDelimiter(String delimiter);

    // Versions have a superset of the fields that Objects have, so we'll use
    // them as a common interface
    /** Fetches one page of results. */
    VersionListing listVersions();

    /** Moves the request's marker on, using the most recently fetched page. */
    void setNextMarker();

    /** Reports whether the most recently fetched page was truncated. */
    boolean isTruncated();
}
/**
 * {@link S3BucketLister} backed by {@code listObjects}. Each object summary is
 * copied into an {@link S3VersionSummary} flagged as the latest version; no
 * version id is set on the copy.
 */
public class S3ObjectBucketLister implements S3BucketLister {
    private final AmazonS3 client;
    private ListObjectsRequest listObjectsRequest;
    private ObjectListing objectListing;

    public S3ObjectBucketLister(AmazonS3 client) {
        this.client = client;
    }

    /** Starts a fresh request for the given bucket; call before the other setters. */
    @Override
    public void setBucketName(String bucketName) {
        listObjectsRequest = new ListObjectsRequest().withBucketName(bucketName);
    }

    @Override
    public void setPrefix(String prefix) {
        listObjectsRequest.setPrefix(prefix);
    }

    @Override
    public void setDelimiter(String delimiter) {
        listObjectsRequest.setDelimiter(delimiter);
    }

    /**
     * Fetches one page of objects and repackages it as a version listing.
     * The raw page is retained so that the marker and the truncation flag can
     * be read from it afterwards.
     */
    @Override
    public VersionListing listVersions() {
        final VersionListing adapted = new VersionListing();
        this.objectListing = client.listObjects(listObjectsRequest);
        for (final S3ObjectSummary source : objectListing.getObjectSummaries()) {
            adapted.getVersionSummaries().add(toVersionSummary(source));
        }
        return adapted;
    }

    /** Copies the fields shared by both summary types and marks the result as latest. */
    private S3VersionSummary toVersionSummary(final S3ObjectSummary source) {
        final S3VersionSummary copy = new S3VersionSummary();
        copy.setBucketName(source.getBucketName());
        copy.setETag(source.getETag());
        copy.setKey(source.getKey());
        copy.setLastModified(source.getLastModified());
        copy.setOwner(source.getOwner());
        copy.setSize(source.getSize());
        copy.setStorageClass(source.getStorageClass());
        copy.setIsLatest(true);
        return copy;
    }

    @Override
    public void setNextMarker() {
        listObjectsRequest.setMarker(objectListing.getNextMarker());
    }

    /** Returns {@code false} until a page has been fetched. */
    @Override
    public boolean isTruncated() {
        return objectListing != null && objectListing.isTruncated();
    }
}
/**
 * {@link S3BucketLister} backed by {@code listVersions}; pages are returned
 * exactly as the client supplies them.
 */
public class S3VersionBucketLister implements S3BucketLister {
    private final AmazonS3 client;
    private ListVersionsRequest listVersionsRequest;
    private VersionListing versionListing;

    public S3VersionBucketLister(AmazonS3 client) {
        this.client = client;
    }

    /** Starts a fresh request for the given bucket; call before the other setters. */
    @Override
    public void setBucketName(String bucketName) {
        listVersionsRequest = new ListVersionsRequest().withBucketName(bucketName);
    }

    @Override
    public void setPrefix(String prefix) {
        listVersionsRequest.setPrefix(prefix);
    }

    @Override
    public void setDelimiter(String delimiter) {
        listVersionsRequest.setDelimiter(delimiter);
    }

    /** Fetches one page and remembers it for marker and truncation queries. */
    @Override
    public VersionListing listVersions() {
        final VersionListing page = client.listVersions(listVersionsRequest);
        this.versionListing = page;
        return page;
    }

    /** Moves both the key marker and the version-id marker on from the last page. */
    @Override
    public void setNextMarker() {
        final String nextKey = versionListing.getNextKeyMarker();
        final String nextVersionId = versionListing.getNextVersionIdMarker();
        listVersionsRequest.setKeyMarker(nextKey);
        listVersionsRequest.setVersionIdMarker(nextVersionId);
    }

    /** Returns {@code false} until a page has been fetched. */
    @Override
    public boolean isTruncated() {
        return versionListing != null && versionListing.isTruncated();
    }
}
} }

View File

@ -52,7 +52,7 @@ import static org.junit.Assert.fail;
public abstract class AbstractS3IT { public abstract class AbstractS3IT {
protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
protected final static String SAMPLE_FILE_RESOURCE_NAME = "/hello.txt"; protected final static String SAMPLE_FILE_RESOURCE_NAME = "/hello.txt";
protected final static String REGION = "us-west-1"; protected final static String REGION = System.getProperty("it.aws.region", "us-west-1");
// Adding REGION to bucket prevents errors of // Adding REGION to bucket prevents errors of
// "A conflicting conditional operation is currently in progress against this resource." // "A conflicting conditional operation is currently in progress against this resource."
// when bucket is rapidly added/deleted and consistency propagation causes this error. // when bucket is rapidly added/deleted and consistency propagation causes this error.
@ -82,7 +82,9 @@ public abstract class AbstractS3IT {
fail("Bucket " + BUCKET_NAME + " exists. Choose a different bucket name to continue test"); fail("Bucket " + BUCKET_NAME + " exists. Choose a different bucket name to continue test");
} }
CreateBucketRequest request = new CreateBucketRequest(BUCKET_NAME, REGION); CreateBucketRequest request = REGION.contains("east")
? new CreateBucketRequest(BUCKET_NAME) // See https://github.com/boto/boto3/issues/125
: new CreateBucketRequest(BUCKET_NAME, REGION);
client.createBucket(request); client.createBucket(request);
} catch (final AmazonS3Exception e) { } catch (final AmazonS3Exception e) {

View File

@ -124,11 +124,32 @@ public class ITListS3 extends AbstractS3IT {
flowFiles.get(0).assertAttributeEquals("filename", "b/c"); flowFiles.get(0).assertAttributeEquals("filename", "b/c");
} }
/**
 * Lists with both a prefix and "Use Versions" enabled, and expects only the
 * single key under the prefix to be emitted.
 */
@Test
public void testSimpleListWithPrefixAndVersions() throws Throwable {
    // Seed three keys; only "b/c" sits under the "b/" prefix.
    for (final String key : new String[] {"a", "b/c", "d/e"}) {
        putTestFile(key, getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
    }

    final TestRunner runner = TestRunners.newTestRunner(new ListS3());

    // Connection settings.
    runner.setProperty(ListS3.CREDENTIALS_FILE, CREDENTIALS_FILE);
    runner.setProperty(ListS3.REGION, REGION);
    runner.setProperty(ListS3.BUCKET, BUCKET_NAME);

    // Behaviour under test: prefix filter combined with version listing.
    runner.setProperty(ListS3.PREFIX, "b/");
    runner.setProperty(ListS3.USE_VERSIONS, "true");

    runner.run();

    runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1);
    final List<MockFlowFile> listed = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
    listed.get(0).assertAttributeEquals("filename", "b/c");
}
@Test @Test
public void testGetPropertyDescriptors() throws Exception { public void testGetPropertyDescriptors() throws Exception {
ListS3 processor = new ListS3(); ListS3 processor = new ListS3();
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors(); List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
assertEquals("size should be eq", 13, pd.size()); assertEquals("size should be eq", 14, pd.size());
assertTrue(pd.contains(ListS3.ACCESS_KEY)); assertTrue(pd.contains(ListS3.ACCESS_KEY));
assertTrue(pd.contains(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE)); assertTrue(pd.contains(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE));
assertTrue(pd.contains(ListS3.BUCKET)); assertTrue(pd.contains(ListS3.BUCKET));
@ -142,5 +163,6 @@ public class ITListS3 extends AbstractS3IT {
assertTrue(pd.contains(ListS3.PROXY_HOST_PORT)); assertTrue(pd.contains(ListS3.PROXY_HOST_PORT));
assertTrue(pd.contains(ListS3.DELIMITER)); assertTrue(pd.contains(ListS3.DELIMITER));
assertTrue(pd.contains(ListS3.PREFIX)); assertTrue(pd.contains(ListS3.PREFIX));
assertTrue(pd.contains(ListS3.USE_VERSIONS));
} }
} }