From 2bcc31330cafbba4f342e9ea464399fd6c7b575b Mon Sep 17 00:00:00 2001 From: Adam Lamar Date: Sat, 20 Feb 2016 23:12:56 -0700 Subject: [PATCH] NIFI-1180: Modify PutS3Object to enable encryption This closes #246. Signed-off-by: Andy LoPresto --- .../nifi/processors/aws/s3/FetchS3Object.java | 4 + .../nifi/processors/aws/s3/PutS3Object.java | 21 +++- .../PutS3ObjectTest.groovy | 95 +++++++++++++++++++ .../nifi/processors/aws/s3/AbstractS3IT.java | 9 ++ .../processors/aws/s3/ITFetchS3Object.java | 22 +++++ .../nifi/processors/aws/s3/ITPutS3Object.java | 29 +++++- 6 files changed, 178 insertions(+), 2 deletions(-) create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/groovy/org.apache.nifi.processors.aws.s3/PutS3ObjectTest.groovy diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java index 7246bd32dc..4591a391eb 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java @@ -61,6 +61,7 @@ import com.amazonaws.services.s3.model.S3Object; @WritesAttribute(attribute = "s3.etag", description = "The ETag that can be used to see if the file has changed"), @WritesAttribute(attribute = "s3.expirationTime", description = "If the file has an expiration date, this attribute will be set, containing the milliseconds since epoch in UTC time"), @WritesAttribute(attribute = "s3.expirationTimeRuleId", description = "The ID of the rule that dictates this object's expiration time"), + @WritesAttribute(attribute = "s3.sseAlgorithm", description = "The server side encryption algorithm of the object"), @WritesAttribute(attribute = "s3.version", description = "The version of the S3 object"),}) public class FetchS3Object extends AbstractS3Processor { @@ -137,6 +138,9 @@ public class FetchS3Object extends AbstractS3Processor { if (metadata.getUserMetadata() != null) { attributes.putAll(metadata.getUserMetadata()); } + if (metadata.getSSEAlgorithm() != null) { + attributes.put("s3.sseAlgorithm", metadata.getSSEAlgorithm()); + } if (metadata.getVersionId() != null) { attributes.put("s3.version", metadata.getVersionId()); } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java index 87f00b6499..50d2453167 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java @@ -113,6 +113,7 @@ import com.amazonaws.services.s3.model.UploadPartResult; @WritesAttribute(attribute = "s3.uploadId", description = "The uploadId used to upload the Object to S3"), @WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of " + "the S3 object, if one is set"), + @WritesAttribute(attribute = "s3.sseAlgorithm", description = "The server side encryption algorithm of the object"), @WritesAttribute(attribute = "s3.usermetadata", description = "A human-readable form of the User Metadata of " + "the S3 object, if any was set") }) @@ -121,6 +122,7 @@ public class PutS3Object extends AbstractS3Processor { public static final long MIN_S3_PART_SIZE = 50L * 1024L * 1024L; public static final long MAX_S3_PUTOBJECT_SIZE = 5L * 1024L * 1024L * 1024L; public static final String PERSISTENCE_ROOT = "conf/state/"; + public static final String NO_SERVER_SIDE_ENCRYPTION = "None"; public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder() .name("Expiration Time Rule") @@ -177,10 +179,20 @@ public class PutS3Object extends AbstractS3Processor { .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .build(); + public static final PropertyDescriptor SERVER_SIDE_ENCRYPTION = new PropertyDescriptor.Builder() + .name("server-side-encryption") + .displayName("Server Side Encryption") + .description("Specifies the algorithm used for server side encryption.") + .required(true) + .allowableValues(NO_SERVER_SIDE_ENCRYPTION, ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION) + .defaultValue(NO_SERVER_SIDE_ENCRYPTION) + .build(); + public static final List properties = Collections.unmodifiableList( Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, SSL_CONTEXT_SERVICE, - ENDPOINT_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, MULTIPART_S3_MAX_AGE, PROXY_HOST, PROXY_HOST_PORT)); + ENDPOINT_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION, + PROXY_HOST, PROXY_HOST_PORT)); final static String S3_BUCKET_KEY = "s3.bucket"; final static String S3_OBJECT_KEY = "s3.key"; @@ -194,6 +206,7 @@ public class PutS3Object extends AbstractS3Processor { final static String S3_API_METHOD_ATTR_KEY = "s3.apimethod"; final static String S3_API_METHOD_PUTOBJECT = "putobject"; final static String S3_API_METHOD_MULTIPARTUPLOAD = "multipartupload"; + final static String S3_SSE_ALGORITHM = "s3.sseAlgorithm"; final static String S3_PROCESS_UNSCHEDULED_MESSAGE = "Processor unscheduled, stopping upload"; @@ -407,6 +420,12 @@ public class PutS3Object extends AbstractS3Processor { } } + final String serverSideEncryption = context.getProperty(SERVER_SIDE_ENCRYPTION).getValue(); + if (!serverSideEncryption.equals(NO_SERVER_SIDE_ENCRYPTION)) { + objectMetadata.setSSEAlgorithm(serverSideEncryption); + attributes.put(S3_SSE_ALGORITHM, serverSideEncryption); + } + if (!userMetadata.isEmpty()) { objectMetadata.setUserMetadata(userMetadata); } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/groovy/org.apache.nifi.processors.aws.s3/PutS3ObjectTest.groovy b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/groovy/org.apache.nifi.processors.aws.s3/PutS3ObjectTest.groovy new file mode 100644 index 0000000000..05ef0620af --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/groovy/org.apache.nifi.processors.aws.s3/PutS3ObjectTest.groovy @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.s3 + +import com.amazonaws.services.s3.model.ObjectMetadata +import org.junit.After +import org.junit.Before +import org.junit.BeforeClass +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +@RunWith(JUnit4.class) +class PutS3ObjectTest extends GroovyTestCase { + private static final Logger logger = LoggerFactory.getLogger(PutS3ObjectTest.class); + + private static long mockFlowFileId = 0 + private PutS3Object putS3Object + + @BeforeClass + static void setUpOnce() { + logger.metaClass.methodMissing = { String name, args -> + logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}") + } + } + + @Before + void setUp() { + super.setUp() + + putS3Object = new PutS3Object() + + } + + @After + void tearDown() { + + } + + @Test + void testShouldIncludeServerSideEncryptionAlgorithmProperty() { + // Arrange + + // Act + def propertyDescriptors = putS3Object.getSupportedPropertyDescriptors() + def ssePropertyDescriptor = propertyDescriptors.find { it.name =~ "server-side-encryption" } + + // Assert + assert ssePropertyDescriptor + assert ssePropertyDescriptor.name == "server-side-encryption" + assert ssePropertyDescriptor.displayName == "Server Side Encryption" + } + + @Test + void testShouldValidateServerSideEncryptionDefaultsToNone() { + // Arrange + + // Act + def propertyDescriptors = putS3Object.getSupportedPropertyDescriptors() + def ssePropertyDescriptor = propertyDescriptors.find { it.name =~ "server-side-encryption" } + + // Assert + assert ssePropertyDescriptor + assert ssePropertyDescriptor.defaultValue == putS3Object.NO_SERVER_SIDE_ENCRYPTION + } + + @Test + void testShouldValidateServerSideEncryptionAllowableValues() { + // Arrange + + // Act + def propertyDescriptors = putS3Object.getSupportedPropertyDescriptors() + def ssePropertyDescriptor = propertyDescriptors.find { it.name =~ "server-side-encryption" } + + // Assert + assert ssePropertyDescriptor + assert ssePropertyDescriptor.allowableValues*.toString() == [putS3Object.NO_SERVER_SIDE_ENCRYPTION, ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION] + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java index 11fdffe2ff..697e2f2ced 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java @@ -22,6 +22,7 @@ import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.services.s3.model.CreateBucketRequest; import com.amazonaws.services.s3.model.DeleteBucketRequest; import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.S3ObjectSummary; import org.apache.nifi.util.file.FileUtils; @@ -132,6 +133,14 @@ public abstract class AbstractS3IT { client.putObject(putRequest); } + protected void putTestFileEncrypted(String key, File file) throws AmazonS3Exception, FileNotFoundException { + ObjectMetadata objectMetadata = new ObjectMetadata(); + objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); + PutObjectRequest putRequest = new PutObjectRequest(BUCKET_NAME, key, new FileInputStream(file), objectMetadata); + + client.putObject(putRequest); + } + protected Path getResourcePath(String resourceName) { Path path = null; diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java index 8e5eb28cfe..db115bb450 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.aws.s3; +import com.amazonaws.services.s3.model.ObjectMetadata; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.processors.aws.AbstractAWSProcessor; import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; @@ -56,6 +57,27 @@ public class ITFetchS3Object extends AbstractS3IT { runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1); } + @Test + public void testSimpleGetEncrypted() throws IOException { + putTestFileEncrypted("test-file", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME)); + + final TestRunner runner = TestRunners.newTestRunner(new FetchS3Object()); + + runner.setProperty(FetchS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(FetchS3Object.REGION, REGION); + runner.setProperty(FetchS3Object.BUCKET, BUCKET_NAME); + + final Map attrs = new HashMap<>(); + attrs.put("filename", "test-file"); + runner.enqueue(new byte[0], attrs); + + runner.run(1); + + runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1); + final List ffs = runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS); + ffs.get(0).assertAttributeEquals(PutS3Object.S3_SSE_ALGORITHM, ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); + } + @Test public void testFetchS3ObjectUsingCredentialsProviderService() throws Throwable { putTestFile("test-file", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME)); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java index bc5e7a2f87..5d7797e22e 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java @@ -32,6 +32,7 @@ import java.util.regex.Pattern; import com.amazonaws.AmazonServiceException; import com.amazonaws.services.s3.model.ListMultipartUploadsRequest; +import com.amazonaws.services.s3.model.ObjectMetadata; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -93,6 +94,31 @@ public class ITPutS3Object extends AbstractS3IT { runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 3); } + @Test + public void testSimplePutEncrypted() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new PutS3Object()); + + runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(PutS3Object.REGION, REGION); + runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME); + runner.setProperty(PutS3Object.SERVER_SIDE_ENCRYPTION, ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); + + Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid()); + + for (int i = 0; i < 3; i++) { + final Map attrs = new HashMap<>(); + attrs.put("filename", String.valueOf(i) + ".txt"); + runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs); + } + runner.run(3); + + runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 3); + final List ffs = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS); + for (MockFlowFile flowFile : ffs) { + flowFile.assertAttributeEquals(PutS3Object.S3_SSE_ALGORITHM, ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); + } + } + @Test public void testPutS3ObjectUsingCredentialsProviderService() throws Throwable { final TestRunner runner = TestRunners.newTestRunner(new PutS3Object()); @@ -227,7 +253,7 @@ public class ITPutS3Object extends AbstractS3IT { public void testGetPropertyDescriptors() throws Exception { PutS3Object processor = new PutS3Object(); List pd = processor.getSupportedPropertyDescriptors(); - assertEquals("size should be eq", 24, pd.size()); + assertEquals("size should be eq", 25, pd.size()); assertTrue(pd.contains(PutS3Object.ACCESS_KEY)); assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE)); assertTrue(pd.contains(PutS3Object.BUCKET)); @@ -246,6 +272,7 @@ public class ITPutS3Object extends AbstractS3IT { assertTrue(pd.contains(PutS3Object.STORAGE_CLASS)); assertTrue(pd.contains(PutS3Object.WRITE_ACL_LIST)); assertTrue(pd.contains(PutS3Object.WRITE_USER_LIST)); + assertTrue(pd.contains(PutS3Object.SERVER_SIDE_ENCRYPTION)); } @Test