mirror of https://github.com/apache/nifi.git
NIFI-5221 - Added 'Object Tagging' functionalities to S3 Processors
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #2751.
This commit is contained in:
parent
be63378a1e
commit
3752398943
|
@ -78,13 +78,6 @@
|
||||||
<groupId>com.amazonaws</groupId>
|
<groupId>com.amazonaws</groupId>
|
||||||
<artifactId>aws-java-sdk-sts</artifactId>
|
<artifactId>aws-java-sdk-sts</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
<!-- Test Dependencies for testing interaction with AWS -->
|
|
||||||
<dependency>
|
|
||||||
<groupId>com.fasterxml.jackson.core</groupId>
|
|
||||||
<artifactId>jackson-databind</artifactId>
|
|
||||||
<version>2.6.6</version>
|
|
||||||
<scope>test</scope>
|
|
||||||
</dependency>
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
|
|
@ -26,6 +26,9 @@ import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
|
||||||
|
import com.amazonaws.services.s3.model.GetObjectTaggingResult;
|
||||||
|
import com.amazonaws.services.s3.model.Tag;
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||||
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
|
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
|
||||||
|
@ -80,7 +83,9 @@ import com.amazonaws.services.s3.model.ListObjectsV2Result;
|
||||||
@WritesAttribute(attribute = "s3.lastModified", description = "The last modified time in milliseconds since epoch in UTC time"),
|
@WritesAttribute(attribute = "s3.lastModified", description = "The last modified time in milliseconds since epoch in UTC time"),
|
||||||
@WritesAttribute(attribute = "s3.length", description = "The size of the object in bytes"),
|
@WritesAttribute(attribute = "s3.length", description = "The size of the object in bytes"),
|
||||||
@WritesAttribute(attribute = "s3.storeClass", description = "The storage class of the object"),
|
@WritesAttribute(attribute = "s3.storeClass", description = "The storage class of the object"),
|
||||||
@WritesAttribute(attribute = "s3.version", description = "The version of the object, if applicable")})
|
@WritesAttribute(attribute = "s3.version", description = "The version of the object, if applicable"),
|
||||||
|
@WritesAttribute(attribute = "s3.tag.___", description = "If 'Write Object Tags' is set to 'True', the tags associated to the S3 object that is being listed " +
|
||||||
|
"will be written as part of the flowfile attributes")})
|
||||||
@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
|
@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
|
||||||
public class ListS3 extends AbstractS3Processor {
|
public class ListS3 extends AbstractS3Processor {
|
||||||
|
|
||||||
|
@ -136,11 +141,20 @@ public class ListS3 extends AbstractS3Processor {
|
||||||
.defaultValue("0 sec")
|
.defaultValue("0 sec")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor WRITE_OBJECT_TAGS = new PropertyDescriptor.Builder()
|
||||||
|
.name("write-s3-object-tags")
|
||||||
|
.displayName("Write Object Tags")
|
||||||
|
.description("If set to 'True', the tags associated with the S3 object will be written as FlowFile attributes")
|
||||||
|
.required(true)
|
||||||
|
.allowableValues(new AllowableValue("true", "True"), new AllowableValue("false", "False"))
|
||||||
|
.defaultValue("false")
|
||||||
|
.build();
|
||||||
|
|
||||||
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
||||||
Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
|
Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, WRITE_OBJECT_TAGS, CREDENTIALS_FILE,
|
||||||
AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
|
AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
|
||||||
SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD,
|
SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME,
|
||||||
DELIMITER, PREFIX, USE_VERSIONS, LIST_TYPE, MIN_AGE));
|
PROXY_PASSWORD, DELIMITER, PREFIX, USE_VERSIONS, LIST_TYPE, MIN_AGE));
|
||||||
|
|
||||||
public static final Set<Relationship> relationships = Collections.unmodifiableSet(
|
public static final Set<Relationship> relationships = Collections.unmodifiableSet(
|
||||||
new HashSet<>(Collections.singletonList(REL_SUCCESS)));
|
new HashSet<>(Collections.singletonList(REL_SUCCESS)));
|
||||||
|
@ -263,6 +277,10 @@ public class ListS3 extends AbstractS3Processor {
|
||||||
attributes.put("s3.version", versionSummary.getVersionId());
|
attributes.put("s3.version", versionSummary.getVersionId());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (context.getProperty(WRITE_OBJECT_TAGS).asBoolean()) {
|
||||||
|
attributes.putAll(writeObjectTags(client, versionSummary));
|
||||||
|
}
|
||||||
|
|
||||||
// Create the flowfile
|
// Create the flowfile
|
||||||
FlowFile flowFile = session.create();
|
FlowFile flowFile = session.create();
|
||||||
flowFile = session.putAllAttributes(flowFile, attributes);
|
flowFile = session.putAllAttributes(flowFile, attributes);
|
||||||
|
@ -307,6 +325,20 @@ public class ListS3 extends AbstractS3Processor {
|
||||||
return willCommit;
|
return willCommit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Map<String, String> writeObjectTags(AmazonS3 client, S3VersionSummary versionSummary) {
|
||||||
|
final GetObjectTaggingResult taggingResult = client.getObjectTagging(new GetObjectTaggingRequest(versionSummary.getBucketName(), versionSummary.getKey()));
|
||||||
|
final Map<String, String> tagMap = new HashMap<>();
|
||||||
|
|
||||||
|
if (taggingResult != null) {
|
||||||
|
final List<Tag> tags = taggingResult.getTagSet();
|
||||||
|
|
||||||
|
for (final Tag tag : tags) {
|
||||||
|
tagMap.put("s3.tag." + tag.getKey(), tag.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return tagMap;
|
||||||
|
}
|
||||||
|
|
||||||
private interface S3BucketLister {
|
private interface S3BucketLister {
|
||||||
public void setBucketName(String bucketName);
|
public void setBucketName(String bucketName);
|
||||||
public void setPrefix(String prefix);
|
public void setPrefix(String prefix);
|
||||||
|
|
|
@ -39,6 +39,8 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
|
import com.amazonaws.services.s3.model.ObjectTagging;
|
||||||
|
import com.amazonaws.services.s3.model.Tag;
|
||||||
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||||
|
@ -49,6 +51,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
|
import org.apache.nifi.components.AllowableValue;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
@ -205,11 +208,32 @@ public class PutS3Object extends AbstractS3Processor {
|
||||||
.defaultValue(NO_SERVER_SIDE_ENCRYPTION)
|
.defaultValue(NO_SERVER_SIDE_ENCRYPTION)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor OBJECT_TAGS_PREFIX = new PropertyDescriptor.Builder()
|
||||||
|
.name("s3-object-tags-prefix")
|
||||||
|
.displayName("Object Tags Prefix")
|
||||||
|
.description("Specifies the prefix which would be scanned against the incoming FlowFile's attributes and the matching attribute's " +
|
||||||
|
"name and value would be considered as the outgoing S3 object's Tag name and Tag value respectively. For Ex: If the " +
|
||||||
|
"incoming FlowFile carries the attributes tagS3country, tagS3PII, the tag prefix to be specified would be 'tagS3'")
|
||||||
|
.required(false)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
|
||||||
|
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor REMOVE_TAG_PREFIX = new PropertyDescriptor.Builder()
|
||||||
|
.name("s3-object-remove-tags-prefix")
|
||||||
|
.displayName("Remove Tag Prefix")
|
||||||
|
.description("If set to 'True', the value provided for '" + OBJECT_TAGS_PREFIX.getDisplayName() + "' will be removed from " +
|
||||||
|
"the attribute(s) and then considered as the Tag name. For ex: If the incoming FlowFile carries the attributes tagS3country, " +
|
||||||
|
"tagS3PII and the prefix is set to 'tagS3' then the corresponding tag values would be 'country' and 'PII'")
|
||||||
|
.allowableValues(new AllowableValue("true", "True"), new AllowableValue("false", "False"))
|
||||||
|
.defaultValue("false")
|
||||||
|
.build();
|
||||||
|
|
||||||
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
||||||
Arrays.asList(KEY, BUCKET, CONTENT_TYPE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
|
Arrays.asList(KEY, BUCKET, CONTENT_TYPE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, OBJECT_TAGS_PREFIX, REMOVE_TAG_PREFIX,
|
||||||
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, CANNED_ACL, SSL_CONTEXT_SERVICE,
|
STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER,
|
||||||
ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, MULTIPART_S3_MAX_AGE,
|
CANNED_ACL, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL,
|
||||||
SERVER_SIDE_ENCRYPTION, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
|
MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
|
||||||
|
|
||||||
final static String S3_BUCKET_KEY = "s3.bucket";
|
final static String S3_BUCKET_KEY = "s3.bucket";
|
||||||
final static String S3_OBJECT_KEY = "s3.key";
|
final static String S3_OBJECT_KEY = "s3.key";
|
||||||
|
@ -415,6 +439,7 @@ public class PutS3Object extends AbstractS3Processor {
|
||||||
* Then
|
* Then
|
||||||
*/
|
*/
|
||||||
try {
|
try {
|
||||||
|
final FlowFile flowFileCopy = flowFile;
|
||||||
session.read(flowFile, new InputStreamCallback() {
|
session.read(flowFile, new InputStreamCallback() {
|
||||||
@Override
|
@Override
|
||||||
public void process(final InputStream rawIn) throws IOException {
|
public void process(final InputStream rawIn) throws IOException {
|
||||||
|
@ -471,6 +496,10 @@ public class PutS3Object extends AbstractS3Processor {
|
||||||
request.withCannedAcl(cannedAcl);
|
request.withCannedAcl(cannedAcl);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (context.getProperty(OBJECT_TAGS_PREFIX).isSet()) {
|
||||||
|
request.setTagging(new ObjectTagging(getObjectTags(context, flowFileCopy)));
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
final PutObjectResult result = s3.putObject(request);
|
final PutObjectResult result = s3.putObject(request);
|
||||||
if (result.getVersionId() != null) {
|
if (result.getVersionId() != null) {
|
||||||
|
@ -562,6 +591,11 @@ public class PutS3Object extends AbstractS3Processor {
|
||||||
if (cannedAcl != null) {
|
if (cannedAcl != null) {
|
||||||
initiateRequest.withCannedACL(cannedAcl);
|
initiateRequest.withCannedACL(cannedAcl);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (context.getProperty(OBJECT_TAGS_PREFIX).isSet()) {
|
||||||
|
initiateRequest.setTagging(new ObjectTagging(getObjectTags(context, flowFileCopy)));
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
final InitiateMultipartUploadResult initiateResult =
|
final InitiateMultipartUploadResult initiateResult =
|
||||||
s3.initiateMultipartUpload(initiateRequest);
|
s3.initiateMultipartUpload(initiateRequest);
|
||||||
|
@ -785,6 +819,26 @@ public class PutS3Object extends AbstractS3Processor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private List<Tag> getObjectTags(ProcessContext context, FlowFile flowFile) {
|
||||||
|
final String prefix = context.getProperty(OBJECT_TAGS_PREFIX).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
|
final List<Tag> objectTags = new ArrayList<>();
|
||||||
|
final Map<String, String> attributesMap = flowFile.getAttributes();
|
||||||
|
|
||||||
|
attributesMap.entrySet().stream().sequential()
|
||||||
|
.filter(attribute -> attribute.getKey().startsWith(prefix))
|
||||||
|
.forEach(attribute -> {
|
||||||
|
String tagKey = attribute.getKey();
|
||||||
|
String tagValue = attribute.getValue();
|
||||||
|
|
||||||
|
if (context.getProperty(REMOVE_TAG_PREFIX).asBoolean()) {
|
||||||
|
tagKey = tagKey.replace(prefix, "");
|
||||||
|
}
|
||||||
|
objectTags.add(new Tag(tagKey, tagValue));
|
||||||
|
});
|
||||||
|
|
||||||
|
return objectTags;
|
||||||
|
}
|
||||||
|
|
||||||
protected static class MultipartState implements Serializable {
|
protected static class MultipartState implements Serializable {
|
||||||
|
|
||||||
private static final long serialVersionUID = 9006072180563519740L;
|
private static final long serialVersionUID = 9006072180563519740L;
|
||||||
|
|
|
@ -23,8 +23,11 @@ import com.amazonaws.services.s3.model.CreateBucketRequest;
|
||||||
import com.amazonaws.services.s3.model.DeleteBucketRequest;
|
import com.amazonaws.services.s3.model.DeleteBucketRequest;
|
||||||
import com.amazonaws.services.s3.model.ObjectListing;
|
import com.amazonaws.services.s3.model.ObjectListing;
|
||||||
import com.amazonaws.services.s3.model.ObjectMetadata;
|
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||||
|
import com.amazonaws.services.s3.model.ObjectTagging;
|
||||||
import com.amazonaws.services.s3.model.PutObjectRequest;
|
import com.amazonaws.services.s3.model.PutObjectRequest;
|
||||||
|
import com.amazonaws.services.s3.model.PutObjectResult;
|
||||||
import com.amazonaws.services.s3.model.S3ObjectSummary;
|
import com.amazonaws.services.s3.model.S3ObjectSummary;
|
||||||
|
import com.amazonaws.services.s3.model.Tag;
|
||||||
import org.apache.nifi.util.file.FileUtils;
|
import org.apache.nifi.util.file.FileUtils;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -38,6 +41,7 @@ import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
@ -58,7 +62,7 @@ public abstract class AbstractS3IT {
|
||||||
// when bucket is rapidly added/deleted and consistency propagation causes this error.
|
// when bucket is rapidly added/deleted and consistency propagation causes this error.
|
||||||
// (Should not be necessary if REGION remains static, but added to prevent future frustration.)
|
// (Should not be necessary if REGION remains static, but added to prevent future frustration.)
|
||||||
// [see http://stackoverflow.com/questions/13898057/aws-error-message-a-conflicting-conditional-operation-is-currently-in-progress]
|
// [see http://stackoverflow.com/questions/13898057/aws-error-message-a-conflicting-conditional-operation-is-currently-in-progress]
|
||||||
protected final static String BUCKET_NAME = "test-bucket-00000000-0000-0000-0000-123456789021-" + REGION;
|
protected final static String BUCKET_NAME = "test-bucket-" + System.currentTimeMillis() + "-" + REGION;
|
||||||
|
|
||||||
// Static so multiple Tests can use same client
|
// Static so multiple Tests can use same client
|
||||||
protected static AmazonS3Client client;
|
protected static AmazonS3Client client;
|
||||||
|
@ -144,6 +148,12 @@ public abstract class AbstractS3IT {
|
||||||
client.putObject(putRequest);
|
client.putObject(putRequest);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void putFileWithObjectTag(String key, File file, List<Tag> objectTags) {
|
||||||
|
PutObjectRequest putRequest = new PutObjectRequest(BUCKET_NAME, key, file);
|
||||||
|
putRequest.setTagging(new ObjectTagging(objectTags));
|
||||||
|
PutObjectResult result = client.putObject(putRequest);
|
||||||
|
}
|
||||||
|
|
||||||
protected Path getResourcePath(String resourceName) {
|
protected Path getResourcePath(String resourceName) {
|
||||||
Path path = null;
|
Path path = null;
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.aws.s3;
|
package org.apache.nifi.processors.aws.s3;
|
||||||
|
|
||||||
|
import com.amazonaws.services.s3.model.Tag;
|
||||||
import org.apache.nifi.processors.aws.AbstractAWSProcessor;
|
import org.apache.nifi.processors.aws.AbstractAWSProcessor;
|
||||||
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
|
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
|
||||||
import org.apache.nifi.util.MockFlowFile;
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
|
@ -24,6 +25,7 @@ import org.apache.nifi.util.TestRunners;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
|
||||||
|
@ -64,7 +66,7 @@ public class ITListS3 extends AbstractS3IT {
|
||||||
|
|
||||||
runner.addControllerService("awsCredentialsProvider", serviceImpl);
|
runner.addControllerService("awsCredentialsProvider", serviceImpl);
|
||||||
|
|
||||||
runner.setProperty(serviceImpl, AbstractAWSProcessor.CREDENTIALS_FILE, System.getProperty("user.home") + "/aws-credentials.properties");
|
runner.setProperty(serviceImpl, AbstractAWSProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE);
|
||||||
runner.enableControllerService(serviceImpl);
|
runner.enableControllerService(serviceImpl);
|
||||||
runner.assertValid(serviceImpl);
|
runner.assertValid(serviceImpl);
|
||||||
|
|
||||||
|
@ -142,4 +144,33 @@ public class ITListS3 extends AbstractS3IT {
|
||||||
flowFiles.get(0).assertAttributeEquals("filename", "b/c");
|
flowFiles.get(0).assertAttributeEquals("filename", "b/c");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testObjectTagsWritten() {
|
||||||
|
List<Tag> objectTags = new ArrayList<>();
|
||||||
|
objectTags.add(new Tag("dummytag1", "dummyvalue1"));
|
||||||
|
objectTags.add(new Tag("dummytag2", "dummyvalue2"));
|
||||||
|
|
||||||
|
putFileWithObjectTag("b/fileWithTag", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME), objectTags);
|
||||||
|
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(new ListS3());
|
||||||
|
|
||||||
|
runner.setProperty(ListS3.CREDENTIALS_FILE, CREDENTIALS_FILE);
|
||||||
|
runner.setProperty(ListS3.PREFIX, "b/");
|
||||||
|
runner.setProperty(ListS3.REGION, REGION);
|
||||||
|
runner.setProperty(ListS3.BUCKET, BUCKET_NAME);
|
||||||
|
runner.setProperty(ListS3.WRITE_OBJECT_TAGS, "true");
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1);
|
||||||
|
|
||||||
|
MockFlowFile flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS).get(0);
|
||||||
|
|
||||||
|
flowFiles.assertAttributeEquals("filename", "b/fileWithTag");
|
||||||
|
flowFiles.assertAttributeExists("s3.tag.dummytag1");
|
||||||
|
flowFiles.assertAttributeExists("s3.tag.dummytag2");
|
||||||
|
flowFiles.assertAttributeEquals("s3.tag.dummytag1", "dummyvalue1");
|
||||||
|
flowFiles.assertAttributeEquals("s3.tag.dummytag2", "dummyvalue2");
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,8 +28,11 @@ import java.util.Map;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import com.amazonaws.AmazonServiceException;
|
import com.amazonaws.AmazonServiceException;
|
||||||
|
import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
|
||||||
|
import com.amazonaws.services.s3.model.GetObjectTaggingResult;
|
||||||
import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
|
import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
|
||||||
import com.amazonaws.services.s3.model.ObjectMetadata;
|
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||||
|
import com.amazonaws.services.s3.model.Tag;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||||
|
@ -185,7 +188,7 @@ public class ITPutS3Object extends AbstractS3IT {
|
||||||
|
|
||||||
runner.addControllerService("awsCredentialsProvider", serviceImpl);
|
runner.addControllerService("awsCredentialsProvider", serviceImpl);
|
||||||
|
|
||||||
runner.setProperty(serviceImpl, AbstractAWSCredentialsProviderProcessor.CREDENTIALS_FILE, System.getProperty("user.home") + "/aws-credentials.properties");
|
runner.setProperty(serviceImpl, AbstractAWSCredentialsProviderProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE);
|
||||||
runner.enableControllerService(serviceImpl);
|
runner.enableControllerService(serviceImpl);
|
||||||
|
|
||||||
runner.assertValid(serviceImpl);
|
runner.assertValid(serviceImpl);
|
||||||
|
@ -840,6 +843,35 @@ public class ITPutS3Object extends AbstractS3IT {
|
||||||
Assert.assertEquals(0, uploadList.getMultipartUploads().size());
|
Assert.assertEquals(0, uploadList.getMultipartUploads().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testObjectTags() throws IOException, InterruptedException {
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
|
||||||
|
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
|
||||||
|
runner.setProperty(PutS3Object.REGION, REGION);
|
||||||
|
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
|
||||||
|
runner.setProperty(PutS3Object.OBJECT_TAGS_PREFIX, "tagS3");
|
||||||
|
runner.setProperty(PutS3Object.REMOVE_TAG_PREFIX, "true");
|
||||||
|
|
||||||
|
final Map<String, String> attrs = new HashMap<>();
|
||||||
|
attrs.put("filename", "tag-test.txt");
|
||||||
|
attrs.put("tagS3PII", "true");
|
||||||
|
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
|
||||||
|
|
||||||
|
GetObjectTaggingResult result = client.getObjectTagging(new GetObjectTaggingRequest(BUCKET_NAME, "tag-test.txt"));
|
||||||
|
List<Tag> objectTags = result.getTagSet();
|
||||||
|
|
||||||
|
for (Tag tag : objectTags) {
|
||||||
|
System.out.println("Tag Key : " + tag.getKey() + ", Tag Value : " + tag.getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertTrue(objectTags.size() == 1);
|
||||||
|
Assert.assertEquals("PII", objectTags.get(0).getKey());
|
||||||
|
Assert.assertEquals("true", objectTags.get(0).getValue());
|
||||||
|
}
|
||||||
|
|
||||||
private class MockAmazonS3Client extends AmazonS3Client {
|
private class MockAmazonS3Client extends AmazonS3Client {
|
||||||
MultipartUploadListing listing;
|
MultipartUploadListing listing;
|
||||||
public void setListing(MultipartUploadListing newlisting) {
|
public void setListing(MultipartUploadListing newlisting) {
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
|
||||||
import org.apache.commons.lang3.time.DateUtils;
|
import org.apache.commons.lang3.time.DateUtils;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.components.state.Scope;
|
import org.apache.nifi.components.state.Scope;
|
||||||
|
@ -291,17 +292,45 @@ public class TestListS3 {
|
||||||
runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, lastModifiedTimestamp, Scope.CLUSTER);
|
runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, lastModifiedTimestamp, Scope.CLUSTER);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWriteObjectTags() {
|
||||||
|
runner.setProperty(ListS3.REGION, "eu-west-1");
|
||||||
|
runner.setProperty(ListS3.BUCKET, "test-bucket");
|
||||||
|
runner.setProperty(ListS3.WRITE_OBJECT_TAGS, "true");
|
||||||
|
|
||||||
|
Date lastModified = new Date();
|
||||||
|
ObjectListing objectListing = new ObjectListing();
|
||||||
|
S3ObjectSummary objectSummary1 = new S3ObjectSummary();
|
||||||
|
objectSummary1.setBucketName("test-bucket");
|
||||||
|
objectSummary1.setKey("a");
|
||||||
|
objectSummary1.setLastModified(lastModified);
|
||||||
|
objectListing.getObjectSummaries().add(objectSummary1);
|
||||||
|
|
||||||
|
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
ArgumentCaptor<GetObjectTaggingRequest> captureRequest = ArgumentCaptor.forClass(GetObjectTaggingRequest.class);
|
||||||
|
Mockito.verify(mockS3Client, Mockito.times(1)).getObjectTagging(captureRequest.capture());
|
||||||
|
GetObjectTaggingRequest request = captureRequest.getValue();
|
||||||
|
|
||||||
|
assertEquals("test-bucket", request.getBucketName());
|
||||||
|
assertEquals("a", request.getKey());
|
||||||
|
Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetPropertyDescriptors() throws Exception {
|
public void testGetPropertyDescriptors() throws Exception {
|
||||||
ListS3 processor = new ListS3();
|
ListS3 processor = new ListS3();
|
||||||
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
|
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
|
||||||
assertEquals("size should be eq", 20, pd.size());
|
assertEquals("size should be eq", 21, pd.size());
|
||||||
assertTrue(pd.contains(ListS3.ACCESS_KEY));
|
assertTrue(pd.contains(ListS3.ACCESS_KEY));
|
||||||
assertTrue(pd.contains(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE));
|
assertTrue(pd.contains(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE));
|
||||||
assertTrue(pd.contains(ListS3.BUCKET));
|
assertTrue(pd.contains(ListS3.BUCKET));
|
||||||
assertTrue(pd.contains(ListS3.CREDENTIALS_FILE));
|
assertTrue(pd.contains(ListS3.CREDENTIALS_FILE));
|
||||||
assertTrue(pd.contains(ListS3.ENDPOINT_OVERRIDE));
|
assertTrue(pd.contains(ListS3.ENDPOINT_OVERRIDE));
|
||||||
assertTrue(pd.contains(ListS3.REGION));
|
assertTrue(pd.contains(ListS3.REGION));
|
||||||
|
assertTrue(pd.contains(ListS3.WRITE_OBJECT_TAGS));
|
||||||
assertTrue(pd.contains(ListS3.SECRET_KEY));
|
assertTrue(pd.contains(ListS3.SECRET_KEY));
|
||||||
assertTrue(pd.contains(ListS3.SIGNER_OVERRIDE));
|
assertTrue(pd.contains(ListS3.SIGNER_OVERRIDE));
|
||||||
assertTrue(pd.contains(ListS3.SSL_CONTEXT_SERVICE));
|
assertTrue(pd.contains(ListS3.SSL_CONTEXT_SERVICE));
|
||||||
|
|
|
@ -21,6 +21,7 @@ import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import com.amazonaws.services.s3.model.Tag;
|
||||||
import org.apache.nifi.components.AllowableValue;
|
import org.apache.nifi.components.AllowableValue;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||||
|
@ -71,26 +72,8 @@ public class TestPutS3Object {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPutSinglePart() {
|
public void testPutSinglePart() {
|
||||||
runner.setProperty(PutS3Object.REGION, "ap-northeast-1");
|
|
||||||
runner.setProperty(PutS3Object.BUCKET, "test-bucket");
|
|
||||||
runner.setProperty("x-custom-prop", "hello");
|
runner.setProperty("x-custom-prop", "hello");
|
||||||
final Map<String, String> ffAttributes = new HashMap<>();
|
testBase();
|
||||||
ffAttributes.put("filename", "testfile.txt");
|
|
||||||
runner.enqueue("Test Content", ffAttributes);
|
|
||||||
|
|
||||||
PutObjectResult putObjectResult = Mockito.spy(PutObjectResult.class);
|
|
||||||
Date expiration = new Date();
|
|
||||||
putObjectResult.setExpirationTime(expiration);
|
|
||||||
putObjectResult.setMetadata(new ObjectMetadata());
|
|
||||||
putObjectResult.setVersionId("test-version");
|
|
||||||
Mockito.when(putObjectResult.getETag()).thenReturn("test-etag");
|
|
||||||
Mockito.when(mockS3Client.putObject(Mockito.any(PutObjectRequest.class))).thenReturn(putObjectResult);
|
|
||||||
MultipartUploadListing uploadListing = new MultipartUploadListing();
|
|
||||||
Mockito.when(mockS3Client.listMultipartUploads(Mockito.any(ListMultipartUploadsRequest.class))).thenReturn(uploadListing);
|
|
||||||
Mockito.when(mockS3Client.getResourceUrl(Mockito.anyString(), Mockito.anyString())).thenReturn("test-s3-url");
|
|
||||||
|
|
||||||
runner.assertValid();
|
|
||||||
runner.run(1);
|
|
||||||
|
|
||||||
ArgumentCaptor<PutObjectRequest> captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class);
|
ArgumentCaptor<PutObjectRequest> captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class);
|
||||||
Mockito.verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture());
|
Mockito.verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture());
|
||||||
|
@ -98,8 +81,10 @@ public class TestPutS3Object {
|
||||||
assertEquals("test-bucket", request.getBucketName());
|
assertEquals("test-bucket", request.getBucketName());
|
||||||
|
|
||||||
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
|
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
|
||||||
|
|
||||||
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
|
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
|
||||||
MockFlowFile ff0 = flowFiles.get(0);
|
MockFlowFile ff0 = flowFiles.get(0);
|
||||||
|
|
||||||
ff0.assertAttributeEquals(CoreAttributes.FILENAME.key(), "testfile.txt");
|
ff0.assertAttributeEquals(CoreAttributes.FILENAME.key(), "testfile.txt");
|
||||||
ff0.assertAttributeEquals(PutS3Object.S3_ETAG_ATTR_KEY, "test-etag");
|
ff0.assertAttributeEquals(PutS3Object.S3_ETAG_ATTR_KEY, "test-etag");
|
||||||
ff0.assertAttributeEquals(PutS3Object.S3_VERSION_ATTR_KEY, "test-version");
|
ff0.assertAttributeEquals(PutS3Object.S3_VERSION_ATTR_KEY, "test-version");
|
||||||
|
@ -147,11 +132,55 @@ public class TestPutS3Object {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testObjectTags() {
|
||||||
|
runner.setProperty(PutS3Object.OBJECT_TAGS_PREFIX, "tagS3");
|
||||||
|
runner.setProperty(PutS3Object.REMOVE_TAG_PREFIX, "false");
|
||||||
|
testBase();
|
||||||
|
|
||||||
|
ArgumentCaptor<PutObjectRequest> captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class);
|
||||||
|
Mockito.verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture());
|
||||||
|
PutObjectRequest request = captureRequest.getValue();
|
||||||
|
|
||||||
|
List<Tag> tagSet = request.getTagging().getTagSet();
|
||||||
|
|
||||||
|
assertEquals(1, tagSet.size());
|
||||||
|
assertEquals("tagS3PII", tagSet.get(0).getKey());
|
||||||
|
assertEquals("true", tagSet.get(0).getValue());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testBase() {
|
||||||
|
runner.setProperty(PutS3Object.REGION, "ap-northeast-1");
|
||||||
|
runner.setProperty(PutS3Object.BUCKET, "test-bucket");
|
||||||
|
|
||||||
|
final Map<String, String> ffAttributes = new HashMap<>();
|
||||||
|
ffAttributes.put("filename", "testfile.txt");
|
||||||
|
ffAttributes.put("tagS3PII", "true");
|
||||||
|
runner.enqueue("Test Content", ffAttributes);
|
||||||
|
|
||||||
|
PutObjectResult putObjectResult = Mockito.spy(PutObjectResult.class);
|
||||||
|
Date expiration = new Date();
|
||||||
|
|
||||||
|
putObjectResult.setExpirationTime(expiration);
|
||||||
|
putObjectResult.setMetadata(new ObjectMetadata());
|
||||||
|
putObjectResult.setVersionId("test-version");
|
||||||
|
|
||||||
|
Mockito.when(putObjectResult.getETag()).thenReturn("test-etag");
|
||||||
|
Mockito.when(mockS3Client.putObject(Mockito.any(PutObjectRequest.class))).thenReturn(putObjectResult);
|
||||||
|
|
||||||
|
MultipartUploadListing uploadListing = new MultipartUploadListing();
|
||||||
|
Mockito.when(mockS3Client.listMultipartUploads(Mockito.any(ListMultipartUploadsRequest.class))).thenReturn(uploadListing);
|
||||||
|
|
||||||
|
runner.assertValid();
|
||||||
|
runner.run(1);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetPropertyDescriptors() throws Exception {
|
public void testGetPropertyDescriptors() throws Exception {
|
||||||
PutS3Object processor = new PutS3Object();
|
PutS3Object processor = new PutS3Object();
|
||||||
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
|
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
|
||||||
assertEquals("size should be eq", 31, pd.size());
|
assertEquals("size should be eq", 33, pd.size());
|
||||||
assertTrue(pd.contains(PutS3Object.ACCESS_KEY));
|
assertTrue(pd.contains(PutS3Object.ACCESS_KEY));
|
||||||
assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
|
assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
|
||||||
assertTrue(pd.contains(PutS3Object.BUCKET));
|
assertTrue(pd.contains(PutS3Object.BUCKET));
|
||||||
|
@ -178,6 +207,8 @@ public class TestPutS3Object {
|
||||||
assertTrue(pd.contains(PutS3Object.PROXY_HOST_PORT));
|
assertTrue(pd.contains(PutS3Object.PROXY_HOST_PORT));
|
||||||
assertTrue(pd.contains(PutS3Object.PROXY_USERNAME));
|
assertTrue(pd.contains(PutS3Object.PROXY_USERNAME));
|
||||||
assertTrue(pd.contains(PutS3Object.PROXY_PASSWORD));
|
assertTrue(pd.contains(PutS3Object.PROXY_PASSWORD));
|
||||||
|
assertTrue(pd.contains(PutS3Object.OBJECT_TAGS_PREFIX));
|
||||||
|
assertTrue(pd.contains(PutS3Object.REMOVE_TAG_PREFIX));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue