NIFI-1180: Modify PutS3Object to enable encryption

This closes #246.

Signed-off-by: Andy LoPresto <alopresto@apache.org>
This commit is contained in:
Adam Lamar 2016-02-20 23:12:56 -07:00 committed by Andy LoPresto
parent 9235a28f82
commit 2bcc31330c
No known key found for this signature in database
GPG Key ID: 3C6EF65B2F7DEF69
6 changed files with 178 additions and 2 deletions

View File

@ -61,6 +61,7 @@ import com.amazonaws.services.s3.model.S3Object;
@WritesAttribute(attribute = "s3.etag", description = "The ETag that can be used to see if the file has changed"),
@WritesAttribute(attribute = "s3.expirationTime", description = "If the file has an expiration date, this attribute will be set, containing the milliseconds since epoch in UTC time"),
@WritesAttribute(attribute = "s3.expirationTimeRuleId", description = "The ID of the rule that dictates this object's expiration time"),
@WritesAttribute(attribute = "s3.sseAlgorithm", description = "The server side encryption algorithm of the object"),
@WritesAttribute(attribute = "s3.version", description = "The version of the S3 object"),})
public class FetchS3Object extends AbstractS3Processor {
@ -137,6 +138,9 @@ public class FetchS3Object extends AbstractS3Processor {
if (metadata.getUserMetadata() != null) {
attributes.putAll(metadata.getUserMetadata());
}
if (metadata.getSSEAlgorithm() != null) {
attributes.put("s3.sseAlgorithm", metadata.getSSEAlgorithm());
}
if (metadata.getVersionId() != null) {
attributes.put("s3.version", metadata.getVersionId());
}

View File

@ -113,6 +113,7 @@ import com.amazonaws.services.s3.model.UploadPartResult;
@WritesAttribute(attribute = "s3.uploadId", description = "The uploadId used to upload the Object to S3"),
@WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of " +
"the S3 object, if one is set"),
@WritesAttribute(attribute = "s3.sseAlgorithm", description = "The server side encryption algorithm of the object"),
@WritesAttribute(attribute = "s3.usermetadata", description = "A human-readable form of the User Metadata of " +
"the S3 object, if any was set")
})
@ -121,6 +122,7 @@ public class PutS3Object extends AbstractS3Processor {
public static final long MIN_S3_PART_SIZE = 50L * 1024L * 1024L;
public static final long MAX_S3_PUTOBJECT_SIZE = 5L * 1024L * 1024L * 1024L;
public static final String PERSISTENCE_ROOT = "conf/state/";
public static final String NO_SERVER_SIDE_ENCRYPTION = "None";
public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder()
.name("Expiration Time Rule")
@ -177,10 +179,20 @@ public class PutS3Object extends AbstractS3Processor {
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
public static final PropertyDescriptor SERVER_SIDE_ENCRYPTION = new PropertyDescriptor.Builder()
.name("server-side-encryption")
.displayName("Server Side Encryption")
.description("Specifies the algorithm used for server side encryption.")
.required(true)
.allowableValues(NO_SERVER_SIDE_ENCRYPTION, ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION)
.defaultValue(NO_SERVER_SIDE_ENCRYPTION)
.build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, SSL_CONTEXT_SERVICE,
ENDPOINT_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, MULTIPART_S3_MAX_AGE, PROXY_HOST, PROXY_HOST_PORT));
ENDPOINT_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION,
PROXY_HOST, PROXY_HOST_PORT));
final static String S3_BUCKET_KEY = "s3.bucket";
final static String S3_OBJECT_KEY = "s3.key";
@ -194,6 +206,7 @@ public class PutS3Object extends AbstractS3Processor {
final static String S3_API_METHOD_ATTR_KEY = "s3.apimethod";
final static String S3_API_METHOD_PUTOBJECT = "putobject";
final static String S3_API_METHOD_MULTIPARTUPLOAD = "multipartupload";
final static String S3_SSE_ALGORITHM = "s3.sseAlgorithm";
final static String S3_PROCESS_UNSCHEDULED_MESSAGE = "Processor unscheduled, stopping upload";
@ -407,6 +420,12 @@ public class PutS3Object extends AbstractS3Processor {
}
}
final String serverSideEncryption = context.getProperty(SERVER_SIDE_ENCRYPTION).getValue();
if (!serverSideEncryption.equals(NO_SERVER_SIDE_ENCRYPTION)) {
objectMetadata.setSSEAlgorithm(serverSideEncryption);
attributes.put(S3_SSE_ALGORITHM, serverSideEncryption);
}
if (!userMetadata.isEmpty()) {
objectMetadata.setUserMetadata(userMetadata);
}

