NIFI-4628 Add support for the S3 List Objects V2 API to ListS3

Signed-off-by: James Wing <jvwing@gmail.com>

This closes #2300.
This commit is contained in:
aburkard 2017-11-27 15:18:47 -05:00 committed by James Wing
parent dd981e87dd
commit 103a345e85
2 changed files with 125 additions and 4 deletions

View File

@ -36,6 +36,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
@ -53,6 +54,8 @@ import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.S3VersionSummary;
import com.amazonaws.services.s3.model.VersionListing;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
@TriggerSerially
@TriggerWhenEmpty
@ -108,10 +111,23 @@ public class ListS3 extends AbstractS3Processor {
.description("Specifies whether to use S3 versions, if applicable. If false, only the latest version of each object will be returned.")
.build();
/**
 * Selects which S3 listing endpoint the processor calls when object versions
 * are not being listed: {@code "1"} (the default) for the original List Objects
 * call, {@code "2"} for List Objects Version 2.
 * The value is restricted to those two choices and is read as an integer by the
 * processor, which is why an integer validator is attached as well.
 */
public static final PropertyDescriptor LIST_TYPE = new PropertyDescriptor.Builder()
.name("list-type")
.displayName("List Type")
.expressionLanguageSupported(false)
.required(true)
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.allowableValues(
new AllowableValue("1", "List Objects V1"),
new AllowableValue("2", "List Objects V2"))
.defaultValue("1")
.description("Specifies whether to use the original List Objects or the newer List Objects Version 2 endpoint.")
.build();
/**
 * Unmodifiable, ordered list of the property descriptors declared for this
 * processor. {@link #LIST_TYPE} is appended after {@code USE_VERSIONS}.
 */
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
SIGNER_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT, DELIMITER, PREFIX, USE_VERSIONS));
SIGNER_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT, DELIMITER, PREFIX, USE_VERSIONS, LIST_TYPE));
/** Unmodifiable set of relationships; it contains only {@code REL_SUCCESS}. */
public static final Set<Relationship> relationships = Collections.unmodifiableSet(
new HashSet<>(Collections.singletonList(REL_SUCCESS)));
@ -189,9 +205,11 @@ public class ListS3 extends AbstractS3Processor {
String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
boolean useVersions = context.getProperty(USE_VERSIONS).asBoolean();
int listType = context.getProperty(LIST_TYPE).asInteger();
S3BucketLister bucketLister = useVersions
? new S3VersionBucketLister(client)
: listType == 2
? new S3ObjectBucketListerVersion2(client)
: new S3ObjectBucketLister(client);
bucketLister.setBucketName(bucket);
@ -340,6 +358,62 @@ public class ListS3 extends AbstractS3Processor {
}
}
/**
 * {@link S3BucketLister} backed by the S3 List Objects V2 call.
 * <p>
 * Each page of object summaries is re-packaged as a {@link VersionListing} so that
 * callers can consume it through the same interface as the other listers. Every
 * converted entry is flagged as the latest version; no version id is assigned.
 * <p>
 * {@link #setBucketName(String)} creates the underlying request and therefore has to
 * be invoked before {@link #setPrefix(String)} or {@link #setDelimiter(String)}.
 */
public class S3ObjectBucketListerVersion2 implements S3BucketLister {
    private final AmazonS3 client;
    private ListObjectsV2Request listObjectsRequest;
    private ListObjectsV2Result objectListing;

    /**
     * @param client the S3 client used to issue the list requests
     */
    public S3ObjectBucketListerVersion2(AmazonS3 client) {
        this.client = client;
    }

    /**
     * Starts a fresh list request for the given bucket, replacing any previous request.
     *
     * @param bucketName name of the bucket to list
     */
    @Override
    public void setBucketName(String bucketName) {
        final ListObjectsV2Request freshRequest = new ListObjectsV2Request();
        this.listObjectsRequest = freshRequest.withBucketName(bucketName);
    }

    /**
     * @param prefix key prefix applied to the current request
     */
    @Override
    public void setPrefix(String prefix) {
        this.listObjectsRequest.setPrefix(prefix);
    }

    /**
     * @param delimiter key delimiter applied to the current request
     */
    @Override
    public void setDelimiter(String delimiter) {
        this.listObjectsRequest.setDelimiter(delimiter);
    }

    /**
     * Fetches one page of objects and returns it as a version listing.
     * The raw result is retained so that {@link #setNextMarker()} and
     * {@link #isTruncated()} can refer to it afterwards.
     *
     * @return a listing holding one version summary per object on the fetched page
     */
    @Override
    public VersionListing listVersions() {
        this.objectListing = client.listObjectsV2(listObjectsRequest);

        final VersionListing converted = new VersionListing();
        final List<S3VersionSummary> target = converted.getVersionSummaries();
        for (final S3ObjectSummary source : this.objectListing.getObjectSummaries()) {
            target.add(toVersionSummary(source));
        }
        return converted;
    }

