From b34de74db296c5848e92c3804510c05e0df4e782 Mon Sep 17 00:00:00 2001 From: Edgardo Date: Tue, 20 Sep 2016 11:08:39 -0400 Subject: [PATCH] NIFI-2810 Allow Content Type to be set in PutS3Object processor This closes #1034. --- .../nifi/processors/aws/s3/PutS3Object.java | 24 ++++++++++++++++++- .../nifi/processors/aws/s3/ITPutS3Object.java | 20 ++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java index 8e744133e4..775114314d 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java @@ -109,6 +109,7 @@ import com.amazonaws.services.s3.model.UploadPartResult; @WritesAttributes({ @WritesAttribute(attribute = "s3.bucket", description = "The S3 bucket where the Object was put in S3"), @WritesAttribute(attribute = "s3.key", description = "The S3 key within where the Object was put in S3"), + @WritesAttribute(attribute = "s3.contenttype", description = "The S3 content type of the S3 Object that put in S3"), @WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"), @WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"), @WritesAttribute(attribute = "s3.uploadId", description = "The uploadId used to upload the Object to S3"), @@ -132,6 +133,20 @@ public class PutS3Object extends AbstractS3Processor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder() + .name("Content Type") + .displayName("Content Type") + .description("Sets the Content-Type HTTP header indicating the type of content stored in the associated " + + "object. The value of this header is a standard MIME type.\n" + + "AWS S3 Java client will attempt to determine the correct content type if one hasn't been set" + + " yet. Users are responsible for ensuring a suitable content type is set when uploading streams. If " + + "no content type is provided and cannot be determined by the filename, the default content type " + + "\"application/octet-stream\" will be used.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder() .name("Storage Class") .required(true) @@ -190,13 +205,14 @@ public class PutS3Object extends AbstractS3Processor { .build(); public static final List properties = Collections.unmodifiableList( - Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, + Arrays.asList(KEY, BUCKET, CONTENT_TYPE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, CANNED_ACL, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION, PROXY_HOST, PROXY_HOST_PORT)); final static String S3_BUCKET_KEY = "s3.bucket"; final static String S3_OBJECT_KEY = "s3.key"; + final static String S3_CONTENT_TYPE = "s3.contenttype"; final static String S3_UPLOAD_ID_ATTR_KEY = "s3.uploadId"; final static String S3_VERSION_ATTR_KEY = "s3.version"; final static String S3_ETAG_ATTR_KEY = "s3.etag"; @@ -406,6 +422,12 @@ public class PutS3Object extends AbstractS3Processor { objectMetadata.setContentDisposition(ff.getAttribute(CoreAttributes.FILENAME.key())); objectMetadata.setContentLength(ff.getSize()); + final String contentType = context.getProperty(CONTENT_TYPE) + .evaluateAttributeExpressions(ff).getValue(); + if (contentType != null) { + objectMetadata.setContentType(contentType); + } + final String expirationRule = context.getProperty(EXPIRATION_RULE_ID) .evaluateAttributeExpressions(ff).getValue(); if (expirationRule != null) { diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java index ea6be48c4a..28c67503dc 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java @@ -237,6 +237,26 @@ public class ITPutS3Object extends AbstractS3IT { } } + @Test + public void testContentType() throws IOException { + PutS3Object processor = new PutS3Object(); + final TestRunner runner = TestRunners.newTestRunner(processor); + + runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(PutS3Object.REGION, REGION); + runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME); + runner.setProperty(PutS3Object.CONTENT_TYPE, "text/plain"); + + runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME)); + + runner.run(); + + runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS); + MockFlowFile ff1 = flowFiles.get(0); + ff1.assertAttributeEquals(PutS3Object.S3_CONTENT_TYPE, "text/plain"); + } + @Test public void testPutInFolder() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());