NIFI-6332: Add Cache Control property to PutS3Object processor

Add new property 'Cache Control' to allow user to
set the cache-control http header on the S3 object.

This property is not required, and has no default value.

The implementation is similar to the Content-Type property,
except that this property does not allow Expression Language.

Update property description

Add support EL for cache-control property

This closes #4422.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Kent Nguyen 2020-07-23 17:42:55 +07:00 committed by Peter Turcsanyi
parent 455f48fce4
commit 0cff54097e
3 changed files with 37 additions and 2 deletions

View File

@ -117,6 +117,7 @@ import com.amazonaws.services.s3.model.UploadPartResult;
@WritesAttribute(attribute = "s3.contenttype", description = "The S3 content type of the S3 Object that put in S3"), @WritesAttribute(attribute = "s3.contenttype", description = "The S3 content type of the S3 Object that put in S3"),
@WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"), @WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"),
@WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"), @WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"),
@WritesAttribute(attribute = "s3.cachecontrol", description = "The cache-control header of the S3 Object"),
@WritesAttribute(attribute = "s3.uploadId", description = "The uploadId used to upload the Object to S3"), @WritesAttribute(attribute = "s3.uploadId", description = "The uploadId used to upload the Object to S3"),
@WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of " + @WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of " +
"the S3 object, if one is set"), "the S3 object, if one is set"),
@ -152,6 +153,15 @@ public class PutS3Object extends AbstractS3Processor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build(); .build();
public static final PropertyDescriptor CACHE_CONTROL = new PropertyDescriptor.Builder()
.name("Cache Control")
.displayName("Cache Control")
.description("Sets the Cache-Control HTTP header indicating the caching directives of the associated object. Multiple directives are comma-separated.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder() public static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder()
.name("Storage Class") .name("Storage Class")
.required(true) .required(true)
@ -232,7 +242,7 @@ public class PutS3Object extends AbstractS3Processor {
.build(); .build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(KEY, BUCKET, CONTENT_TYPE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, OBJECT_TAGS_PREFIX, REMOVE_TAG_PREFIX, Arrays.asList(KEY, BUCKET, CONTENT_TYPE, CACHE_CONTROL, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, OBJECT_TAGS_PREFIX, REMOVE_TAG_PREFIX,
STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER,
CANNED_ACL, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, CANNED_ACL, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL,
MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION, ENCRYPTION_SERVICE, USE_CHUNKED_ENCODING, USE_PATH_STYLE_ACCESS, MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION, ENCRYPTION_SERVICE, USE_CHUNKED_ENCODING, USE_PATH_STYLE_ACCESS,
@ -244,6 +254,7 @@ public class PutS3Object extends AbstractS3Processor {
final static String S3_UPLOAD_ID_ATTR_KEY = "s3.uploadId"; final static String S3_UPLOAD_ID_ATTR_KEY = "s3.uploadId";
final static String S3_VERSION_ATTR_KEY = "s3.version"; final static String S3_VERSION_ATTR_KEY = "s3.version";
final static String S3_ETAG_ATTR_KEY = "s3.etag"; final static String S3_ETAG_ATTR_KEY = "s3.etag";
final static String S3_CACHE_CONTROL = "s3.cachecontrol";
final static String S3_EXPIRATION_ATTR_KEY = "s3.expiration"; final static String S3_EXPIRATION_ATTR_KEY = "s3.expiration";
final static String S3_STORAGECLASS_ATTR_KEY = "s3.storeClass"; final static String S3_STORAGECLASS_ATTR_KEY = "s3.storeClass";
final static String S3_STORAGECLASS_META_KEY = "x-amz-storage-class"; final static String S3_STORAGECLASS_META_KEY = "x-amz-storage-class";
@ -461,6 +472,13 @@ public class PutS3Object extends AbstractS3Processor {
attributes.put(S3_CONTENT_TYPE, contentType); attributes.put(S3_CONTENT_TYPE, contentType);
} }
final String cacheControl = context.getProperty(CACHE_CONTROL)
.evaluateAttributeExpressions(ff).getValue();
if (cacheControl != null) {
objectMetadata.setCacheControl(cacheControl);
attributes.put(S3_CACHE_CONTROL, cacheControl);
}
final String expirationRule = context.getProperty(EXPIRATION_RULE_ID) final String expirationRule = context.getProperty(EXPIRATION_RULE_ID)
.evaluateAttributeExpressions(ff).getValue(); .evaluateAttributeExpressions(ff).getValue();
if (expirationRule != null) { if (expirationRule != null) {

View File

@ -292,6 +292,22 @@ public class ITPutS3Object extends AbstractS3IT {
ff1.assertAttributeEquals(PutS3Object.S3_CONTENT_TYPE, "text/plain"); ff1.assertAttributeEquals(PutS3Object.S3_CONTENT_TYPE, "text/plain");
} }
@Test
public void testCacheControl() throws IOException {
TestRunner runner = initTestRunner();
runner.setProperty(PutS3Object.CACHE_CONTROL, "no-cache");
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME));
runner.run();
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
MockFlowFile ff1 = flowFiles.get(0);
ff1.assertAttributeEquals(PutS3Object.S3_CACHE_CONTROL, "no-cache");
}
@Test @Test
public void testPutInFolder() throws IOException { public void testPutInFolder() throws IOException {
TestRunner runner = initTestRunner(); TestRunner runner = initTestRunner();

View File

@ -209,7 +209,7 @@ public class TestPutS3Object {
public void testGetPropertyDescriptors() { public void testGetPropertyDescriptors() {
PutS3Object processor = new PutS3Object(); PutS3Object processor = new PutS3Object();
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors(); List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
assertEquals("size should be eq", 36, pd.size()); assertEquals("size should be eq", 37, pd.size());
assertTrue(pd.contains(PutS3Object.ACCESS_KEY)); assertTrue(pd.contains(PutS3Object.ACCESS_KEY));
assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE)); assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
assertTrue(pd.contains(PutS3Object.BUCKET)); assertTrue(pd.contains(PutS3Object.BUCKET));
@ -242,6 +242,7 @@ public class TestPutS3Object {
assertTrue(pd.contains(PutS3Object.OBJECT_TAGS_PREFIX)); assertTrue(pd.contains(PutS3Object.OBJECT_TAGS_PREFIX));
assertTrue(pd.contains(PutS3Object.REMOVE_TAG_PREFIX)); assertTrue(pd.contains(PutS3Object.REMOVE_TAG_PREFIX));
assertTrue(pd.contains(PutS3Object.CONTENT_TYPE)); assertTrue(pd.contains(PutS3Object.CONTENT_TYPE));
assertTrue(pd.contains(PutS3Object.CACHE_CONTROL));
assertTrue(pd.contains(PutS3Object.MULTIPART_THRESHOLD)); assertTrue(pd.contains(PutS3Object.MULTIPART_THRESHOLD));
assertTrue(pd.contains(PutS3Object.MULTIPART_PART_SIZE)); assertTrue(pd.contains(PutS3Object.MULTIPART_PART_SIZE));
assertTrue(pd.contains(PutS3Object.MULTIPART_S3_AGEOFF_INTERVAL)); assertTrue(pd.contains(PutS3Object.MULTIPART_S3_AGEOFF_INTERVAL));