mirror of https://github.com/apache/nifi.git
NIFI-6468: Adding AWS S3 'requester pays' to Fetch and List processors.
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #3601.
This commit is contained in:
parent
7d77b464cc
commit
32c46f0bdd
|
@ -32,6 +32,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
|
import org.apache.nifi.components.AllowableValue;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
@ -73,10 +74,23 @@ public class FetchS3Object extends AbstractS3Processor {
|
||||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||||
.required(false)
|
.required(false)
|
||||||
.build();
|
.build();
|
||||||
|
public static final PropertyDescriptor REQUESTER_PAYS = new PropertyDescriptor.Builder()
|
||||||
|
.name("requester-pays")
|
||||||
|
.displayName("Requester Pays")
|
||||||
|
.required(true)
|
||||||
|
.description("If true, indicates that the requester consents to pay any charges associated with retrieving objects from "
|
||||||
|
+ "the S3 bucket. This sets the 'x-amz-request-payer' header to 'requester'.")
|
||||||
|
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
|
||||||
|
.allowableValues(new AllowableValue("true", "True", "Indicates that the requester consents to pay any charges associated "
|
||||||
|
+ "with retrieving objects from the S3 bucket."), new AllowableValue("false", "False", "Does not consent to pay "
|
||||||
|
+ "requester charges for retrieving objects from the S3 bucket."))
|
||||||
|
.defaultValue("false")
|
||||||
|
.build();
|
||||||
|
|
||||||
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
||||||
Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, VERSION_ID,
|
Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, VERSION_ID,
|
||||||
SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
|
SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD,
|
||||||
|
REQUESTER_PAYS));
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
@ -94,6 +108,7 @@ public class FetchS3Object extends AbstractS3Processor {
|
||||||
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
|
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
|
final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
final String versionId = context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue();
|
final String versionId = context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
|
final boolean requesterPays = context.getProperty(REQUESTER_PAYS).asBoolean();
|
||||||
|
|
||||||
final AmazonS3 client = getClient();
|
final AmazonS3 client = getClient();
|
||||||
final GetObjectRequest request;
|
final GetObjectRequest request;
|
||||||
|
@ -102,6 +117,7 @@ public class FetchS3Object extends AbstractS3Processor {
|
||||||
} else {
|
} else {
|
||||||
request = new GetObjectRequest(bucket, key, versionId);
|
request = new GetObjectRequest(bucket, key, versionId);
|
||||||
}
|
}
|
||||||
|
request.setRequesterPays(requesterPays);
|
||||||
|
|
||||||
final Map<String, String> attributes = new HashMap<>();
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
try (final S3Object s3Object = client.getObject(request)) {
|
try (final S3Object s3Object = client.getObject(request)) {
|
||||||
|
|
|
@ -52,6 +52,9 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
import org.apache.nifi.components.AllowableValue;
|
import org.apache.nifi.components.AllowableValue;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.components.ValidationContext;
|
||||||
|
import org.apache.nifi.components.ValidationResult;
|
||||||
|
import org.apache.nifi.components.Validator;
|
||||||
import org.apache.nifi.components.state.Scope;
|
import org.apache.nifi.components.state.Scope;
|
||||||
import org.apache.nifi.components.state.StateMap;
|
import org.apache.nifi.components.state.StateMap;
|
||||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||||
|
@ -153,6 +156,19 @@ public class ListS3 extends AbstractS3Processor {
|
||||||
.allowableValues(new AllowableValue("true", "True"), new AllowableValue("false", "False"))
|
.allowableValues(new AllowableValue("true", "True"), new AllowableValue("false", "False"))
|
||||||
.defaultValue("false")
|
.defaultValue("false")
|
||||||
.build();
|
.build();
|
||||||
|
public static final PropertyDescriptor REQUESTER_PAYS = new PropertyDescriptor.Builder()
|
||||||
|
.name("requester-pays")
|
||||||
|
.displayName("Requester Pays")
|
||||||
|
.required(true)
|
||||||
|
.description("If true, indicates that the requester consents to pay any charges associated with listing "
|
||||||
|
+ "the S3 bucket. This sets the 'x-amz-request-payer' header to 'requester'. Note that this "
|
||||||
|
+ "setting is not applicable when 'Use Versions' is 'true'.")
|
||||||
|
.addValidator(createRequesterPaysValidator())
|
||||||
|
.allowableValues(new AllowableValue("true", "True", "Indicates that the requester consents to pay any charges associated "
|
||||||
|
+ "with listing the S3 bucket."), new AllowableValue("false", "False", "Does not consent to pay "
|
||||||
|
+ "requester charges for listing the S3 bucket."))
|
||||||
|
.defaultValue("false")
|
||||||
|
.build();
|
||||||
|
|
||||||
public static final PropertyDescriptor WRITE_USER_METADATA = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor WRITE_USER_METADATA = new PropertyDescriptor.Builder()
|
||||||
.name("write-s3-user-metadata")
|
.name("write-s3-user-metadata")
|
||||||
|
@ -168,7 +184,7 @@ public class ListS3 extends AbstractS3Processor {
|
||||||
Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, WRITE_OBJECT_TAGS, WRITE_USER_METADATA, CREDENTIALS_FILE,
|
Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, WRITE_OBJECT_TAGS, WRITE_USER_METADATA, CREDENTIALS_FILE,
|
||||||
AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
|
AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
|
||||||
SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME,
|
SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME,
|
||||||
PROXY_PASSWORD, DELIMITER, PREFIX, USE_VERSIONS, LIST_TYPE, MIN_AGE));
|
PROXY_PASSWORD, DELIMITER, PREFIX, USE_VERSIONS, LIST_TYPE, MIN_AGE, REQUESTER_PAYS));
|
||||||
|
|
||||||
public static final Set<Relationship> relationships = Collections.unmodifiableSet(
|
public static final Set<Relationship> relationships = Collections.unmodifiableSet(
|
||||||
new HashSet<>(Collections.singletonList(REL_SUCCESS)));
|
new HashSet<>(Collections.singletonList(REL_SUCCESS)));
|
||||||
|
@ -180,6 +196,23 @@ public class ListS3 extends AbstractS3Processor {
|
||||||
private long currentTimestamp = 0L;
|
private long currentTimestamp = 0L;
|
||||||
private Set<String> currentKeys;
|
private Set<String> currentKeys;
|
||||||
|
|
||||||
|
private static Validator createRequesterPaysValidator() {
|
||||||
|
return new Validator() {
|
||||||
|
@Override
|
||||||
|
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
|
||||||
|
boolean requesterPays = Boolean.valueOf(input);
|
||||||
|
boolean useVersions = context.getProperty(USE_VERSIONS).asBoolean();
|
||||||
|
boolean valid = !requesterPays || !useVersions;
|
||||||
|
return new ValidationResult.Builder()
|
||||||
|
.input(input)
|
||||||
|
.subject(subject)
|
||||||
|
.valid(valid)
|
||||||
|
.explanation(valid ? null : "'Requester Pays' cannot be used when listing object versions.")
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
return properties;
|
return properties;
|
||||||
|
@ -240,6 +273,7 @@ public class ListS3 extends AbstractS3Processor {
|
||||||
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
|
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
|
||||||
final long minAgeMilliseconds = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
|
final long minAgeMilliseconds = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
|
||||||
final long listingTimestamp = System.currentTimeMillis();
|
final long listingTimestamp = System.currentTimeMillis();
|
||||||
|
final boolean requesterPays = context.getProperty(REQUESTER_PAYS).asBoolean();
|
||||||
|
|
||||||
final AmazonS3 client = getClient();
|
final AmazonS3 client = getClient();
|
||||||
int listCount = 0;
|
int listCount = 0;
|
||||||
|
@ -257,6 +291,7 @@ public class ListS3 extends AbstractS3Processor {
|
||||||
: new S3ObjectBucketLister(client);
|
: new S3ObjectBucketLister(client);
|
||||||
|
|
||||||
bucketLister.setBucketName(bucket);
|
bucketLister.setBucketName(bucket);
|
||||||
|
bucketLister.setRequesterPays(requesterPays);
|
||||||
|
|
||||||
if (delimiter != null && !delimiter.isEmpty()) {
|
if (delimiter != null && !delimiter.isEmpty()) {
|
||||||
bucketLister.setDelimiter(delimiter);
|
bucketLister.setDelimiter(delimiter);
|
||||||
|
@ -386,6 +421,7 @@ public class ListS3 extends AbstractS3Processor {
|
||||||
public void setBucketName(String bucketName);
|
public void setBucketName(String bucketName);
|
||||||
public void setPrefix(String prefix);
|
public void setPrefix(String prefix);
|
||||||
public void setDelimiter(String delimiter);
|
public void setDelimiter(String delimiter);
|
||||||
|
public void setRequesterPays(boolean requesterPays);
|
||||||
// Versions have a superset of the fields that Objects have, so we'll use
|
// Versions have a superset of the fields that Objects have, so we'll use
|
||||||
// them as a common interface
|
// them as a common interface
|
||||||
public VersionListing listVersions();
|
public VersionListing listVersions();
|
||||||
|
@ -417,6 +453,11 @@ public class ListS3 extends AbstractS3Processor {
|
||||||
listObjectsRequest.setDelimiter(delimiter);
|
listObjectsRequest.setDelimiter(delimiter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setRequesterPays(boolean requesterPays) {
|
||||||
|
listObjectsRequest.setRequesterPays(requesterPays);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public VersionListing listVersions() {
|
public VersionListing listVersions() {
|
||||||
VersionListing versionListing = new VersionListing();
|
VersionListing versionListing = new VersionListing();
|
||||||
|
@ -473,6 +514,11 @@ public class ListS3 extends AbstractS3Processor {
|
||||||
listObjectsRequest.setDelimiter(delimiter);
|
listObjectsRequest.setDelimiter(delimiter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setRequesterPays(boolean requesterPays) {
|
||||||
|
listObjectsRequest.setRequesterPays(requesterPays);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public VersionListing listVersions() {
|
public VersionListing listVersions() {
|
||||||
VersionListing versionListing = new VersionListing();
|
VersionListing versionListing = new VersionListing();
|
||||||
|
@ -529,6 +575,11 @@ public class ListS3 extends AbstractS3Processor {
|
||||||
listVersionsRequest.setDelimiter(delimiter);
|
listVersionsRequest.setDelimiter(delimiter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setRequesterPays(boolean requesterPays) {
|
||||||
|
// Not supported in versionListing, so this does nothing.
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public VersionListing listVersions() {
|
public VersionListing listVersions() {
|
||||||
versionListing = client.listVersions(listVersionsRequest);
|
versionListing = client.listVersions(listVersionsRequest);
|
||||||
|
|
|
@ -43,6 +43,7 @@ import org.mockito.Mockito;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
|
||||||
|
@ -100,6 +101,65 @@ public class TestFetchS3Object {
|
||||||
GetObjectRequest request = captureRequest.getValue();
|
GetObjectRequest request = captureRequest.getValue();
|
||||||
assertEquals("request-bucket", request.getBucketName());
|
assertEquals("request-bucket", request.getBucketName());
|
||||||
assertEquals("request-key", request.getKey());
|
assertEquals("request-key", request.getKey());
|
||||||
|
assertFalse(request.isRequesterPays());
|
||||||
|
assertNull(request.getVersionId());
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1);
|
||||||
|
final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS);
|
||||||
|
MockFlowFile ff = ffs.get(0);
|
||||||
|
ff.assertAttributeEquals("s3.bucket", "response-bucket-name");
|
||||||
|
ff.assertAttributeEquals(CoreAttributes.FILENAME.key(), "file.txt");
|
||||||
|
ff.assertAttributeEquals(CoreAttributes.PATH.key(), "key/path/to");
|
||||||
|
ff.assertAttributeEquals(CoreAttributes.ABSOLUTE_PATH.key(), "key/path/to/file.txt");
|
||||||
|
ff.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/plain");
|
||||||
|
ff.assertAttributeEquals("hash.value", "testMD5hash");
|
||||||
|
ff.assertAttributeEquals("hash.algorithm", "MD5");
|
||||||
|
ff.assertAttributeEquals("s3.etag", "test-etag");
|
||||||
|
ff.assertAttributeEquals("s3.expirationTime", String.valueOf(expiration.getTime()));
|
||||||
|
ff.assertAttributeEquals("s3.expirationTimeRuleId", "testExpirationRuleId");
|
||||||
|
ff.assertAttributeEquals("userKey1", "userValue1");
|
||||||
|
ff.assertAttributeEquals("userKey2", "userValue2");
|
||||||
|
ff.assertAttributeEquals("s3.sseAlgorithm", "testAlgorithm");
|
||||||
|
ff.assertContentEquals("Some Content");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetObjectWithRequesterPays() throws IOException {
|
||||||
|
runner.setProperty(FetchS3Object.REGION, "us-east-1");
|
||||||
|
runner.setProperty(FetchS3Object.BUCKET, "request-bucket");
|
||||||
|
runner.setProperty(FetchS3Object.REQUESTER_PAYS, "true");
|
||||||
|
final Map<String, String> attrs = new HashMap<>();
|
||||||
|
attrs.put("filename", "request-key");
|
||||||
|
runner.enqueue(new byte[0], attrs);
|
||||||
|
|
||||||
|
S3Object s3ObjectResponse = new S3Object();
|
||||||
|
s3ObjectResponse.setBucketName("response-bucket-name");
|
||||||
|
s3ObjectResponse.setKey("response-key");
|
||||||
|
s3ObjectResponse.setObjectContent(new StringInputStream("Some Content"));
|
||||||
|
ObjectMetadata metadata = Mockito.spy(ObjectMetadata.class);
|
||||||
|
metadata.setContentDisposition("key/path/to/file.txt");
|
||||||
|
metadata.setContentType("text/plain");
|
||||||
|
metadata.setContentMD5("testMD5hash");
|
||||||
|
Date expiration = new Date();
|
||||||
|
metadata.setExpirationTime(expiration);
|
||||||
|
metadata.setExpirationTimeRuleId("testExpirationRuleId");
|
||||||
|
Map<String, String> userMetadata = new HashMap<>();
|
||||||
|
userMetadata.put("userKey1", "userValue1");
|
||||||
|
userMetadata.put("userKey2", "userValue2");
|
||||||
|
metadata.setUserMetadata(userMetadata);
|
||||||
|
metadata.setSSEAlgorithm("testAlgorithm");
|
||||||
|
Mockito.when(metadata.getETag()).thenReturn("test-etag");
|
||||||
|
s3ObjectResponse.setObjectMetadata(metadata);
|
||||||
|
Mockito.when(mockS3Client.getObject(Mockito.any())).thenReturn(s3ObjectResponse);
|
||||||
|
|
||||||
|
runner.run(1);
|
||||||
|
|
||||||
|
ArgumentCaptor<GetObjectRequest> captureRequest = ArgumentCaptor.forClass(GetObjectRequest.class);
|
||||||
|
Mockito.verify(mockS3Client, Mockito.times(1)).getObject(captureRequest.capture());
|
||||||
|
GetObjectRequest request = captureRequest.getValue();
|
||||||
|
assertEquals("request-bucket", request.getBucketName());
|
||||||
|
assertEquals("request-key", request.getKey());
|
||||||
|
assertTrue(request.isRequesterPays());
|
||||||
assertNull(request.getVersionId());
|
assertNull(request.getVersionId());
|
||||||
|
|
||||||
runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1);
|
runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1);
|
||||||
|
@ -179,7 +239,7 @@ public class TestFetchS3Object {
|
||||||
public void testGetPropertyDescriptors() throws Exception {
|
public void testGetPropertyDescriptors() throws Exception {
|
||||||
FetchS3Object processor = new FetchS3Object();
|
FetchS3Object processor = new FetchS3Object();
|
||||||
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
|
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
|
||||||
assertEquals("size should be eq", 17, pd.size());
|
assertEquals("size should be eq", 18, pd.size());
|
||||||
assertTrue(pd.contains(FetchS3Object.ACCESS_KEY));
|
assertTrue(pd.contains(FetchS3Object.ACCESS_KEY));
|
||||||
assertTrue(pd.contains(FetchS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
|
assertTrue(pd.contains(FetchS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
|
||||||
assertTrue(pd.contains(FetchS3Object.BUCKET));
|
assertTrue(pd.contains(FetchS3Object.BUCKET));
|
||||||
|
@ -197,6 +257,7 @@ public class TestFetchS3Object {
|
||||||
assertTrue(pd.contains(FetchS3Object.PROXY_HOST_PORT));
|
assertTrue(pd.contains(FetchS3Object.PROXY_HOST_PORT));
|
||||||
assertTrue(pd.contains(FetchS3Object.PROXY_USERNAME));
|
assertTrue(pd.contains(FetchS3Object.PROXY_USERNAME));
|
||||||
assertTrue(pd.contains(FetchS3Object.PROXY_PASSWORD));
|
assertTrue(pd.contains(FetchS3Object.PROXY_PASSWORD));
|
||||||
|
assertTrue(pd.contains(FetchS3Object.REQUESTER_PAYS));
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,6 +50,7 @@ import org.mockito.ArgumentCaptor;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
|
||||||
|
@ -103,6 +104,7 @@ public class TestListS3 {
|
||||||
Mockito.verify(mockS3Client, Mockito.times(1)).listObjects(captureRequest.capture());
|
Mockito.verify(mockS3Client, Mockito.times(1)).listObjects(captureRequest.capture());
|
||||||
ListObjectsRequest request = captureRequest.getValue();
|
ListObjectsRequest request = captureRequest.getValue();
|
||||||
assertEquals("test-bucket", request.getBucketName());
|
assertEquals("test-bucket", request.getBucketName());
|
||||||
|
assertFalse(request.isRequesterPays());
|
||||||
Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any());
|
Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any());
|
||||||
|
|
||||||
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
|
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
|
||||||
|
@ -117,6 +119,62 @@ public class TestListS3 {
|
||||||
runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, lastModifiedTimestamp, Scope.CLUSTER);
|
runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, lastModifiedTimestamp, Scope.CLUSTER);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testListWithRequesterPays() {
|
||||||
|
runner.setProperty(ListS3.REGION, "eu-west-1");
|
||||||
|
runner.setProperty(ListS3.BUCKET, "test-bucket");
|
||||||
|
runner.setProperty(ListS3.REQUESTER_PAYS, "true");
|
||||||
|
|
||||||
|
Date lastModified = new Date();
|
||||||
|
ObjectListing objectListing = new ObjectListing();
|
||||||
|
S3ObjectSummary objectSummary1 = new S3ObjectSummary();
|
||||||
|
objectSummary1.setBucketName("test-bucket");
|
||||||
|
objectSummary1.setKey("a");
|
||||||
|
objectSummary1.setLastModified(lastModified);
|
||||||
|
objectListing.getObjectSummaries().add(objectSummary1);
|
||||||
|
S3ObjectSummary objectSummary2 = new S3ObjectSummary();
|
||||||
|
objectSummary2.setBucketName("test-bucket");
|
||||||
|
objectSummary2.setKey("b/c");
|
||||||
|
objectSummary2.setLastModified(lastModified);
|
||||||
|
objectListing.getObjectSummaries().add(objectSummary2);
|
||||||
|
S3ObjectSummary objectSummary3 = new S3ObjectSummary();
|
||||||
|
objectSummary3.setBucketName("test-bucket");
|
||||||
|
objectSummary3.setKey("d/e");
|
||||||
|
objectSummary3.setLastModified(lastModified);
|
||||||
|
objectListing.getObjectSummaries().add(objectSummary3);
|
||||||
|
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
ArgumentCaptor<ListObjectsRequest> captureRequest = ArgumentCaptor.forClass(ListObjectsRequest.class);
|
||||||
|
Mockito.verify(mockS3Client, Mockito.times(1)).listObjects(captureRequest.capture());
|
||||||
|
ListObjectsRequest request = captureRequest.getValue();
|
||||||
|
assertEquals("test-bucket", request.getBucketName());
|
||||||
|
assertTrue(request.isRequesterPays());
|
||||||
|
Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any());
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
|
||||||
|
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
|
||||||
|
MockFlowFile ff0 = flowFiles.get(0);
|
||||||
|
ff0.assertAttributeEquals("filename", "a");
|
||||||
|
ff0.assertAttributeEquals("s3.bucket", "test-bucket");
|
||||||
|
String lastModifiedTimestamp = String.valueOf(lastModified.getTime());
|
||||||
|
ff0.assertAttributeEquals("s3.lastModified", lastModifiedTimestamp);
|
||||||
|
flowFiles.get(1).assertAttributeEquals("filename", "b/c");
|
||||||
|
flowFiles.get(2).assertAttributeEquals("filename", "d/e");
|
||||||
|
runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, lastModifiedTimestamp, Scope.CLUSTER);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testListWithRequesterPays_invalid() {
|
||||||
|
runner.setProperty(ListS3.REGION, "eu-west-1");
|
||||||
|
runner.setProperty(ListS3.BUCKET, "test-bucket");
|
||||||
|
runner.setProperty(ListS3.USE_VERSIONS, "true"); // requester pays cannot be used with versions
|
||||||
|
runner.setProperty(ListS3.REQUESTER_PAYS, "true");
|
||||||
|
|
||||||
|
runner.assertNotValid();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testListVersion2() {
|
public void testListVersion2() {
|
||||||
runner.setProperty(ListS3.REGION, "eu-west-1");
|
runner.setProperty(ListS3.REGION, "eu-west-1");
|
||||||
|
@ -148,6 +206,54 @@ public class TestListS3 {
|
||||||
Mockito.verify(mockS3Client, Mockito.times(1)).listObjectsV2(captureRequest.capture());
|
Mockito.verify(mockS3Client, Mockito.times(1)).listObjectsV2(captureRequest.capture());
|
||||||
ListObjectsV2Request request = captureRequest.getValue();
|
ListObjectsV2Request request = captureRequest.getValue();
|
||||||
assertEquals("test-bucket", request.getBucketName());
|
assertEquals("test-bucket", request.getBucketName());
|
||||||
|
assertFalse(request.isRequesterPays());
|
||||||
|
Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any());
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
|
||||||
|
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
|
||||||
|
MockFlowFile ff0 = flowFiles.get(0);
|
||||||
|
ff0.assertAttributeEquals("filename", "a");
|
||||||
|
ff0.assertAttributeEquals("s3.bucket", "test-bucket");
|
||||||
|
String lastModifiedTimestamp = String.valueOf(lastModified.getTime());
|
||||||
|
ff0.assertAttributeEquals("s3.lastModified", lastModifiedTimestamp);
|
||||||
|
flowFiles.get(1).assertAttributeEquals("filename", "b/c");
|
||||||
|
flowFiles.get(2).assertAttributeEquals("filename", "d/e");
|
||||||
|
runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, lastModifiedTimestamp, Scope.CLUSTER);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testListVersion2WithRequesterPays() {
|
||||||
|
runner.setProperty(ListS3.REGION, "eu-west-1");
|
||||||
|
runner.setProperty(ListS3.BUCKET, "test-bucket");
|
||||||
|
runner.setProperty(ListS3.REQUESTER_PAYS, "true");
|
||||||
|
runner.setProperty(ListS3.LIST_TYPE, "2");
|
||||||
|
|
||||||
|
Date lastModified = new Date();
|
||||||
|
ListObjectsV2Result objectListing = new ListObjectsV2Result();
|
||||||
|
S3ObjectSummary objectSummary1 = new S3ObjectSummary();
|
||||||
|
objectSummary1.setBucketName("test-bucket");
|
||||||
|
objectSummary1.setKey("a");
|
||||||
|
objectSummary1.setLastModified(lastModified);
|
||||||
|
objectListing.getObjectSummaries().add(objectSummary1);
|
||||||
|
S3ObjectSummary objectSummary2 = new S3ObjectSummary();
|
||||||
|
objectSummary2.setBucketName("test-bucket");
|
||||||
|
objectSummary2.setKey("b/c");
|
||||||
|
objectSummary2.setLastModified(lastModified);
|
||||||
|
objectListing.getObjectSummaries().add(objectSummary2);
|
||||||
|
S3ObjectSummary objectSummary3 = new S3ObjectSummary();
|
||||||
|
objectSummary3.setBucketName("test-bucket");
|
||||||
|
objectSummary3.setKey("d/e");
|
||||||
|
objectSummary3.setLastModified(lastModified);
|
||||||
|
objectListing.getObjectSummaries().add(objectSummary3);
|
||||||
|
Mockito.when(mockS3Client.listObjectsV2(Mockito.any(ListObjectsV2Request.class))).thenReturn(objectListing);
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
ArgumentCaptor<ListObjectsV2Request> captureRequest = ArgumentCaptor.forClass(ListObjectsV2Request.class);
|
||||||
|
Mockito.verify(mockS3Client, Mockito.times(1)).listObjectsV2(captureRequest.capture());
|
||||||
|
ListObjectsV2Request request = captureRequest.getValue();
|
||||||
|
assertEquals("test-bucket", request.getBucketName());
|
||||||
|
assertTrue(request.isRequesterPays());
|
||||||
Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any());
|
Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any());
|
||||||
|
|
||||||
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
|
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
|
||||||
|
@ -375,5 +481,6 @@ public class TestListS3 {
|
||||||
assertTrue(pd.contains(ListS3.PROXY_HOST_PORT));
|
assertTrue(pd.contains(ListS3.PROXY_HOST_PORT));
|
||||||
assertTrue(pd.contains(ListS3.PROXY_USERNAME));
|
assertTrue(pd.contains(ListS3.PROXY_USERNAME));
|
||||||
assertTrue(pd.contains(ListS3.PROXY_PASSWORD));
|
assertTrue(pd.contains(ListS3.PROXY_PASSWORD));
|
||||||
|
assertTrue(pd.contains(ListS3.REQUESTER_PAYS));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue