mirror of https://github.com/apache/nifi.git
NIFI-7664 Add Content Disposition property to PutS3Object processor
Add new property 'Content Disposition' to allow user to set the content-disposition http header on the S3 object. Allowed values are 'inline' (default) and 'attachment'. If 'attachment' is selected, the filename will be set to the S3 Object key. Remove default value and keep backward compatibility Update fetchS3Object filename attribute settin Update constant names Update order of if-else condition NIFI-7664 Update condition in FetchS3Processor NIFI-7664 Undo the unexpected indent NIFI-7664 Update international chars unit test NIFI-7664 Set fetchS3 file path name NIFI-7664 Update code style This closes #4423. Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
parent
255d0c0f19
commit
ddb304a927
|
@ -165,14 +165,12 @@ public class FetchS3Object extends AbstractS3Processor {
|
||||||
|
|
||||||
final ObjectMetadata metadata = s3Object.getObjectMetadata();
|
final ObjectMetadata metadata = s3Object.getObjectMetadata();
|
||||||
if (metadata.getContentDisposition() != null) {
|
if (metadata.getContentDisposition() != null) {
|
||||||
final String fullyQualified = metadata.getContentDisposition();
|
final String contentDisposition = metadata.getContentDisposition();
|
||||||
final int lastSlash = fullyQualified.lastIndexOf("/");
|
|
||||||
if (lastSlash > -1 && lastSlash < fullyQualified.length() - 1) {
|
if (contentDisposition.equals(PutS3Object.CONTENT_DISPOSITION_INLINE) || contentDisposition.startsWith("attachment; filename=")) {
|
||||||
attributes.put(CoreAttributes.PATH.key(), fullyQualified.substring(0, lastSlash));
|
setFilePathAttributes(attributes, key);
|
||||||
attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), fullyQualified);
|
|
||||||
attributes.put(CoreAttributes.FILENAME.key(), fullyQualified.substring(lastSlash + 1));
|
|
||||||
} else {
|
} else {
|
||||||
attributes.put(CoreAttributes.FILENAME.key(), metadata.getContentDisposition());
|
setFilePathAttributes(attributes, contentDisposition);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (metadata.getContentMD5() != null) {
|
if (metadata.getContentMD5() != null) {
|
||||||
|
@ -231,4 +229,14 @@ public class FetchS3Object extends AbstractS3Processor {
|
||||||
session.getProvenanceReporter().fetch(flowFile, "http://" + bucket + ".amazonaws.com/" + key, transferMillis);
|
session.getProvenanceReporter().fetch(flowFile, "http://" + bucket + ".amazonaws.com/" + key, transferMillis);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void setFilePathAttributes(Map<String, String> attributes, String filePathName) {
|
||||||
|
final int lastSlash = filePathName.lastIndexOf("/");
|
||||||
|
if (lastSlash > -1 && lastSlash < filePathName.length() - 1) {
|
||||||
|
attributes.put(CoreAttributes.PATH.key(), filePathName.substring(0, lastSlash));
|
||||||
|
attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), filePathName);
|
||||||
|
attributes.put(CoreAttributes.FILENAME.key(), filePathName.substring(lastSlash + 1));
|
||||||
|
} else {
|
||||||
|
attributes.put(CoreAttributes.FILENAME.key(), filePathName);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -117,6 +117,7 @@ import com.amazonaws.services.s3.model.UploadPartResult;
|
||||||
@WritesAttribute(attribute = "s3.contenttype", description = "The S3 content type of the S3 Object that put in S3"),
|
@WritesAttribute(attribute = "s3.contenttype", description = "The S3 content type of the S3 Object that put in S3"),
|
||||||
@WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"),
|
@WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"),
|
||||||
@WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"),
|
@WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"),
|
||||||
|
@WritesAttribute(attribute = "s3.contentdisposition", description = "The content disposition of the S3 Object that put in S3"),
|
||||||
@WritesAttribute(attribute = "s3.cachecontrol", description = "The cache-control header of the S3 Object"),
|
@WritesAttribute(attribute = "s3.cachecontrol", description = "The cache-control header of the S3 Object"),
|
||||||
@WritesAttribute(attribute = "s3.uploadId", description = "The uploadId used to upload the Object to S3"),
|
@WritesAttribute(attribute = "s3.uploadId", description = "The uploadId used to upload the Object to S3"),
|
||||||
@WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of " +
|
@WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of " +
|
||||||
|
@ -131,6 +132,8 @@ public class PutS3Object extends AbstractS3Processor {
|
||||||
public static final long MAX_S3_PUTOBJECT_SIZE = 5L * 1024L * 1024L * 1024L;
|
public static final long MAX_S3_PUTOBJECT_SIZE = 5L * 1024L * 1024L * 1024L;
|
||||||
public static final String PERSISTENCE_ROOT = "conf/state/";
|
public static final String PERSISTENCE_ROOT = "conf/state/";
|
||||||
public static final String NO_SERVER_SIDE_ENCRYPTION = "None";
|
public static final String NO_SERVER_SIDE_ENCRYPTION = "None";
|
||||||
|
public static final String CONTENT_DISPOSITION_INLINE = "inline";
|
||||||
|
public static final String CONTENT_DISPOSITION_ATTACHMENT = "attachment";
|
||||||
|
|
||||||
public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder()
|
||||||
.name("Expiration Time Rule")
|
.name("Expiration Time Rule")
|
||||||
|
@ -153,6 +156,16 @@ public class PutS3Object extends AbstractS3Processor {
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor CONTENT_DISPOSITION = new PropertyDescriptor.Builder()
|
||||||
|
.name("Content Disposition")
|
||||||
|
.displayName("Content Disposition")
|
||||||
|
.description("Sets the Content-Disposition HTTP header indicating if the content is intended to be displayed inline or should be downloaded.\n " +
|
||||||
|
"Possible values are 'inline' or 'attachment'. If this property is not specified, object's content-disposition will be set to filename. " +
|
||||||
|
"When 'attachment' is selected, '; filename=' plus object key are automatically appended to form final value 'attachment; filename=\"filename.jpg\"'.")
|
||||||
|
.required(false)
|
||||||
|
.allowableValues(CONTENT_DISPOSITION_INLINE, CONTENT_DISPOSITION_ATTACHMENT)
|
||||||
|
.build();
|
||||||
|
|
||||||
public static final PropertyDescriptor CACHE_CONTROL = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor CACHE_CONTROL = new PropertyDescriptor.Builder()
|
||||||
.name("Cache Control")
|
.name("Cache Control")
|
||||||
.displayName("Cache Control")
|
.displayName("Cache Control")
|
||||||
|
@ -242,7 +255,7 @@ public class PutS3Object extends AbstractS3Processor {
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
||||||
Arrays.asList(KEY, BUCKET, CONTENT_TYPE, CACHE_CONTROL, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, OBJECT_TAGS_PREFIX, REMOVE_TAG_PREFIX,
|
Arrays.asList(KEY, BUCKET, CONTENT_TYPE, CONTENT_DISPOSITION, CACHE_CONTROL, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, OBJECT_TAGS_PREFIX, REMOVE_TAG_PREFIX,
|
||||||
STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER,
|
STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER,
|
||||||
CANNED_ACL, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL,
|
CANNED_ACL, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL,
|
||||||
MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION, ENCRYPTION_SERVICE, USE_CHUNKED_ENCODING, USE_PATH_STYLE_ACCESS,
|
MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION, ENCRYPTION_SERVICE, USE_CHUNKED_ENCODING, USE_PATH_STYLE_ACCESS,
|
||||||
|
@ -251,6 +264,7 @@ public class PutS3Object extends AbstractS3Processor {
|
||||||
final static String S3_BUCKET_KEY = "s3.bucket";
|
final static String S3_BUCKET_KEY = "s3.bucket";
|
||||||
final static String S3_OBJECT_KEY = "s3.key";
|
final static String S3_OBJECT_KEY = "s3.key";
|
||||||
final static String S3_CONTENT_TYPE = "s3.contenttype";
|
final static String S3_CONTENT_TYPE = "s3.contenttype";
|
||||||
|
final static String S3_CONTENT_DISPOSITION = "s3.contentdisposition";
|
||||||
final static String S3_UPLOAD_ID_ATTR_KEY = "s3.uploadId";
|
final static String S3_UPLOAD_ID_ATTR_KEY = "s3.uploadId";
|
||||||
final static String S3_VERSION_ATTR_KEY = "s3.version";
|
final static String S3_VERSION_ATTR_KEY = "s3.version";
|
||||||
final static String S3_ETAG_ATTR_KEY = "s3.etag";
|
final static String S3_ETAG_ATTR_KEY = "s3.etag";
|
||||||
|
@ -462,7 +476,6 @@ public class PutS3Object extends AbstractS3Processor {
|
||||||
public void process(final InputStream rawIn) throws IOException {
|
public void process(final InputStream rawIn) throws IOException {
|
||||||
try (final InputStream in = new BufferedInputStream(rawIn)) {
|
try (final InputStream in = new BufferedInputStream(rawIn)) {
|
||||||
final ObjectMetadata objectMetadata = new ObjectMetadata();
|
final ObjectMetadata objectMetadata = new ObjectMetadata();
|
||||||
objectMetadata.setContentDisposition(URLEncoder.encode(ff.getAttribute(CoreAttributes.FILENAME.key()), "UTF-8"));
|
|
||||||
objectMetadata.setContentLength(ff.getSize());
|
objectMetadata.setContentLength(ff.getSize());
|
||||||
|
|
||||||
final String contentType = context.getProperty(CONTENT_TYPE)
|
final String contentType = context.getProperty(CONTENT_TYPE)
|
||||||
|
@ -479,6 +492,19 @@ public class PutS3Object extends AbstractS3Processor {
|
||||||
attributes.put(S3_CACHE_CONTROL, cacheControl);
|
attributes.put(S3_CACHE_CONTROL, cacheControl);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final String contentDisposition = context.getProperty(CONTENT_DISPOSITION).getValue();
|
||||||
|
String fileName = URLEncoder.encode(ff.getAttribute(CoreAttributes.FILENAME.key()), "UTF-8");
|
||||||
|
if (contentDisposition != null && contentDisposition.equals(CONTENT_DISPOSITION_INLINE)) {
|
||||||
|
objectMetadata.setContentDisposition(CONTENT_DISPOSITION_INLINE);
|
||||||
|
attributes.put(S3_CONTENT_DISPOSITION, CONTENT_DISPOSITION_INLINE);
|
||||||
|
} else if (contentDisposition != null && contentDisposition.equals(CONTENT_DISPOSITION_ATTACHMENT)) {
|
||||||
|
String contentDispositionValue = CONTENT_DISPOSITION_ATTACHMENT + "; filename=\"" + fileName + "\"";
|
||||||
|
objectMetadata.setContentDisposition(contentDispositionValue);
|
||||||
|
attributes.put(S3_CONTENT_DISPOSITION, contentDispositionValue);
|
||||||
|
} else {
|
||||||
|
objectMetadata.setContentDisposition(fileName);
|
||||||
|
}
|
||||||
|
|
||||||
final String expirationRule = context.getProperty(EXPIRATION_RULE_ID)
|
final String expirationRule = context.getProperty(EXPIRATION_RULE_ID)
|
||||||
.evaluateAttributeExpressions(ff).getValue();
|
.evaluateAttributeExpressions(ff).getValue();
|
||||||
if (expirationRule != null) {
|
if (expirationRule != null) {
|
||||||
|
|
|
@ -292,12 +292,74 @@ public class ITPutS3Object extends AbstractS3IT {
|
||||||
ff1.assertAttributeEquals(PutS3Object.S3_CONTENT_TYPE, "text/plain");
|
ff1.assertAttributeEquals(PutS3Object.S3_CONTENT_TYPE, "text/plain");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testContentDispositionInline() throws IOException {
|
||||||
|
TestRunner runner = initTestRunner();
|
||||||
|
|
||||||
|
runner.setProperty(PutS3Object.CONTENT_DISPOSITION, PutS3Object.CONTENT_DISPOSITION_INLINE);
|
||||||
|
|
||||||
|
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME));
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
|
||||||
|
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
|
||||||
|
MockFlowFile ff1 = flowFiles.get(0);
|
||||||
|
ff1.assertAttributeEquals(PutS3Object.S3_CONTENT_DISPOSITION, PutS3Object.CONTENT_DISPOSITION_INLINE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testContentDispositionNull() throws IOException {
|
||||||
|
// Put
|
||||||
|
TestRunner runner = initTestRunner();
|
||||||
|
|
||||||
|
final Map<String, String> attrs = new HashMap<>();
|
||||||
|
attrs.put("filename", "filename-on-s3.txt");
|
||||||
|
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
|
||||||
|
List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
|
||||||
|
|
||||||
|
// Fetch
|
||||||
|
runner = TestRunners.newTestRunner(new FetchS3Object());
|
||||||
|
|
||||||
|
runner.setProperty(FetchS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
|
||||||
|
runner.setProperty(FetchS3Object.REGION, REGION);
|
||||||
|
runner.setProperty(FetchS3Object.BUCKET, BUCKET_NAME);
|
||||||
|
|
||||||
|
runner.enqueue(new byte[0], attrs);
|
||||||
|
|
||||||
|
runner.run(1);
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1);
|
||||||
|
ffs = runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS);
|
||||||
|
MockFlowFile ff = ffs.get(0);
|
||||||
|
ff.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||||
|
ff.assertAttributeNotExists(PutS3Object.S3_CONTENT_DISPOSITION);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testContentDispositionAttachment() throws IOException {
|
||||||
|
TestRunner runner = initTestRunner();
|
||||||
|
|
||||||
|
runner.setProperty(PutS3Object.CONTENT_DISPOSITION, PutS3Object.CONTENT_DISPOSITION_ATTACHMENT);
|
||||||
|
|
||||||
|
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME));
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
|
||||||
|
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
|
||||||
|
MockFlowFile ff1 = flowFiles.get(0);
|
||||||
|
ff1.assertAttributeEquals(PutS3Object.S3_CONTENT_DISPOSITION, "attachment; filename=\"hello.txt\"");
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCacheControl() throws IOException {
|
public void testCacheControl() throws IOException {
|
||||||
TestRunner runner = initTestRunner();
|
TestRunner runner = initTestRunner();
|
||||||
|
|
||||||
runner.setProperty(PutS3Object.CACHE_CONTROL, "no-cache");
|
runner.setProperty(PutS3Object.CACHE_CONTROL, "no-cache");
|
||||||
|
|
||||||
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME));
|
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME));
|
||||||
|
|
||||||
runner.run();
|
runner.run();
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
package org.apache.nifi.processors.aws.s3;
|
package org.apache.nifi.processors.aws.s3;
|
||||||
|
|
||||||
import java.io.UnsupportedEncodingException;
|
import java.io.UnsupportedEncodingException;
|
||||||
import java.net.URLDecoder;
|
import java.net.URLEncoder;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -64,6 +64,7 @@ public class TestPutS3Object {
|
||||||
public void setUp() {
|
public void setUp() {
|
||||||
mockS3Client = Mockito.mock(AmazonS3Client.class);
|
mockS3Client = Mockito.mock(AmazonS3Client.class);
|
||||||
putS3Object = new PutS3Object() {
|
putS3Object = new PutS3Object() {
|
||||||
|
@Override
|
||||||
protected AmazonS3Client getClient() {
|
protected AmazonS3Client getClient() {
|
||||||
return mockS3Client;
|
return mockS3Client;
|
||||||
}
|
}
|
||||||
|
@ -176,7 +177,7 @@ public class TestPutS3Object {
|
||||||
PutObjectRequest request = captureRequest.getValue();
|
PutObjectRequest request = captureRequest.getValue();
|
||||||
|
|
||||||
ObjectMetadata objectMetadata = request.getMetadata();
|
ObjectMetadata objectMetadata = request.getMetadata();
|
||||||
assertEquals("Iñtërnâtiônàližætiøn.txt", URLDecoder.decode(objectMetadata.getContentDisposition(), "UTF-8"));
|
assertEquals(URLEncoder.encode("Iñtërnâtiônàližætiøn.txt", "UTF-8"), objectMetadata.getContentDisposition());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void prepareTest() {
|
private void prepareTest() {
|
||||||
|
@ -209,7 +210,7 @@ public class TestPutS3Object {
|
||||||
public void testGetPropertyDescriptors() {
|
public void testGetPropertyDescriptors() {
|
||||||
PutS3Object processor = new PutS3Object();
|
PutS3Object processor = new PutS3Object();
|
||||||
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
|
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
|
||||||
assertEquals("size should be eq", 37, pd.size());
|
assertEquals("size should be eq", 38, pd.size());
|
||||||
assertTrue(pd.contains(PutS3Object.ACCESS_KEY));
|
assertTrue(pd.contains(PutS3Object.ACCESS_KEY));
|
||||||
assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
|
assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
|
||||||
assertTrue(pd.contains(PutS3Object.BUCKET));
|
assertTrue(pd.contains(PutS3Object.BUCKET));
|
||||||
|
@ -242,6 +243,7 @@ public class TestPutS3Object {
|
||||||
assertTrue(pd.contains(PutS3Object.OBJECT_TAGS_PREFIX));
|
assertTrue(pd.contains(PutS3Object.OBJECT_TAGS_PREFIX));
|
||||||
assertTrue(pd.contains(PutS3Object.REMOVE_TAG_PREFIX));
|
assertTrue(pd.contains(PutS3Object.REMOVE_TAG_PREFIX));
|
||||||
assertTrue(pd.contains(PutS3Object.CONTENT_TYPE));
|
assertTrue(pd.contains(PutS3Object.CONTENT_TYPE));
|
||||||
|
assertTrue(pd.contains(PutS3Object.CONTENT_DISPOSITION));
|
||||||
assertTrue(pd.contains(PutS3Object.CACHE_CONTROL));
|
assertTrue(pd.contains(PutS3Object.CACHE_CONTROL));
|
||||||
assertTrue(pd.contains(PutS3Object.MULTIPART_THRESHOLD));
|
assertTrue(pd.contains(PutS3Object.MULTIPART_THRESHOLD));
|
||||||
assertTrue(pd.contains(PutS3Object.MULTIPART_PART_SIZE));
|
assertTrue(pd.contains(PutS3Object.MULTIPART_PART_SIZE));
|
||||||
|
|
Loading…
Reference in New Issue