    /**
     * Copies the fields of an object summary onto a new version summary that is
     * marked as the latest version.
     */
    private S3VersionSummary toVersionSummary(final S3ObjectSummary source) {
        final S3VersionSummary result = new S3VersionSummary();
        result.setBucketName(source.getBucketName());
        result.setETag(source.getETag());
        result.setKey(source.getKey());
        result.setLastModified(source.getLastModified());
        result.setOwner(source.getOwner());
        result.setSize(source.getSize());
        result.setStorageClass(source.getStorageClass());
        result.setIsLatest(true);
        return result;
    }

    /**
     * Moves the request on to the next page by copying the continuation token of the
     * most recently fetched result. Requires a prior {@link #listVersions()} call.
     */
    @Override
    public void setNextMarker() {
        final String continuationToken = this.objectListing.getNextContinuationToken();
        this.listObjectsRequest.setContinuationToken(continuationToken);
    }

    /**
     * @return {@code true} when a page has been fetched and it reports further results;
     *         {@code false} when nothing has been fetched yet or the page is the last one
     */
    @Override
    public boolean isTruncated() {
        return this.objectListing != null && this.objectListing.isTruncated();
    }
}
public class S3VersionBucketLister implements S3BucketLister {
private AmazonS3 client;
private ListVersionsRequest listVersionsRequest;

View File

@ -37,6 +37,8 @@ import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.S3VersionSummary;
import com.amazonaws.services.s3.model.VersionListing;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import org.junit.Before;
import org.junit.Test;
@ -111,6 +113,51 @@ public class TestListS3 {
runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, lastModifiedTimestamp, Scope.CLUSTER);
}
/**
 * With List Type set to "2", the processor must issue exactly one
 * {@code listObjectsV2} call for the configured bucket, never call
 * {@code listVersions}, emit one flow file per returned object and store the
 * objects' last-modified time as the current timestamp in cluster state.
 */
@Test
public void testListVersion2() {
    final String bucketName = "test-bucket";
    final String[] objectKeys = {"a", "b/c", "d/e"};

    runner.setProperty(ListS3.REGION, "eu-west-1");
    runner.setProperty(ListS3.BUCKET, bucketName);
    runner.setProperty(ListS3.LIST_TYPE, "2");

    // All objects share one timestamp, so the stored state value is unambiguous.
    final Date lastModified = new Date();
    final ListObjectsV2Result stubbedResult = new ListObjectsV2Result();
    for (final String objectKey : objectKeys) {
        final S3ObjectSummary summary = new S3ObjectSummary();
        summary.setBucketName(bucketName);
        summary.setKey(objectKey);
        summary.setLastModified(lastModified);
        stubbedResult.getObjectSummaries().add(summary);
    }
    Mockito.when(mockS3Client.listObjectsV2(Mockito.any(ListObjectsV2Request.class))).thenReturn(stubbedResult);

    runner.run();

    // Exactly one V2 request, aimed at the configured bucket, and no version listing.
    final ArgumentCaptor<ListObjectsV2Request> requestCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class);
    Mockito.verify(mockS3Client, Mockito.times(1)).listObjectsV2(requestCaptor.capture());
    assertEquals("test-bucket", requestCaptor.getValue().getBucketName());
    Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any());

    runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
    final List<MockFlowFile> emitted = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
    final String expectedTimestamp = String.valueOf(lastModified.getTime());

    final MockFlowFile first = emitted.get(0);
    first.assertAttributeEquals("filename", "a");
    first.assertAttributeEquals("s3.bucket", "test-bucket");
    first.assertAttributeEquals("s3.lastModified", expectedTimestamp);
    for (int i = 1; i < objectKeys.length; i++) {
        emitted.get(i).assertAttributeEquals("filename", objectKeys[i]);
    }

    runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, expectedTimestamp, Scope.CLUSTER);
}
@Test
public void testListVersions() {
runner.setProperty(ListS3.REGION, "eu-west-1");
@ -194,7 +241,7 @@ public class TestListS3 {
public void testGetPropertyDescriptors() throws Exception {
ListS3 processor = new ListS3();
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
assertEquals("size should be eq", 15, pd.size());
assertEquals("size should be eq", 16, pd.size());
assertTrue(pd.contains(ListS3.ACCESS_KEY));
assertTrue(pd.contains(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE));
assertTrue(pd.contains(ListS3.BUCKET));