View File

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.s3
import com.amazonaws.services.s3.model.ObjectMetadata
import org.junit.After
import org.junit.Before
import org.junit.BeforeClass
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.slf4j.Logger
import org.slf4j.LoggerFactory
@RunWith(JUnit4.class)
class PutS3ObjectTest extends GroovyTestCase {
private static final Logger logger = LoggerFactory.getLogger(PutS3ObjectTest.class);
private static long mockFlowFileId = 0
private PutS3Object putS3Object
@BeforeClass
static void setUpOnce() {
logger.metaClass.methodMissing = { String name, args ->
logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
}
}
@Before
void setUp() {
super.setUp()
putS3Object = new PutS3Object()
}
@After
void tearDown() {
}
@Test
void testShouldIncludeServerSideEncryptionAlgorithmProperty() {
// Arrange
// Act
def propertyDescriptors = putS3Object.getSupportedPropertyDescriptors()
def ssePropertyDescriptor = propertyDescriptors.find { it.name =~ "server-side-encryption" }
// Assert
assert ssePropertyDescriptor
assert ssePropertyDescriptor.name == "server-side-encryption"
assert ssePropertyDescriptor.displayName == "Server Side Encryption"
}
@Test
void testShouldValidateServerSideEncryptionDefaultsToNone() {
// Arrange
// Act
def propertyDescriptors = putS3Object.getSupportedPropertyDescriptors()
def ssePropertyDescriptor = propertyDescriptors.find { it.name =~ "server-side-encryption" }
// Assert
assert ssePropertyDescriptor
assert ssePropertyDescriptor.defaultValue == putS3Object.NO_SERVER_SIDE_ENCRYPTION
}
@Test
void testShouldValidateServerSideEncryptionAllowableValues() {
// Arrange
// Act
def propertyDescriptors = putS3Object.getSupportedPropertyDescriptors()
def ssePropertyDescriptor = propertyDescriptors.find { it.name =~ "server-side-encryption" }
// Assert
assert ssePropertyDescriptor
assert ssePropertyDescriptor.allowableValues*.toString() == [putS3Object.NO_SERVER_SIDE_ENCRYPTION, ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION]
}
}

View File

@ -22,6 +22,7 @@ import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CreateBucketRequest;
import com.amazonaws.services.s3.model.DeleteBucketRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.apache.nifi.util.file.FileUtils;
@ -132,6 +133,14 @@ public abstract class AbstractS3IT {
client.putObject(putRequest);
}
protected void putTestFileEncrypted(String key, File file) throws AmazonS3Exception, FileNotFoundException {
ObjectMetadata objectMetadata = new ObjectMetadata();
objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
PutObjectRequest putRequest = new PutObjectRequest(BUCKET_NAME, key, new FileInputStream(file), objectMetadata);
client.putObject(putRequest);
}
protected Path getResourcePath(String resourceName) {
Path path = null;

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.aws.s3;
import com.amazonaws.services.s3.model.ObjectMetadata;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processors.aws.AbstractAWSProcessor;
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
@ -56,6 +57,27 @@ public class ITFetchS3Object extends AbstractS3IT {
runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1);
}
@Test
public void testSimpleGetEncrypted() throws IOException {
putTestFileEncrypted("test-file", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
final TestRunner runner = TestRunners.newTestRunner(new FetchS3Object());
runner.setProperty(FetchS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(FetchS3Object.REGION, REGION);
runner.setProperty(FetchS3Object.BUCKET, BUCKET_NAME);
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "test-file");
runner.enqueue(new byte[0], attrs);
runner.run(1);
runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1);
final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS);
ffs.get(0).assertAttributeEquals(PutS3Object.S3_SSE_ALGORITHM, ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
}
@Test
public void testFetchS3ObjectUsingCredentialsProviderService() throws Throwable {
putTestFile("test-file", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));

View File

@ -32,6 +32,7 @@ import java.util.regex.Pattern;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@ -93,6 +94,31 @@ public class ITPutS3Object extends AbstractS3IT {
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 3);
}
@Test
public void testSimplePutEncrypted() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.REGION, REGION);
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
runner.setProperty(PutS3Object.SERVER_SIDE_ENCRYPTION, ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
for (int i = 0; i < 3; i++) {
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", String.valueOf(i) + ".txt");
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
}
runner.run(3);
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 3);
final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
for (MockFlowFile flowFile : ffs) {
flowFile.assertAttributeEquals(PutS3Object.S3_SSE_ALGORITHM, ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
}
}
@Test
public void testPutS3ObjectUsingCredentialsProviderService() throws Throwable {
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
@ -227,7 +253,7 @@ public class ITPutS3Object extends AbstractS3IT {
public void testGetPropertyDescriptors() throws Exception {
PutS3Object processor = new PutS3Object();
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
assertEquals("size should be eq", 24, pd.size());
assertEquals("size should be eq", 25, pd.size());
assertTrue(pd.contains(PutS3Object.ACCESS_KEY));
assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
assertTrue(pd.contains(PutS3Object.BUCKET));
@ -246,6 +272,7 @@ public class ITPutS3Object extends AbstractS3IT {
assertTrue(pd.contains(PutS3Object.STORAGE_CLASS));
assertTrue(pd.contains(PutS3Object.WRITE_ACL_LIST));
assertTrue(pd.contains(PutS3Object.WRITE_USER_LIST));
assertTrue(pd.contains(PutS3Object.SERVER_SIDE_ENCRYPTION));
}
@Test