diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java index 4f68a98358..f985e4d19c 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java @@ -165,14 +165,12 @@ public class FetchS3Object extends AbstractS3Processor { final ObjectMetadata metadata = s3Object.getObjectMetadata(); if (metadata.getContentDisposition() != null) { - final String fullyQualified = metadata.getContentDisposition(); - final int lastSlash = fullyQualified.lastIndexOf("/"); - if (lastSlash > -1 && lastSlash < fullyQualified.length() - 1) { - attributes.put(CoreAttributes.PATH.key(), fullyQualified.substring(0, lastSlash)); - attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), fullyQualified); - attributes.put(CoreAttributes.FILENAME.key(), fullyQualified.substring(lastSlash + 1)); + final String contentDisposition = metadata.getContentDisposition(); + + if (contentDisposition.equals(PutS3Object.CONTENT_DISPOSITION_INLINE) || contentDisposition.startsWith("attachment; filename=")) { + setFilePathAttributes(attributes, key); } else { - attributes.put(CoreAttributes.FILENAME.key(), metadata.getContentDisposition()); + setFilePathAttributes(attributes, contentDisposition); } } if (metadata.getContentMD5() != null) { @@ -231,4 +229,14 @@ public class FetchS3Object extends AbstractS3Processor { session.getProvenanceReporter().fetch(flowFile, "http://" + bucket + ".amazonaws.com/" + key, transferMillis); } + protected void setFilePathAttributes(Map attributes, String filePathName) { + final int lastSlash = filePathName.lastIndexOf("/"); + if (lastSlash > -1 && lastSlash < filePathName.length() - 1) { + attributes.put(CoreAttributes.PATH.key(), filePathName.substring(0, lastSlash)); + attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), filePathName); + attributes.put(CoreAttributes.FILENAME.key(), filePathName.substring(lastSlash + 1)); + } else { + attributes.put(CoreAttributes.FILENAME.key(), filePathName); + } + } } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java index aa2ced6a23..c36c858f75 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java @@ -117,6 +117,7 @@ import com.amazonaws.services.s3.model.UploadPartResult; @WritesAttribute(attribute = "s3.contenttype", description = "The S3 content type of the S3 Object that put in S3"), @WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"), @WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"), + @WritesAttribute(attribute = "s3.contentdisposition", description = "The content disposition of the S3 Object that put in S3"), @WritesAttribute(attribute = "s3.cachecontrol", description = "The cache-control header of the S3 Object"), @WritesAttribute(attribute = "s3.uploadId", description = "The uploadId used to upload the Object to S3"), @WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of " + @@ -131,6 +132,8 @@ public class PutS3Object extends AbstractS3Processor { public static final long MAX_S3_PUTOBJECT_SIZE = 5L * 1024L * 1024L * 1024L; public static final String PERSISTENCE_ROOT = "conf/state/"; public static final String NO_SERVER_SIDE_ENCRYPTION = "None"; + public static final String CONTENT_DISPOSITION_INLINE = "inline"; + public static final String CONTENT_DISPOSITION_ATTACHMENT = "attachment"; public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder() .name("Expiration Time Rule") @@ -153,6 +156,16 @@ public class PutS3Object extends AbstractS3Processor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + public static final PropertyDescriptor CONTENT_DISPOSITION = new PropertyDescriptor.Builder() + .name("Content Disposition") + .displayName("Content Disposition") + .description("Sets the Content-Disposition HTTP header indicating if the content is intended to be displayed inline or should be downloaded.\n " + + "Possible values are 'inline' or 'attachment'. If this property is not specified, object's content-disposition will be set to filename. " + + "When 'attachment' is selected, '; filename=' plus object key are automatically appended to form final value 'attachment; filename=\"filename.jpg\"'.") + .required(false) + .allowableValues(CONTENT_DISPOSITION_INLINE, CONTENT_DISPOSITION_ATTACHMENT) + .build(); + public static final PropertyDescriptor CACHE_CONTROL = new PropertyDescriptor.Builder() .name("Cache Control") .displayName("Cache Control") @@ -242,7 +255,7 @@ public class PutS3Object extends AbstractS3Processor { .build(); public static final List properties = Collections.unmodifiableList( - Arrays.asList(KEY, BUCKET, CONTENT_TYPE, CACHE_CONTROL, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, OBJECT_TAGS_PREFIX, REMOVE_TAG_PREFIX, + Arrays.asList(KEY, BUCKET, CONTENT_TYPE, CONTENT_DISPOSITION, CACHE_CONTROL, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, OBJECT_TAGS_PREFIX, REMOVE_TAG_PREFIX, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, CANNED_ACL, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION, ENCRYPTION_SERVICE, USE_CHUNKED_ENCODING, USE_PATH_STYLE_ACCESS, @@ -251,6 +264,7 @@ public class PutS3Object extends AbstractS3Processor { final static String S3_BUCKET_KEY = "s3.bucket"; final static String S3_OBJECT_KEY = "s3.key"; final static String S3_CONTENT_TYPE = "s3.contenttype"; + final static String S3_CONTENT_DISPOSITION = "s3.contentdisposition"; final static String S3_UPLOAD_ID_ATTR_KEY = "s3.uploadId"; final static String S3_VERSION_ATTR_KEY = "s3.version"; final static String S3_ETAG_ATTR_KEY = "s3.etag"; @@ -462,7 +476,6 @@ public class PutS3Object extends AbstractS3Processor { public void process(final InputStream rawIn) throws IOException { try (final InputStream in = new BufferedInputStream(rawIn)) { final ObjectMetadata objectMetadata = new ObjectMetadata(); - objectMetadata.setContentDisposition(URLEncoder.encode(ff.getAttribute(CoreAttributes.FILENAME.key()), "UTF-8")); objectMetadata.setContentLength(ff.getSize()); final String contentType = context.getProperty(CONTENT_TYPE) @@ -479,6 +492,19 @@ public class PutS3Object extends AbstractS3Processor { attributes.put(S3_CACHE_CONTROL, cacheControl); } + final String contentDisposition = context.getProperty(CONTENT_DISPOSITION).getValue(); + String fileName = URLEncoder.encode(ff.getAttribute(CoreAttributes.FILENAME.key()), "UTF-8"); + if (contentDisposition != null && contentDisposition.equals(CONTENT_DISPOSITION_INLINE)) { + objectMetadata.setContentDisposition(CONTENT_DISPOSITION_INLINE); + attributes.put(S3_CONTENT_DISPOSITION, CONTENT_DISPOSITION_INLINE); + } else if (contentDisposition != null && contentDisposition.equals(CONTENT_DISPOSITION_ATTACHMENT)) { + String contentDispositionValue = CONTENT_DISPOSITION_ATTACHMENT + "; filename=\"" + fileName + "\""; + objectMetadata.setContentDisposition(contentDispositionValue); + attributes.put(S3_CONTENT_DISPOSITION, contentDispositionValue); + } else { + objectMetadata.setContentDisposition(fileName); + } + final String expirationRule = context.getProperty(EXPIRATION_RULE_ID) .evaluateAttributeExpressions(ff).getValue(); if (expirationRule != null) { diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java index 8f93c76974..fac1e86d19 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java @@ -292,12 +292,74 @@ public class ITPutS3Object extends AbstractS3IT { ff1.assertAttributeEquals(PutS3Object.S3_CONTENT_TYPE, "text/plain"); } + @Test + public void testContentDispositionInline() throws IOException { + TestRunner runner = initTestRunner(); + + runner.setProperty(PutS3Object.CONTENT_DISPOSITION, PutS3Object.CONTENT_DISPOSITION_INLINE); + + runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME)); + + runner.run(); + + runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS); + MockFlowFile ff1 = flowFiles.get(0); + ff1.assertAttributeEquals(PutS3Object.S3_CONTENT_DISPOSITION, PutS3Object.CONTENT_DISPOSITION_INLINE); + } + + @Test + public void testContentDispositionNull() throws IOException { + // Put + TestRunner runner = initTestRunner(); + + final Map attrs = new HashMap<>(); + attrs.put("filename", "filename-on-s3.txt"); + runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1); + List ffs = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS); + + // Fetch + runner = TestRunners.newTestRunner(new FetchS3Object()); + + runner.setProperty(FetchS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(FetchS3Object.REGION, REGION); + runner.setProperty(FetchS3Object.BUCKET, BUCKET_NAME); + + runner.enqueue(new byte[0], attrs); + + runner.run(1); + + runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1); + ffs = runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS); + MockFlowFile ff = ffs.get(0); + ff.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME)); + ff.assertAttributeNotExists(PutS3Object.S3_CONTENT_DISPOSITION); + } + + @Test + public void testContentDispositionAttachment() throws IOException { + TestRunner runner = initTestRunner(); + + runner.setProperty(PutS3Object.CONTENT_DISPOSITION, PutS3Object.CONTENT_DISPOSITION_ATTACHMENT); + + runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME)); + + runner.run(); + + runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS); + MockFlowFile ff1 = flowFiles.get(0); + ff1.assertAttributeEquals(PutS3Object.S3_CONTENT_DISPOSITION, "attachment; filename=\"hello.txt\""); + } + @Test public void testCacheControl() throws IOException { TestRunner runner = initTestRunner(); runner.setProperty(PutS3Object.CACHE_CONTROL, "no-cache"); - runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME)); runner.run(); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java index c66b767dca..3e3abdf1f9 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java @@ -17,7 +17,7 @@ package org.apache.nifi.processors.aws.s3; import java.io.UnsupportedEncodingException; -import java.net.URLDecoder; +import java.net.URLEncoder; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -64,6 +64,7 @@ public class TestPutS3Object { public void setUp() { mockS3Client = Mockito.mock(AmazonS3Client.class); putS3Object = new PutS3Object() { + @Override protected AmazonS3Client getClient() { return mockS3Client; } @@ -176,7 +177,7 @@ public class TestPutS3Object { PutObjectRequest request = captureRequest.getValue(); ObjectMetadata objectMetadata = request.getMetadata(); - assertEquals("Iñtërnâtiônàližætiøn.txt", URLDecoder.decode(objectMetadata.getContentDisposition(), "UTF-8")); + assertEquals(URLEncoder.encode("Iñtërnâtiônàližætiøn.txt", "UTF-8"), objectMetadata.getContentDisposition()); } private void prepareTest() { @@ -209,7 +210,7 @@ public class TestPutS3Object { public void testGetPropertyDescriptors() { PutS3Object processor = new PutS3Object(); List pd = processor.getSupportedPropertyDescriptors(); - assertEquals("size should be eq", 37, pd.size()); + assertEquals("size should be eq", 38, pd.size()); assertTrue(pd.contains(PutS3Object.ACCESS_KEY)); assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE)); assertTrue(pd.contains(PutS3Object.BUCKET)); @@ -242,6 +243,7 @@ public class TestPutS3Object { assertTrue(pd.contains(PutS3Object.OBJECT_TAGS_PREFIX)); assertTrue(pd.contains(PutS3Object.REMOVE_TAG_PREFIX)); assertTrue(pd.contains(PutS3Object.CONTENT_TYPE)); + assertTrue(pd.contains(PutS3Object.CONTENT_DISPOSITION)); assertTrue(pd.contains(PutS3Object.CACHE_CONTROL)); assertTrue(pd.contains(PutS3Object.MULTIPART_THRESHOLD)); assertTrue(pd.contains(PutS3Object.MULTIPART_PART_SIZE));