From ddb304a927b372c518ae84572315f07b75733a28 Mon Sep 17 00:00:00 2001 From: Kent Nguyen Date: Thu, 30 Jul 2020 21:45:48 +0700 Subject: [PATCH] NIFI-7664 Add Content Disposition property to PutS3Object processor Add new property 'Content Disposition' to allow user to set the content-disposition http header on the S3 object. Allowed values are 'inline' (default) and 'attachment'. If 'attachment' is selected, the filename will be set to the S3 Object key. Remove default value and keep backward compatibility Update fetchS3Object filename attribute settin Update constant names Update order of if-else condition NIFI-7664 Update condition in FetchS3Processor NIFI-7664 Undo the unexpected indent NIFI-7664 Update international chars unit test NIFI-7664 Set fetchS3 file path name NIFI-7664 Update code style This closes #4423. Signed-off-by: Peter Turcsanyi --- .../nifi/processors/aws/s3/FetchS3Object.java | 22 +++++-- .../nifi/processors/aws/s3/PutS3Object.java | 30 ++++++++- .../nifi/processors/aws/s3/ITPutS3Object.java | 64 ++++++++++++++++++- .../processors/aws/s3/TestPutS3Object.java | 8 ++- 4 files changed, 111 insertions(+), 13 deletions(-) diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java index 4f68a98358..f985e4d19c 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java @@ -165,14 +165,12 @@ public class FetchS3Object extends AbstractS3Processor { final ObjectMetadata metadata = s3Object.getObjectMetadata(); if (metadata.getContentDisposition() != null) { - final String fullyQualified = metadata.getContentDisposition(); - final int lastSlash = fullyQualified.lastIndexOf("/"); - if (lastSlash > -1 && lastSlash < fullyQualified.length() - 1) { - attributes.put(CoreAttributes.PATH.key(), fullyQualified.substring(0, lastSlash)); - attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), fullyQualified); - attributes.put(CoreAttributes.FILENAME.key(), fullyQualified.substring(lastSlash + 1)); + final String contentDisposition = metadata.getContentDisposition(); + + if (contentDisposition.equals(PutS3Object.CONTENT_DISPOSITION_INLINE) || contentDisposition.startsWith("attachment; filename=")) { + setFilePathAttributes(attributes, key); } else { - attributes.put(CoreAttributes.FILENAME.key(), metadata.getContentDisposition()); + setFilePathAttributes(attributes, contentDisposition); } } if (metadata.getContentMD5() != null) { @@ -231,4 +229,14 @@ public class FetchS3Object extends AbstractS3Processor { session.getProvenanceReporter().fetch(flowFile, "http://" + bucket + ".amazonaws.com/" + key, transferMillis); } + protected void setFilePathAttributes(Map attributes, String filePathName) { + final int lastSlash = filePathName.lastIndexOf("/"); + if (lastSlash > -1 && lastSlash < filePathName.length() - 1) { + attributes.put(CoreAttributes.PATH.key(), filePathName.substring(0, lastSlash)); + attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), filePathName); + attributes.put(CoreAttributes.FILENAME.key(), filePathName.substring(lastSlash + 1)); + } else { + attributes.put(CoreAttributes.FILENAME.key(), filePathName); + } + } } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java index aa2ced6a23..c36c858f75 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java @@ -117,6 +117,7 @@ import com.amazonaws.services.s3.model.UploadPartResult; @WritesAttribute(attribute = "s3.contenttype", description = "The S3 content type of the S3 Object that put in S3"), @WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"), @WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"), + @WritesAttribute(attribute = "s3.contentdisposition", description = "The content disposition of the S3 Object that put in S3"), @WritesAttribute(attribute = "s3.cachecontrol", description = "The cache-control header of the S3 Object"), @WritesAttribute(attribute = "s3.uploadId", description = "The uploadId used to upload the Object to S3"), @WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of " + @@ -131,6 +132,8 @@ public class PutS3Object extends AbstractS3Processor { public static final long MAX_S3_PUTOBJECT_SIZE = 5L * 1024L * 1024L * 1024L; public static final String PERSISTENCE_ROOT = "conf/state/"; public static final String NO_SERVER_SIDE_ENCRYPTION = "None"; + public static final String CONTENT_DISPOSITION_INLINE = "inline"; + public static final String CONTENT_DISPOSITION_ATTACHMENT = "attachment"; public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder() .name("Expiration Time Rule") @@ -153,6 +156,16 @@ public class PutS3Object extends AbstractS3Processor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + public static final PropertyDescriptor CONTENT_DISPOSITION = new PropertyDescriptor.Builder() + .name("Content Disposition") + .displayName("Content Disposition") + .description("Sets the Content-Disposition HTTP header indicating if the content is intended to be displayed inline or should be downloaded.\n " + + "Possible values are 'inline' or 'attachment'. If this property is not specified, object's content-disposition will be set to filename. " + + "When 'attachment' is selected, '; filename=' plus object key are automatically appended to form final value 'attachment; filename=\"filename.jpg\"'.") + .required(false) + .allowableValues(CONTENT_DISPOSITION_INLINE, CONTENT_DISPOSITION_ATTACHMENT) + .build(); + public static final PropertyDescriptor CACHE_CONTROL = new PropertyDescriptor.Builder() .name("Cache Control") .displayName("Cache Control") @@ -242,7 +255,7 @@ public class PutS3Object extends AbstractS3Processor { .build(); public static final List properties = Collections.unmodifiableList( - Arrays.asList(KEY, BUCKET, CONTENT_TYPE, CACHE_CONTROL, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, OBJECT_TAGS_PREFIX, REMOVE_TAG_PREFIX, + Arrays.asList(KEY, BUCKET, CONTENT_TYPE, CONTENT_DISPOSITION, CACHE_CONTROL, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, OBJECT_TAGS_PREFIX, REMOVE_TAG_PREFIX, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, CANNED_ACL, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION, ENCRYPTION_SERVICE, USE_CHUNKED_ENCODING, USE_PATH_STYLE_ACCESS, @@ -251,6 +264,7 @@ public class PutS3Object extends AbstractS3Processor { final static String S3_BUCKET_KEY = "s3.bucket"; final static String S3_OBJECT_KEY = "s3.key"; final static String S3_CONTENT_TYPE = "s3.contenttype"; + final static String S3_CONTENT_DISPOSITION = "s3.contentdisposition"; final static String S3_UPLOAD_ID_ATTR_KEY = "s3.uploadId"; final static String S3_VERSION_ATTR_KEY = "s3.version"; final static String S3_ETAG_ATTR_KEY = "s3.etag"; @@ -462,7 +476,6 @@ public class PutS3Object extends AbstractS3Processor { public void process(final InputStream rawIn) throws IOException { try (final InputStream in = new BufferedInputStream(rawIn)) { final ObjectMetadata objectMetadata = new ObjectMetadata(); - objectMetadata.setContentDisposition(URLEncoder.encode(ff.getAttribute(CoreAttributes.FILENAME.key()), "UTF-8")); objectMetadata.setContentLength(ff.getSize()); final String contentType = context.getProperty(CONTENT_TYPE) @@ -479,6 +492,19 @@ public class PutS3Object extends AbstractS3Processor { attributes.put(S3_CACHE_CONTROL, cacheControl); } + final String contentDisposition = context.getProperty(CONTENT_DISPOSITION).getValue(); + String fileName = URLEncoder.encode(ff.getAttribute(CoreAttributes.FILENAME.key()), "UTF-8"); + if (contentDisposition != null && contentDisposition.equals(CONTENT_DISPOSITION_INLINE)) { + objectMetadata.setContentDisposition(CONTENT_DISPOSITION_INLINE); + attributes.put(S3_CONTENT_DISPOSITION, CONTENT_DISPOSITION_INLINE); + } else if (contentDisposition != null && contentDisposition.equals(CONTENT_DISPOSITION_ATTACHMENT)) { + String contentDispositionValue = CONTENT_DISPOSITION_ATTACHMENT + "; filename=\"" + fileName + "\""; + objectMetadata.setContentDisposition(contentDispositionValue); + attributes.put(S3_CONTENT_DISPOSITION, contentDispositionValue); + } else { + objectMetadata.setContentDisposition(fileName); + } + final String expirationRule = context.getProperty(EXPIRATION_RULE_ID) .evaluateAttributeExpressions(ff).getValue(); if (expirationRule != null) { diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java index 8f93c76974..fac1e86d19 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java @@ -292,12 +292,74 @@ public class ITPutS3Object extends AbstractS3IT { ff1.assertAttributeEquals(PutS3Object.S3_CONTENT_TYPE, "text/plain"); } + @Test + public void testContentDispositionInline() throws IOException { + TestRunner runner = initTestRunner(); + + runner.setProperty(PutS3Object.CONTENT_DISPOSITION, PutS3Object.CONTENT_DISPOSITION_INLINE); + + runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME)); + + runner.run(); + + runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS); + MockFlowFile ff1 = flowFiles.get(0); + ff1.assertAttributeEquals(PutS3Object.S3_CONTENT_DISPOSITION, PutS3Object.CONTENT_DISPOSITION_INLINE); + } + + @Test + public void testContentDispositionNull() throws IOException { + // Put + TestRunner runner = initTestRunner(); + + final Map attrs = new HashMap<>(); + attrs.put("filename", "filename-on-s3.txt"); + runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1); + List ffs = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS); + + // Fetch + runner = TestRunners.newTestRunner(new FetchS3Object()); + + runner.setProperty(FetchS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(FetchS3Object.REGION, REGION); + runner.setProperty(FetchS3Object.BUCKET, BUCKET_NAME); + + runner.enqueue(new byte[0], attrs); + + runner.run(1); + + runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1); + ffs = runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS); + MockFlowFile ff = ffs.get(0); + ff.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME)); + ff.assertAttributeNotExists(PutS3Object.S3_CONTENT_DISPOSITION); + } + + @Test + public void testContentDispositionAttachment() throws IOException { + TestRunner runner = initTestRunner(); + + runner.setProperty(PutS3Object.CONTENT_DISPOSITION, PutS3Object.CONTENT_DISPOSITION_ATTACHMENT); + + runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME)); + + runner.run(); + + runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS); + MockFlowFile ff1 = flowFiles.get(0); + ff1.assertAttributeEquals(PutS3Object.S3_CONTENT_DISPOSITION, "attachment; filename=\"hello.txt\""); + } + @Test public void testCacheControl() throws IOException { TestRunner runner = initTestRunner(); runner.setProperty(PutS3Object.CACHE_CONTROL, "no-cache"); - runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME)); runner.run(); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java index c66b767dca..3e3abdf1f9 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java @@ -17,7 +17,7 @@ package org.apache.nifi.processors.aws.s3; import java.io.UnsupportedEncodingException; -import java.net.URLDecoder; +import java.net.URLEncoder; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -64,6 +64,7 @@ public class TestPutS3Object { public void setUp() { mockS3Client = Mockito.mock(AmazonS3Client.class); putS3Object = new PutS3Object() { + @Override protected AmazonS3Client getClient() { return mockS3Client; } @@ -176,7 +177,7 @@ public class TestPutS3Object { PutObjectRequest request = captureRequest.getValue(); ObjectMetadata objectMetadata = request.getMetadata(); - assertEquals("Iñtërnâtiônàližætiøn.txt", URLDecoder.decode(objectMetadata.getContentDisposition(), "UTF-8")); + assertEquals(URLEncoder.encode("Iñtërnâtiônàližætiøn.txt", "UTF-8"), objectMetadata.getContentDisposition()); } private void prepareTest() { @@ -209,7 +210,7 @@ public class TestPutS3Object { public void testGetPropertyDescriptors() { PutS3Object processor = new PutS3Object(); List pd = processor.getSupportedPropertyDescriptors(); - assertEquals("size should be eq", 37, pd.size()); + assertEquals("size should be eq", 38, pd.size()); assertTrue(pd.contains(PutS3Object.ACCESS_KEY)); assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE)); assertTrue(pd.contains(PutS3Object.BUCKET)); @@ -242,6 +243,7 @@ public class TestPutS3Object { assertTrue(pd.contains(PutS3Object.OBJECT_TAGS_PREFIX)); assertTrue(pd.contains(PutS3Object.REMOVE_TAG_PREFIX)); assertTrue(pd.contains(PutS3Object.CONTENT_TYPE)); + assertTrue(pd.contains(PutS3Object.CONTENT_DISPOSITION)); assertTrue(pd.contains(PutS3Object.CACHE_CONTROL)); assertTrue(pd.contains(PutS3Object.MULTIPART_THRESHOLD)); assertTrue(pd.contains(PutS3Object.MULTIPART_PART_SIZE));