NIFI-2810 Allow Content Type to be set in PutS3Object processor

This closes #1034.
This commit is contained in:
Edgardo 2016-09-20 11:08:39 -04:00 committed by Pierre Villard
parent f06aeaee2a
commit b34de74db2
2 changed files with 43 additions and 1 deletions

View File

@ -109,6 +109,7 @@ import com.amazonaws.services.s3.model.UploadPartResult;
@WritesAttributes({ @WritesAttributes({
@WritesAttribute(attribute = "s3.bucket", description = "The S3 bucket where the Object was put in S3"), @WritesAttribute(attribute = "s3.bucket", description = "The S3 bucket where the Object was put in S3"),
@WritesAttribute(attribute = "s3.key", description = "The S3 key within where the Object was put in S3"), @WritesAttribute(attribute = "s3.key", description = "The S3 key within where the Object was put in S3"),
@WritesAttribute(attribute = "s3.contenttype", description = "The S3 content type of the S3 Object that put in S3"),
@WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"), @WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"),
@WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"), @WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"),
@WritesAttribute(attribute = "s3.uploadId", description = "The uploadId used to upload the Object to S3"), @WritesAttribute(attribute = "s3.uploadId", description = "The uploadId used to upload the Object to S3"),
@ -132,6 +133,20 @@ public class PutS3Object extends AbstractS3Processor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build(); .build();
public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder()
.name("Content Type")
.displayName("Content Type")
.description("Sets the Content-Type HTTP header indicating the type of content stored in the associated " +
"object. The value of this header is a standard MIME type.\n" +
"AWS S3 Java client will attempt to determine the correct content type if one hasn't been set" +
" yet. Users are responsible for ensuring a suitable content type is set when uploading streams. If " +
"no content type is provided and cannot be determined by the filename, the default content type " +
"\"application/octet-stream\" will be used.")
.required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder() public static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder()
.name("Storage Class") .name("Storage Class")
.required(true) .required(true)
@ -190,13 +205,14 @@ public class PutS3Object extends AbstractS3Processor {
.build(); .build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, Arrays.asList(KEY, BUCKET, CONTENT_TYPE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, CANNED_ACL, SSL_CONTEXT_SERVICE, FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, CANNED_ACL, SSL_CONTEXT_SERVICE,
ENDPOINT_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION, ENDPOINT_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION,
PROXY_HOST, PROXY_HOST_PORT)); PROXY_HOST, PROXY_HOST_PORT));
final static String S3_BUCKET_KEY = "s3.bucket"; final static String S3_BUCKET_KEY = "s3.bucket";
final static String S3_OBJECT_KEY = "s3.key"; final static String S3_OBJECT_KEY = "s3.key";
final static String S3_CONTENT_TYPE = "s3.contenttype";
final static String S3_UPLOAD_ID_ATTR_KEY = "s3.uploadId"; final static String S3_UPLOAD_ID_ATTR_KEY = "s3.uploadId";
final static String S3_VERSION_ATTR_KEY = "s3.version"; final static String S3_VERSION_ATTR_KEY = "s3.version";
final static String S3_ETAG_ATTR_KEY = "s3.etag"; final static String S3_ETAG_ATTR_KEY = "s3.etag";
@ -406,6 +422,12 @@ public class PutS3Object extends AbstractS3Processor {
objectMetadata.setContentDisposition(ff.getAttribute(CoreAttributes.FILENAME.key())); objectMetadata.setContentDisposition(ff.getAttribute(CoreAttributes.FILENAME.key()));
objectMetadata.setContentLength(ff.getSize()); objectMetadata.setContentLength(ff.getSize());
final String contentType = context.getProperty(CONTENT_TYPE)
.evaluateAttributeExpressions(ff).getValue();
if (contentType != null) {
objectMetadata.setContentType(contentType);
}
final String expirationRule = context.getProperty(EXPIRATION_RULE_ID) final String expirationRule = context.getProperty(EXPIRATION_RULE_ID)
.evaluateAttributeExpressions(ff).getValue(); .evaluateAttributeExpressions(ff).getValue();
if (expirationRule != null) { if (expirationRule != null) {

View File

@ -237,6 +237,26 @@ public class ITPutS3Object extends AbstractS3IT {
} }
} }
@Test
public void testContentType() throws IOException {
PutS3Object processor = new PutS3Object();
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.REGION, REGION);
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
runner.setProperty(PutS3Object.CONTENT_TYPE, "text/plain");
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME));
runner.run();
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
MockFlowFile ff1 = flowFiles.get(0);
ff1.assertAttributeEquals(PutS3Object.S3_CONTENT_TYPE, "text/plain");
}
@Test @Test
public void testPutInFolder() throws IOException { public void testPutInFolder() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object()); final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());