mirror of
https://github.com/apache/nifi.git
synced 2025-03-06 09:29:33 +00:00
NIFI-6734: Fixed S3 multipart upload in case of SSE S3 and CSE* encryptions.
Removed unnecessary code from S3 CSE* encryptions. S3 Encryption Service documentation fixes and improvements. Renamed region property of StandardS3EncryptionService to kms-region. Renamed Client-side Customer Master Key in StandardS3EncryptionService. Use Client-side Customer Key on the GUI / documentation (similar to Server-side Customer Key). Use C suffix in constants and class names (similar to SSE_C). Fixed / extended StandardS3EncryptionService validation. FetchS3Object encryption strategy changes. Disable SSE S3 and SSE KMS for FetchS3Object. In case of fetching the S3 object, these strategies are handled implicitly / automatically. Set the encryption strategy on the fetched FF that was used to store the S3 object, instead of the one that is used to read the object (eg. non-encrypted or SSE S3 encrypted objects can be fetched with a CSE client). Typo fix. This closes #3787. Signed-off-by: Andy LoPresto <alopresto@apache.org>
This commit is contained in:
parent
df90c65246
commit
ba141690c5
@ -130,8 +130,9 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider
|
||||
public static final PropertyDescriptor ENCRYPTION_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("encryption-service")
|
||||
.displayName("Encryption Service")
|
||||
.description("Specifies the Encryption Service Controller used configure requests. "
|
||||
+ "For backward compatibility, this value is ignored when 'Server Side Encryption' is set.")
|
||||
.description("Specifies the Encryption Service Controller used to configure requests. " +
|
||||
"PutS3Object: For backward compatibility, this value is ignored when 'Server Side Encryption' is set. " +
|
||||
"FetchS3Object: Only needs to be configured in case of Server-side Customer Key, Client-side KMS and Client-side Customer Key encryptions.")
|
||||
.required(false)
|
||||
.identifiesControllerService(AmazonS3EncryptionService.class)
|
||||
.build();
|
||||
|
@ -17,13 +17,16 @@
|
||||
package org.apache.nifi.processors.aws.s3;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.amazonaws.services.s3.model.SSEAlgorithm;
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||
@ -35,6 +38,8 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.components.AllowableValue;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
@ -67,7 +72,7 @@ import com.amazonaws.services.s3.model.S3Object;
|
||||
@WritesAttribute(attribute = "s3.expirationTimeRuleId", description = "The ID of the rule that dictates this object's expiration time"),
|
||||
@WritesAttribute(attribute = "s3.sseAlgorithm", description = "The server side encryption algorithm of the object"),
|
||||
@WritesAttribute(attribute = "s3.version", description = "The version of the S3 object"),
|
||||
@WritesAttribute(attribute = "s3.encryptionStrategy", description = "The name of the encryption strategy, if any was set"),})
|
||||
@WritesAttribute(attribute = "s3.encryptionStrategy", description = "The name of the encryption strategy that was used to store the S3 object (if it is encrypted)"),})
|
||||
public class FetchS3Object extends AbstractS3Processor {
|
||||
|
||||
public static final PropertyDescriptor VERSION_ID = new PropertyDescriptor.Builder()
|
||||
@ -100,6 +105,27 @@ public class FetchS3Object extends AbstractS3Processor {
|
||||
return properties;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
|
||||
final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
|
||||
|
||||
AmazonS3EncryptionService encryptionService = validationContext.getProperty(ENCRYPTION_SERVICE).asControllerService(AmazonS3EncryptionService.class);
|
||||
if (encryptionService != null) {
|
||||
String strategyName = encryptionService.getStrategyName();
|
||||
if (strategyName.equals(AmazonS3EncryptionService.STRATEGY_NAME_SSE_S3) || strategyName.equals(AmazonS3EncryptionService.STRATEGY_NAME_SSE_KMS)) {
|
||||
problems.add(new ValidationResult.Builder()
|
||||
.subject(ENCRYPTION_SERVICE.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation(encryptionService.getStrategyDisplayName() + " is not a valid encryption strategy for fetching objects. Decryption will be handled automatically " +
|
||||
"during the fetch of S3 objects encrypted with " + encryptionService.getStrategyDisplayName())
|
||||
.build()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return problems;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
||||
FlowFile flowFile = session.get();
|
||||
@ -169,7 +195,13 @@ public class FetchS3Object extends AbstractS3Processor {
|
||||
attributes.putAll(metadata.getUserMetadata());
|
||||
}
|
||||
if (metadata.getSSEAlgorithm() != null) {
|
||||
attributes.put("s3.sseAlgorithm", metadata.getSSEAlgorithm());
|
||||
String sseAlgorithmName = metadata.getSSEAlgorithm();
|
||||
attributes.put("s3.sseAlgorithm", sseAlgorithmName);
|
||||
if (sseAlgorithmName.equals(SSEAlgorithm.AES256.getAlgorithm())) {
|
||||
attributes.put("s3.encryptionStrategy", AmazonS3EncryptionService.STRATEGY_NAME_SSE_S3);
|
||||
} else if (sseAlgorithmName.equals(SSEAlgorithm.KMS.getAlgorithm())) {
|
||||
attributes.put("s3.encryptionStrategy", AmazonS3EncryptionService.STRATEGY_NAME_SSE_KMS);
|
||||
}
|
||||
}
|
||||
if (metadata.getVersionId() != null) {
|
||||
attributes.put("s3.version", metadata.getVersionId());
|
||||
|
@ -664,6 +664,7 @@ public class PutS3Object extends AbstractS3Processor {
|
||||
// upload parts
|
||||
//------------------------------------------------------------
|
||||
long thisPartSize;
|
||||
boolean isLastPart;
|
||||
for (int part = currentState.getPartETags().size() + 1;
|
||||
currentState.getFilePosition() < currentState.getContentLength(); part++) {
|
||||
if (!PutS3Object.this.isScheduled()) {
|
||||
@ -672,13 +673,15 @@ public class PutS3Object extends AbstractS3Processor {
|
||||
}
|
||||
thisPartSize = Math.min(currentState.getPartSize(),
|
||||
(currentState.getContentLength() - currentState.getFilePosition()));
|
||||
isLastPart = currentState.getContentLength() == currentState.getFilePosition() + thisPartSize;
|
||||
UploadPartRequest uploadRequest = new UploadPartRequest()
|
||||
.withBucketName(bucket)
|
||||
.withKey(key)
|
||||
.withUploadId(currentState.getUploadId())
|
||||
.withInputStream(in)
|
||||
.withPartNumber(part)
|
||||
.withPartSize(thisPartSize);
|
||||
.withPartSize(thisPartSize)
|
||||
.withLastPart(isLastPart);
|
||||
if (encryptionService != null) {
|
||||
encryptionService.configureUploadPartRequest(uploadRequest, objectMetadata);
|
||||
}
|
||||
@ -692,8 +695,14 @@ public class PutS3Object extends AbstractS3Processor {
|
||||
getLogger().info("Exception saving cache state processing flow file: " +
|
||||
e.getMessage());
|
||||
}
|
||||
int available = 0;
|
||||
try {
|
||||
available = in.available();
|
||||
} catch (IOException e) {
|
||||
// in case of the last part, the stream is already closed
|
||||
}
|
||||
getLogger().info("Success uploading part flowfile={} part={} available={} " +
|
||||
"etag={} uploadId={}", new Object[]{ffFilename, part, in.available(),
|
||||
"etag={} uploadId={}", new Object[]{ffFilename, part, available,
|
||||
uploadPartResult.getETag(), currentState.getUploadId()});
|
||||
} catch (AmazonClientException e) {
|
||||
getLogger().info("Failure uploading part flowfile={} part={} bucket={} key={} " +
|
||||
|
@ -18,11 +18,8 @@ package org.apache.nifi.processors.aws.s3.encryption;
|
||||
|
||||
import com.amazonaws.ClientConfiguration;
|
||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||
import com.amazonaws.regions.Region;
|
||||
import com.amazonaws.regions.Regions;
|
||||
import com.amazonaws.services.s3.AmazonS3Client;
|
||||
import com.amazonaws.services.s3.AmazonS3EncryptionClient;
|
||||
import com.amazonaws.services.s3.model.CryptoConfiguration;
|
||||
import com.amazonaws.services.s3.model.EncryptionMaterials;
|
||||
import com.amazonaws.services.s3.model.StaticEncryptionMaterialsProvider;
|
||||
import org.apache.commons.codec.binary.Base64;
|
||||
@ -38,58 +35,65 @@ import javax.crypto.spec.SecretKeySpec;
|
||||
* See https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingClientSideEncryption.html#client-side-encryption-client-side-master-key-intro
|
||||
*
|
||||
*/
|
||||
public class ClientSideCMKEncryptionStrategy implements S3EncryptionStrategy {
|
||||
public class ClientSideCEncryptionStrategy implements S3EncryptionStrategy {
|
||||
/**
|
||||
* Create an encryption client.
|
||||
*
|
||||
* @param credentialsProvider AWS credentials provider.
|
||||
* @param clientConfiguration Client configuration
|
||||
* @param region AWS region
|
||||
* @param kmsRegion not used by this encryption strategy
|
||||
* @param keyIdOrMaterial client master key, always base64 encoded
|
||||
* @return AWS S3 client
|
||||
*/
|
||||
@Override
|
||||
public AmazonS3Client createEncryptionClient(AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration, String region, String keyIdOrMaterial) throws SecurityException {
|
||||
if (!validateKey(keyIdOrMaterial).isValid()) {
|
||||
throw new SecurityException("Invalid client key; ensure key material is base64 encoded.");
|
||||
public AmazonS3Client createEncryptionClient(AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration, String kmsRegion, String keyIdOrMaterial) {
|
||||
ValidationResult keyValidationResult = validateKey(keyIdOrMaterial);
|
||||
if (!keyValidationResult.isValid()) {
|
||||
throw new IllegalArgumentException("Invalid client key; " + keyValidationResult.getExplanation());
|
||||
}
|
||||
|
||||
byte[] keyMaterial = Base64.decodeBase64(keyIdOrMaterial);
|
||||
SecretKeySpec symmetricKey = new SecretKeySpec(keyMaterial, "AES");
|
||||
StaticEncryptionMaterialsProvider encryptionMaterialsProvider = new StaticEncryptionMaterialsProvider(new EncryptionMaterials(symmetricKey));
|
||||
boolean haveRegion = StringUtils.isNotBlank(region);
|
||||
CryptoConfiguration cryptoConfig = new CryptoConfiguration();
|
||||
Region awsRegion = null;
|
||||
|
||||
if (haveRegion) {
|
||||
awsRegion = Region.getRegion(Regions.fromName(region));
|
||||
cryptoConfig.setAwsKmsRegion(awsRegion);
|
||||
}
|
||||
|
||||
AmazonS3EncryptionClient client = new AmazonS3EncryptionClient(credentialsProvider, encryptionMaterialsProvider, cryptoConfig);
|
||||
if (haveRegion && awsRegion != null) {
|
||||
client.setRegion(awsRegion);
|
||||
}
|
||||
AmazonS3EncryptionClient client = new AmazonS3EncryptionClient(credentialsProvider, encryptionMaterialsProvider);
|
||||
|
||||
return client;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ValidationResult validateKey(String keyValue) {
|
||||
if (StringUtils.isBlank(keyValue) || !Base64.isBase64(keyValue)) {
|
||||
return new ValidationResult.Builder().valid(false).build();
|
||||
if (StringUtils.isBlank(keyValue)) {
|
||||
return new ValidationResult.Builder()
|
||||
.subject("Key Material")
|
||||
.valid(false)
|
||||
.explanation("it is empty")
|
||||
.build();
|
||||
}
|
||||
|
||||
boolean decoded = false;
|
||||
boolean sized = false;
|
||||
byte[] keyMaterial;
|
||||
|
||||
try {
|
||||
if (!Base64.isBase64(keyValue)) {
|
||||
throw new Exception();
|
||||
}
|
||||
keyMaterial = Base64.decodeBase64(keyValue);
|
||||
decoded = true;
|
||||
sized = keyMaterial.length == 32 || keyMaterial.length == 24 || keyMaterial.length == 16;
|
||||
} catch (final Exception ignored) {
|
||||
} catch (Exception e) {
|
||||
return new ValidationResult.Builder()
|
||||
.subject("Key Material")
|
||||
.valid(false)
|
||||
.explanation("it is not in Base64 encoded form")
|
||||
.build();
|
||||
}
|
||||
|
||||
return new ValidationResult.Builder().valid(decoded && sized).build();
|
||||
if (!(keyMaterial.length == 32 || keyMaterial.length == 24 || keyMaterial.length == 16)) {
|
||||
return new ValidationResult.Builder()
|
||||
.subject("Key Material")
|
||||
.valid(false)
|
||||
.explanation("it is not a Base64 encoded AES-256, AES-192 or AES-128 key")
|
||||
.build();
|
||||
}
|
||||
|
||||
return new ValidationResult.Builder().valid(true).build();
|
||||
}
|
||||
}
|
@ -39,26 +39,22 @@ public class ClientSideKMSEncryptionStrategy implements S3EncryptionStrategy {
|
||||
*
|
||||
* @param credentialsProvider AWS credentials provider.
|
||||
* @param clientConfiguration Client configuration
|
||||
* @param region AWS region
|
||||
* @param kmsRegion AWS KMS region
|
||||
* @param keyIdOrMaterial KMS key id
|
||||
* @return AWS S3 client
|
||||
*/
|
||||
@Override
|
||||
public AmazonS3Client createEncryptionClient(AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration, String region, String keyIdOrMaterial) {
|
||||
public AmazonS3Client createEncryptionClient(AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration, String kmsRegion, String keyIdOrMaterial) {
|
||||
KMSEncryptionMaterialsProvider materialProvider = new KMSEncryptionMaterialsProvider(keyIdOrMaterial);
|
||||
boolean haveRegion = StringUtils.isNotBlank(region);
|
||||
Region awsRegion = null;
|
||||
boolean haveKmsRegion = StringUtils.isNotBlank(kmsRegion);
|
||||
|
||||
CryptoConfiguration cryptoConfig = new CryptoConfiguration();
|
||||
if (haveRegion) {
|
||||
awsRegion = Region.getRegion(Regions.fromName(region));
|
||||
if (haveKmsRegion) {
|
||||
Region awsRegion = Region.getRegion(Regions.fromName(kmsRegion));
|
||||
cryptoConfig.setAwsKmsRegion(awsRegion);
|
||||
}
|
||||
|
||||
AmazonS3EncryptionClient client = new AmazonS3EncryptionClient(credentialsProvider, materialProvider, cryptoConfig);
|
||||
if (haveRegion) {
|
||||
client.setRegion(awsRegion);
|
||||
}
|
||||
|
||||
return client;
|
||||
}
|
||||
|
@ -76,7 +76,7 @@ public interface S3EncryptionStrategy {
|
||||
* @param clientConfiguration Client configuration.
|
||||
* @return {@link AmazonS3Client}, perhaps an {@link com.amazonaws.services.s3.AmazonS3EncryptionClient}
|
||||
*/
|
||||
default AmazonS3Client createEncryptionClient(AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration, String region, String keyIdOrMaterial) throws SecurityException {
|
||||
default AmazonS3Client createEncryptionClient(AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration, String kmsRegion, String keyIdOrMaterial) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -22,8 +22,9 @@ import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||
import com.amazonaws.services.s3.model.PutObjectRequest;
|
||||
import com.amazonaws.services.s3.model.SSECustomerKey;
|
||||
import com.amazonaws.services.s3.model.UploadPartRequest;
|
||||
import org.apache.commons.codec.binary.Base64;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.bouncycastle.util.encoders.Base64;
|
||||
|
||||
/**
|
||||
* This strategy uses a customer key to perform server-side encryption. Use this strategy when you want the server to perform the encryption,
|
||||
@ -32,7 +33,7 @@ import org.bouncycastle.util.encoders.Base64;
|
||||
* See https://docs.aws.amazon.com/AmazonS3/latest/dev/ServerSideEncryptionCustomerKeys.html
|
||||
*
|
||||
*/
|
||||
public class ServerSideCEKEncryptionStrategy implements S3EncryptionStrategy {
|
||||
public class ServerSideCEncryptionStrategy implements S3EncryptionStrategy {
|
||||
@Override
|
||||
public void configurePutObjectRequest(PutObjectRequest request, ObjectMetadata objectMetadata, String keyValue) {
|
||||
SSECustomerKey customerKey = new SSECustomerKey(keyValue);
|
||||
@ -59,17 +60,37 @@ public class ServerSideCEKEncryptionStrategy implements S3EncryptionStrategy {
|
||||
|
||||
@Override
|
||||
public ValidationResult validateKey(String keyValue) {
|
||||
boolean decoded = false;
|
||||
boolean sized = false;
|
||||
if (StringUtils.isBlank(keyValue)) {
|
||||
return new ValidationResult.Builder()
|
||||
.subject("Key Material")
|
||||
.valid(false)
|
||||
.explanation("it is empty")
|
||||
.build();
|
||||
}
|
||||
|
||||
byte[] keyMaterial;
|
||||
|
||||
try {
|
||||
keyMaterial = Base64.decode(keyValue);
|
||||
decoded = true;
|
||||
sized = (keyMaterial.length > 0) && (keyMaterial.length % 32) == 0;
|
||||
} catch (final Exception ignored) {
|
||||
if (!org.apache.commons.codec.binary.Base64.isBase64(keyValue)) {
|
||||
throw new Exception();
|
||||
}
|
||||
keyMaterial = Base64.decodeBase64(keyValue);
|
||||
} catch (Exception e) {
|
||||
return new ValidationResult.Builder()
|
||||
.subject("Key Material")
|
||||
.valid(false)
|
||||
.explanation("it is not in Base64 encoded form")
|
||||
.build();
|
||||
}
|
||||
|
||||
return new ValidationResult.Builder().valid(decoded && sized).build();
|
||||
if (keyMaterial.length != 32) {
|
||||
return new ValidationResult.Builder()
|
||||
.subject("Key Material")
|
||||
.valid(false)
|
||||
.explanation("it is not a Base64 encoded AES-256 key")
|
||||
.build();
|
||||
}
|
||||
|
||||
return new ValidationResult.Builder().valid(true).build();
|
||||
}
|
||||
}
|
@ -16,6 +16,7 @@
|
||||
*/
|
||||
package org.apache.nifi.processors.aws.s3.encryption;
|
||||
|
||||
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
|
||||
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||
import com.amazonaws.services.s3.model.PutObjectRequest;
|
||||
|
||||
@ -33,4 +34,9 @@ public class ServerSideS3EncryptionStrategy implements S3EncryptionStrategy {
|
||||
public void configurePutObjectRequest(PutObjectRequest request, ObjectMetadata objectMetadata, String keyValue) {
|
||||
objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configureInitiateMultipartUploadRequest(InitiateMultipartUploadRequest request, ObjectMetadata objectMetadata, String keyValue) {
|
||||
objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
|
||||
}
|
||||
}
|
||||
|
@ -30,17 +30,18 @@ import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||
import org.apache.nifi.components.AllowableValue;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.PropertyValue;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
|
||||
import org.apache.nifi.processors.aws.s3.AmazonS3EncryptionService;
|
||||
import org.apache.nifi.processors.aws.s3.AbstractS3Processor;
|
||||
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.util.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -52,39 +53,41 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
@Tags({"service", "encryption", "encrypt", "decryption", "decrypt", "key"})
|
||||
@Tags({"service", "aws", "s3", "encryption", "encrypt", "decryption", "decrypt", "key"})
|
||||
@CapabilityDescription("Adds configurable encryption to S3 Put and S3 Fetch operations.")
|
||||
public class StandardS3EncryptionService extends AbstractControllerService implements AmazonS3EncryptionService {
|
||||
private static final Logger logger = LoggerFactory.getLogger(StandardS3EncryptionService.class);
|
||||
|
||||
public static final String STRATEGY_NAME_NONE = "NONE";
|
||||
public static final String STRATEGY_NAME_SSE_S3 = "SSE_S3";
|
||||
public static final String STRATEGY_NAME_SSE_KMS = "SSE_KMS";
|
||||
public static final String STRATEGY_NAME_SSE_C = "SSE_C";
|
||||
public static final String STRATEGY_NAME_CSE_KMS = "CSE_KMS";
|
||||
public static final String STRATEGY_NAME_CSE_CMK = "CSE_CMK";
|
||||
|
||||
private static final Map<String, S3EncryptionStrategy> namedStrategies = new HashMap<String, S3EncryptionStrategy>() {{
|
||||
private static final Map<String, S3EncryptionStrategy> NAMED_STRATEGIES = new HashMap<String, S3EncryptionStrategy>() {{
|
||||
put(STRATEGY_NAME_NONE, new NoOpEncryptionStrategy());
|
||||
put(STRATEGY_NAME_SSE_S3, new ServerSideS3EncryptionStrategy());
|
||||
put(STRATEGY_NAME_SSE_KMS, new ServerSideKMSEncryptionStrategy());
|
||||
put(STRATEGY_NAME_SSE_C, new ServerSideCEKEncryptionStrategy());
|
||||
put(STRATEGY_NAME_SSE_C, new ServerSideCEncryptionStrategy());
|
||||
put(STRATEGY_NAME_CSE_KMS, new ClientSideKMSEncryptionStrategy());
|
||||
put(STRATEGY_NAME_CSE_CMK, new ClientSideCMKEncryptionStrategy());
|
||||
put(STRATEGY_NAME_CSE_C, new ClientSideCEncryptionStrategy());
|
||||
}};
|
||||
|
||||
private static final AllowableValue NONE = new AllowableValue(STRATEGY_NAME_NONE, "None","No encryption.");
|
||||
private static final AllowableValue SSE_S3 = new AllowableValue(STRATEGY_NAME_SSE_S3, "Server-side S3","Use server-side, S3-managed encryption.");
|
||||
private static final AllowableValue SSE_KMS = new AllowableValue(STRATEGY_NAME_SSE_KMS, "Server-side KMS","Use server-side, KMS key to perform encryption.");
|
||||
private static final AllowableValue SSE_C = new AllowableValue(STRATEGY_NAME_SSE_C, "Server-side Customer Key","Use server-side, customer-supplied key for encryption.");
|
||||
private static final AllowableValue SSE_C = new AllowableValue(STRATEGY_NAME_SSE_C, "Server-side Customer Key","Use server-side, customer-supplied key to perform encryption.");
|
||||
private static final AllowableValue CSE_KMS = new AllowableValue(STRATEGY_NAME_CSE_KMS, "Client-side KMS","Use client-side, KMS key to perform encryption.");
|
||||
private static final AllowableValue CSE_CMK = new AllowableValue(STRATEGY_NAME_CSE_CMK, "Client-side Customer Master Key","Use client-side, customer-supplied master key to perform encryption.");
|
||||
private static final AllowableValue CSE_C = new AllowableValue(STRATEGY_NAME_CSE_C, "Client-side Customer Key","Use client-side, customer-supplied key to perform encryption.");
|
||||
|
||||
public static final Map<String, AllowableValue> ENCRYPTION_STRATEGY_ALLOWABLE_VALUES = new HashMap<String, AllowableValue>() {{
|
||||
put(STRATEGY_NAME_NONE, NONE);
|
||||
put(STRATEGY_NAME_SSE_S3, SSE_S3);
|
||||
put(STRATEGY_NAME_SSE_KMS, SSE_KMS);
|
||||
put(STRATEGY_NAME_SSE_C, SSE_C);
|
||||
put(STRATEGY_NAME_CSE_KMS, CSE_KMS);
|
||||
put(STRATEGY_NAME_CSE_C, CSE_C);
|
||||
}};
|
||||
|
||||
public static final PropertyDescriptor ENCRYPTION_STRATEGY = new PropertyDescriptor.Builder()
|
||||
.name("encryption-strategy")
|
||||
.displayName("Encryption Strategy")
|
||||
.description("Strategy to use for S3 data encryption and decryption.")
|
||||
.allowableValues(NONE, SSE_S3, SSE_KMS, SSE_C, CSE_KMS, CSE_CMK)
|
||||
.allowableValues(NONE, SSE_S3, SSE_KMS, SSE_C, CSE_KMS, CSE_C)
|
||||
.required(true)
|
||||
.defaultValue(NONE.getValue())
|
||||
.build();
|
||||
@ -92,34 +95,38 @@ public class StandardS3EncryptionService extends AbstractControllerService imple
|
||||
public static final PropertyDescriptor ENCRYPTION_VALUE = new PropertyDescriptor.Builder()
|
||||
.name("key-id-or-key-material")
|
||||
.displayName("Key ID or Key Material")
|
||||
.description("For Server-side CEK and Client-side CMK, this is base64-encoded Key Material. For all others (except 'None'), it is the KMS Key ID.")
|
||||
.description("For None and Server-side S3: not used. For Server-side KMS and Client-side KMS: the KMS Key ID must be configured. " +
|
||||
"For Server-side Customer Key and Client-side Customer Key: the Key Material must be specified in Base64 encoded form. " +
|
||||
"In case of Server-side Customer Key, the key must be an AES-256 key. In case of Client-side Customer Key, it can be an AES-256, AES-192 or AES-128 key.")
|
||||
.required(false)
|
||||
.sensitive(true)
|
||||
.addValidator(new StandardValidators.StringLengthValidator(0, 4096))
|
||||
.addValidator((subject, input, context) -> new ValidationResult.Builder().valid(true).build()) // will be validated in customValidate()
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
|
||||
.name("region")
|
||||
public static final PropertyDescriptor KMS_REGION = new PropertyDescriptor.Builder()
|
||||
.name("kms-region")
|
||||
.displayName("KMS Region")
|
||||
.description("The Region of the AWS Key Management Service. Only used in case of Client-side KMS.")
|
||||
.required(false)
|
||||
.allowableValues(AbstractS3Processor.getAvailableRegions())
|
||||
.defaultValue(AbstractS3Processor.createAllowableValue(Regions.DEFAULT_REGION).getValue())
|
||||
.build();
|
||||
|
||||
private String keyValue = "";
|
||||
private String region = "";
|
||||
private String kmsRegion = "";
|
||||
private S3EncryptionStrategy encryptionStrategy = new NoOpEncryptionStrategy();
|
||||
private String strategyName = STRATEGY_NAME_NONE;
|
||||
|
||||
@OnEnabled
|
||||
public void onConfigured(final ConfigurationContext context) throws InitializationException {
|
||||
final String newStrategyName = context.getProperty(ENCRYPTION_STRATEGY).getValue();
|
||||
final String newKeyValue = context.getProperty(ENCRYPTION_VALUE).getValue();
|
||||
final S3EncryptionStrategy newEncryptionStrategy = namedStrategies.get(newStrategyName);
|
||||
String newRegion = null;
|
||||
final String newKeyValue = context.getProperty(ENCRYPTION_VALUE).evaluateAttributeExpressions().getValue();
|
||||
final S3EncryptionStrategy newEncryptionStrategy = NAMED_STRATEGIES.get(newStrategyName);
|
||||
String newKmsRegion = null;
|
||||
|
||||
if (context.getProperty(REGION) != null ) {
|
||||
newRegion = context.getProperty(REGION).getValue();
|
||||
if (context.getProperty(KMS_REGION) != null ) {
|
||||
newKmsRegion = context.getProperty(KMS_REGION).getValue();
|
||||
}
|
||||
|
||||
if (newEncryptionStrategy == null) {
|
||||
@ -131,13 +138,59 @@ public class StandardS3EncryptionService extends AbstractControllerService imple
|
||||
strategyName = newStrategyName;
|
||||
encryptionStrategy = newEncryptionStrategy;
|
||||
keyValue = newKeyValue;
|
||||
region = newRegion;
|
||||
kmsRegion = newKmsRegion;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
|
||||
Collection<ValidationResult> validationResults = new ArrayList<>();
|
||||
validationResults.add(encryptionStrategy.validateKey(validationContext.getProperty(ENCRYPTION_VALUE).getValue()));
|
||||
|
||||
String encryptionStrategyName = validationContext.getProperty(ENCRYPTION_STRATEGY).getValue();
|
||||
String encryptionStrategyDisplayName = ENCRYPTION_STRATEGY_ALLOWABLE_VALUES.get(encryptionStrategyName).getDisplayName();
|
||||
PropertyValue encryptionValueProperty = validationContext.getProperty(ENCRYPTION_VALUE);
|
||||
String encryptionValue = encryptionValueProperty.evaluateAttributeExpressions().getValue();
|
||||
|
||||
switch (encryptionStrategyName) {
|
||||
case STRATEGY_NAME_NONE:
|
||||
case STRATEGY_NAME_SSE_S3:
|
||||
if (encryptionValueProperty.isSet()) {
|
||||
validationResults.add(new ValidationResult.Builder()
|
||||
.subject(ENCRYPTION_VALUE.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation("the property cannot be specified for encryption strategy " + encryptionStrategyDisplayName)
|
||||
.build()
|
||||
);
|
||||
}
|
||||
break;
|
||||
case STRATEGY_NAME_SSE_KMS:
|
||||
case STRATEGY_NAME_CSE_KMS:
|
||||
if (StringUtils.isEmpty(encryptionValue)) {
|
||||
validationResults.add(new ValidationResult.Builder()
|
||||
.subject(ENCRYPTION_VALUE.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation("a non-empty Key ID must be specified for encryption strategy " + encryptionStrategyDisplayName)
|
||||
.build()
|
||||
);
|
||||
}
|
||||
break;
|
||||
case STRATEGY_NAME_SSE_C:
|
||||
case STRATEGY_NAME_CSE_C:
|
||||
if (StringUtils.isEmpty(encryptionValue)) {
|
||||
validationResults.add(new ValidationResult.Builder()
|
||||
.subject(ENCRYPTION_VALUE.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation("a non-empty Key Material must be specified for encryption strategy " + encryptionStrategyDisplayName)
|
||||
.build()
|
||||
);
|
||||
} else {
|
||||
S3EncryptionStrategy encryptionStrategy = NAMED_STRATEGIES.get(encryptionStrategyName);
|
||||
String keyIdOrMaterial = validationContext.getProperty(ENCRYPTION_VALUE).evaluateAttributeExpressions().getValue();
|
||||
|
||||
validationResults.add(encryptionStrategy.validateKey(keyIdOrMaterial));
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
return validationResults;
|
||||
}
|
||||
|
||||
@ -146,7 +199,7 @@ public class StandardS3EncryptionService extends AbstractControllerService imple
|
||||
final List<PropertyDescriptor> properties = new ArrayList<>();
|
||||
properties.add(ENCRYPTION_STRATEGY);
|
||||
properties.add(ENCRYPTION_VALUE);
|
||||
properties.add(REGION);
|
||||
properties.add(KMS_REGION);
|
||||
return Collections.unmodifiableList(properties);
|
||||
}
|
||||
|
||||
@ -172,18 +225,23 @@ public class StandardS3EncryptionService extends AbstractControllerService imple
|
||||
|
||||
@Override
|
||||
public AmazonS3Client createEncryptionClient(AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration) {
|
||||
return encryptionStrategy.createEncryptionClient(credentialsProvider, clientConfiguration, region, keyValue);
|
||||
return encryptionStrategy.createEncryptionClient(credentialsProvider, clientConfiguration, kmsRegion, keyValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getRegion() {
|
||||
return region;
|
||||
public String getKmsRegion() {
|
||||
return kmsRegion;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getStrategyName() {
|
||||
return strategyName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getStrategyDisplayName() {
|
||||
return ENCRYPTION_STRATEGY_ALLOWABLE_VALUES.get(strategyName).getDisplayName();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -21,15 +21,15 @@
|
||||
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
|
||||
</head>
|
||||
<body>
|
||||
|
||||
<h2>Description</h2>
|
||||
<div>
|
||||
The <code>S3EncryptionService</code> manages an encryption strategy and applies that strategy to various S3 operations.
|
||||
The <code>StandardS3EncryptionService</code> manages an encryption strategy and applies that strategy to various S3 operations.
|
||||
|
||||
<br>
|
||||
|
||||
<b>Note:</b> this service has no effect when a processor has the <code>SERVER_SIDE_ENCRYPTION</code> property set. To use
|
||||
<b>Note:</b> This service has no effect when a processor has the <code>Server Side Encryption</code> property set. To use
|
||||
this service with processors so configured, first create a service instance, set the <code>Encryption Strategy</code> to <code>Server-side S3</code>,
|
||||
disable the <code>SERVER_SIDE_ENCRYPTION</code> processor setting, and finally, associate the processor with the service.
|
||||
disable the <code>Server Side Encryption</code> processor setting, and finally, associate the processor with the service.
|
||||
</div>
|
||||
|
||||
|
||||
@ -44,27 +44,26 @@
|
||||
<li><code>Server-side S3</code> - encryption and decryption is managed by S3; no keys are required.</li>
|
||||
<li><code>Server-side KMS</code> - encryption and decryption are performed by S3 using the configured KMS key.</li>
|
||||
<li><code>Server-side Customer Key</code> - encryption and decryption are performed by S3 using the supplied customer key.</li>
|
||||
<li><code>Client-side KMS</code> - like the server-side KMS strategy, with the encryption and decryption performed by the client.</li>
|
||||
<li><code>Client-side Customer Master Key</code> - like the server-side CEK strategy, with the encryption and decryption performed by the client.</li>
|
||||
<li><code>Client-side KMS</code> - like the Server-side KMS strategy, with the encryption and decryption performed by the client.</li>
|
||||
<li><code>Client-side Customer Key</code> - like the Server-side Customer Key strategy, with the encryption and decryption performed by the client.</li>
|
||||
</ul>
|
||||
</div>
|
||||
|
||||
<h3>Key ID or Key Material</h3>
|
||||
<p>
|
||||
When configured for either the server-side or client-side KMS strategy, this field should contain the ID or alias
|
||||
of that key.
|
||||
When configured for either the Server-side or Client-side KMS strategies, this field should contain the KMS Key ID.
|
||||
</p>
|
||||
<p>
|
||||
When configured for either the server-side or client-side customer key strategies, this field should contain the key
|
||||
When configured for either the Server-side or Client-side Customer Key strategies, this field should contain the key
|
||||
material, and that material must be base64 encoded.
|
||||
</p>
|
||||
<p>
|
||||
All other encryption strategies ignore this field.
|
||||
</p>
|
||||
|
||||
<h3>Region</h3>
|
||||
<h3>KMS Region</h3>
|
||||
<div>
|
||||
KMS key region, if any. This value must match the actual region of the KMS key if supplied.
|
||||
KMS key region, if any. This value must match the actual region of the KMS key if supplied.
|
||||
</div>
|
||||
|
||||
</body>
|
@ -946,12 +946,23 @@ public class ITPutS3Object extends AbstractS3IT {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEncryptionServiceWithServerSideS3ManagedEncryptionStrategy() throws IOException, InitializationException {
|
||||
TestRunner runner = createPutEncryptionTestRunner(StandardS3EncryptionService.STRATEGY_NAME_SSE_S3, "");
|
||||
public void testEncryptionServiceWithServerSideS3EncryptionStrategyUsingSingleUpload() throws IOException, InitializationException {
|
||||
byte[] smallData = Files.readAllBytes(getResourcePath(SAMPLE_FILE_RESOURCE_NAME));
|
||||
testEncryptionServiceWithServerSideS3EncryptionStrategy(smallData);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEncryptionServiceWithServerSideS3EncryptionStrategyUsingMultipartUpload() throws IOException, InitializationException {
|
||||
byte[] largeData = new byte[51 * 1024 * 1024];
|
||||
testEncryptionServiceWithServerSideS3EncryptionStrategy(largeData);
|
||||
}
|
||||
|
||||
private void testEncryptionServiceWithServerSideS3EncryptionStrategy(byte[] data) throws IOException, InitializationException {
|
||||
TestRunner runner = createPutEncryptionTestRunner(AmazonS3EncryptionService.STRATEGY_NAME_SSE_S3, null);
|
||||
|
||||
Map<String, String> attrs = new HashMap<>();
|
||||
attrs.put("filename", "test.txt");
|
||||
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
|
||||
runner.enqueue(data, attrs);
|
||||
runner.assertValid();
|
||||
runner.run();
|
||||
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
|
||||
@ -960,21 +971,32 @@ public class ITPutS3Object extends AbstractS3IT {
|
||||
Assert.assertEquals(1, flowFiles.size());
|
||||
Assert.assertEquals(0, runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE).size());
|
||||
MockFlowFile putSuccess = flowFiles.get(0);
|
||||
Assert.assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), StandardS3EncryptionService.STRATEGY_NAME_SSE_S3);
|
||||
Assert.assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), AmazonS3EncryptionService.STRATEGY_NAME_SSE_S3);
|
||||
|
||||
MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, StandardS3EncryptionService.STRATEGY_NAME_SSE_S3, "");
|
||||
flowFile.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||
MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, AmazonS3EncryptionService.STRATEGY_NAME_SSE_S3, null);
|
||||
flowFile.assertContentEquals(data);
|
||||
flowFile.assertAttributeEquals(PutS3Object.S3_SSE_ALGORITHM, ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
|
||||
flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, StandardS3EncryptionService.STRATEGY_NAME_SSE_S3);
|
||||
flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, AmazonS3EncryptionService.STRATEGY_NAME_SSE_S3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEncryptionServiceWithServerSideKMSEncryptionStrategy() throws IOException, InitializationException {
|
||||
TestRunner runner = createPutEncryptionTestRunner(StandardS3EncryptionService.STRATEGY_NAME_SSE_KMS, kmsKeyId);
|
||||
public void testEncryptionServiceWithServerSideKMSEncryptionStrategyUsingSingleUpload() throws IOException, InitializationException {
|
||||
byte[] smallData = Files.readAllBytes(getResourcePath(SAMPLE_FILE_RESOURCE_NAME));
|
||||
testEncryptionServiceWithServerSideKMSEncryptionStrategy(smallData);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEncryptionServiceWithServerSideKMSEncryptionStrategyUsingMultipartUpload() throws IOException, InitializationException {
|
||||
byte[] largeData = new byte[51 * 1024 * 1024];
|
||||
testEncryptionServiceWithServerSideKMSEncryptionStrategy(largeData);
|
||||
}
|
||||
|
||||
private void testEncryptionServiceWithServerSideKMSEncryptionStrategy(byte[] data) throws IOException, InitializationException {
|
||||
TestRunner runner = createPutEncryptionTestRunner(AmazonS3EncryptionService.STRATEGY_NAME_SSE_KMS, kmsKeyId);
|
||||
|
||||
final Map<String, String> attrs = new HashMap<>();
|
||||
attrs.put("filename", "test.txt");
|
||||
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
|
||||
runner.enqueue(data, attrs);
|
||||
runner.assertValid();
|
||||
runner.run();
|
||||
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
|
||||
@ -983,21 +1005,32 @@ public class ITPutS3Object extends AbstractS3IT {
|
||||
Assert.assertEquals(1, flowFiles.size());
|
||||
Assert.assertEquals(0, runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE).size());
|
||||
MockFlowFile putSuccess = flowFiles.get(0);
|
||||
Assert.assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), StandardS3EncryptionService.STRATEGY_NAME_SSE_KMS);
|
||||
Assert.assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), AmazonS3EncryptionService.STRATEGY_NAME_SSE_KMS);
|
||||
|
||||
MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, StandardS3EncryptionService.STRATEGY_NAME_SSE_KMS, kmsKeyId);
|
||||
flowFile.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||
MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, AmazonS3EncryptionService.STRATEGY_NAME_SSE_KMS, kmsKeyId);
|
||||
flowFile.assertContentEquals(data);
|
||||
flowFile.assertAttributeEquals(PutS3Object.S3_SSE_ALGORITHM, "aws:kms");
|
||||
flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, StandardS3EncryptionService.STRATEGY_NAME_SSE_KMS);
|
||||
flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, AmazonS3EncryptionService.STRATEGY_NAME_SSE_KMS);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEncryptionServiceWithServerSideCPEKEncryptionStrategy() throws IOException, InitializationException {
|
||||
TestRunner runner = createPutEncryptionTestRunner(StandardS3EncryptionService.STRATEGY_NAME_SSE_C, randomKeyMaterial);
|
||||
public void testEncryptionServiceWithServerSideCEncryptionStrategyUsingSingleUpload() throws IOException, InitializationException {
|
||||
byte[] smallData = Files.readAllBytes(getResourcePath(SAMPLE_FILE_RESOURCE_NAME));
|
||||
testEncryptionServiceWithServerSideCEncryptionStrategy(smallData);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEncryptionServiceWithServerSideCEncryptionStrategyUsingMultipartUpload() throws IOException, InitializationException {
|
||||
byte[] largeData = new byte[51 * 1024 * 1024];
|
||||
testEncryptionServiceWithServerSideCEncryptionStrategy(largeData);
|
||||
}
|
||||
|
||||
private void testEncryptionServiceWithServerSideCEncryptionStrategy(byte[] data) throws IOException, InitializationException {
|
||||
TestRunner runner = createPutEncryptionTestRunner(AmazonS3EncryptionService.STRATEGY_NAME_SSE_C, randomKeyMaterial);
|
||||
|
||||
final Map<String, String> attrs = new HashMap<>();
|
||||
attrs.put("filename", "test.txt");
|
||||
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
|
||||
runner.enqueue(data, attrs);
|
||||
runner.assertValid();
|
||||
runner.run();
|
||||
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
|
||||
@ -1006,23 +1039,34 @@ public class ITPutS3Object extends AbstractS3IT {
|
||||
Assert.assertEquals(1, flowFiles.size());
|
||||
Assert.assertEquals(0, runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE).size());
|
||||
MockFlowFile putSuccess = flowFiles.get(0);
|
||||
Assert.assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), StandardS3EncryptionService.STRATEGY_NAME_SSE_C);
|
||||
Assert.assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), AmazonS3EncryptionService.STRATEGY_NAME_SSE_C);
|
||||
|
||||
MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, StandardS3EncryptionService.STRATEGY_NAME_SSE_C, randomKeyMaterial);
|
||||
flowFile.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||
MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, AmazonS3EncryptionService.STRATEGY_NAME_SSE_C, randomKeyMaterial);
|
||||
flowFile.assertContentEquals(data);
|
||||
// successful fetch does not indicate type of original encryption:
|
||||
flowFile.assertAttributeEquals(PutS3Object.S3_SSE_ALGORITHM, null);
|
||||
// but it does indicate it via our specific attribute:
|
||||
flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, StandardS3EncryptionService.STRATEGY_NAME_SSE_C);
|
||||
flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, AmazonS3EncryptionService.STRATEGY_NAME_SSE_C);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEncryptionServiceWithClientSideKMSEncryptionStrategy() throws InitializationException, IOException {
|
||||
TestRunner runner = createPutEncryptionTestRunner(StandardS3EncryptionService.STRATEGY_NAME_CSE_KMS, kmsKeyId);
|
||||
public void testEncryptionServiceWithClientSideKMSEncryptionStrategyUsingSingleUpload() throws IOException, InitializationException {
|
||||
byte[] smallData = Files.readAllBytes(getResourcePath(SAMPLE_FILE_RESOURCE_NAME));
|
||||
testEncryptionServiceWithClientSideKMSEncryptionStrategy(smallData);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEncryptionServiceWithClientSideKMSEncryptionStrategyUsingMultipartUpload() throws IOException, InitializationException {
|
||||
byte[] largeData = new byte[51 * 1024 * 1024];
|
||||
testEncryptionServiceWithClientSideKMSEncryptionStrategy(largeData);
|
||||
}
|
||||
|
||||
private void testEncryptionServiceWithClientSideKMSEncryptionStrategy(byte[] data) throws InitializationException, IOException {
|
||||
TestRunner runner = createPutEncryptionTestRunner(AmazonS3EncryptionService.STRATEGY_NAME_CSE_KMS, kmsKeyId);
|
||||
|
||||
final Map<String, String> attrs = new HashMap<>();
|
||||
attrs.put("filename", "test.txt");
|
||||
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
|
||||
runner.enqueue(data, attrs);
|
||||
runner.assertValid();
|
||||
runner.run();
|
||||
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
|
||||
@ -1031,21 +1075,32 @@ public class ITPutS3Object extends AbstractS3IT {
|
||||
Assert.assertEquals(1, flowFiles.size());
|
||||
Assert.assertEquals(0, runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE).size());
|
||||
MockFlowFile putSuccess = flowFiles.get(0);
|
||||
Assert.assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), StandardS3EncryptionService.STRATEGY_NAME_CSE_KMS);
|
||||
Assert.assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), AmazonS3EncryptionService.STRATEGY_NAME_CSE_KMS);
|
||||
|
||||
MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, StandardS3EncryptionService.STRATEGY_NAME_CSE_KMS, kmsKeyId);
|
||||
flowFile.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||
MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, AmazonS3EncryptionService.STRATEGY_NAME_CSE_KMS, kmsKeyId);
|
||||
flowFile.assertContentEquals(data);
|
||||
flowFile.assertAttributeEquals("x-amz-wrap-alg", "kms");
|
||||
flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, StandardS3EncryptionService.STRATEGY_NAME_CSE_KMS);
|
||||
flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, AmazonS3EncryptionService.STRATEGY_NAME_CSE_KMS);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEncryptionServiceWithClientSideCMKEncryptionStrategy() throws InitializationException, IOException {
|
||||
TestRunner runner = createPutEncryptionTestRunner(StandardS3EncryptionService.STRATEGY_NAME_CSE_CMK, randomKeyMaterial);
|
||||
public void testEncryptionServiceWithClientSideCEncryptionStrategyUsingSingleUpload() throws InitializationException, IOException {
|
||||
byte[] smallData = Files.readAllBytes(getResourcePath(SAMPLE_FILE_RESOURCE_NAME));
|
||||
testEncryptionServiceWithClientSideCEncryptionStrategy(smallData);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEncryptionServiceWithClientSideCEncryptionStrategyUsingMultipartUpload() throws IOException, InitializationException {
|
||||
byte[] largeData = new byte[51 * 1024 * 1024];
|
||||
testEncryptionServiceWithClientSideCEncryptionStrategy(largeData);
|
||||
}
|
||||
|
||||
private void testEncryptionServiceWithClientSideCEncryptionStrategy(byte[] data) throws InitializationException, IOException {
|
||||
TestRunner runner = createPutEncryptionTestRunner(AmazonS3EncryptionService.STRATEGY_NAME_CSE_C, randomKeyMaterial);
|
||||
|
||||
final Map<String, String> attrs = new HashMap<>();
|
||||
attrs.put("filename", "test.txt");
|
||||
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
|
||||
runner.enqueue(data, attrs);
|
||||
runner.assertValid();
|
||||
runner.run();
|
||||
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
|
||||
@ -1054,11 +1109,11 @@ public class ITPutS3Object extends AbstractS3IT {
|
||||
Assert.assertEquals(1, flowFiles.size());
|
||||
Assert.assertEquals(0, runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE).size());
|
||||
MockFlowFile putSuccess = flowFiles.get(0);
|
||||
Assert.assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), StandardS3EncryptionService.STRATEGY_NAME_CSE_CMK);
|
||||
Assert.assertEquals(putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY), AmazonS3EncryptionService.STRATEGY_NAME_CSE_C);
|
||||
|
||||
MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, StandardS3EncryptionService.STRATEGY_NAME_CSE_CMK, randomKeyMaterial);
|
||||
flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, StandardS3EncryptionService.STRATEGY_NAME_CSE_CMK);
|
||||
flowFile.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
|
||||
MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, AmazonS3EncryptionService.STRATEGY_NAME_CSE_C, randomKeyMaterial);
|
||||
flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, AmazonS3EncryptionService.STRATEGY_NAME_CSE_C);
|
||||
flowFile.assertContentEquals(data);
|
||||
|
||||
flowFile.assertAttributeExists("x-amz-key");
|
||||
flowFile.assertAttributeNotEquals("x-amz-key", "");
|
||||
@ -1068,11 +1123,24 @@ public class ITPutS3Object extends AbstractS3IT {
|
||||
}
|
||||
|
||||
private static TestRunner createPutEncryptionTestRunner(String strategyName, String keyIdOrMaterial) throws InitializationException {
|
||||
return createEncryptionTestRunner(new PutS3Object(), strategyName, keyIdOrMaterial);
|
||||
TestRunner runner = createEncryptionTestRunner(new PutS3Object(), strategyName, keyIdOrMaterial);
|
||||
|
||||
runner.setProperty(PutS3Object.MULTIPART_THRESHOLD, "50 MB");
|
||||
runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, "50 MB");
|
||||
|
||||
return runner;
|
||||
}
|
||||
|
||||
private static TestRunner createFetchEncryptionTestRunner(String strategyName, String keyIdOrMaterial) throws InitializationException {
|
||||
if (strategyName.equals(AmazonS3EncryptionService.STRATEGY_NAME_SSE_S3) || strategyName.equals(AmazonS3EncryptionService.STRATEGY_NAME_SSE_KMS)) {
|
||||
strategyName = null;
|
||||
}
|
||||
|
||||
return createEncryptionTestRunner(new FetchS3Object(), strategyName, keyIdOrMaterial);
|
||||
}
|
||||
|
||||
private static MockFlowFile fetchEncryptedFlowFile(Map<String, String> attributes, String strategyName, String keyIdOrMaterial) throws InitializationException {
|
||||
final TestRunner runner = createEncryptionTestRunner(new FetchS3Object(), strategyName, keyIdOrMaterial);
|
||||
final TestRunner runner = createFetchEncryptionTestRunner(strategyName, keyIdOrMaterial);
|
||||
runner.enqueue(new byte[0], attributes);
|
||||
runner.run(1);
|
||||
runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1);
|
||||
@ -1082,24 +1150,28 @@ public class ITPutS3Object extends AbstractS3IT {
|
||||
|
||||
private static TestRunner createEncryptionTestRunner(Processor processor, String strategyName, String keyIdOrMaterial) throws InitializationException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(processor);
|
||||
final StandardS3EncryptionService service = new StandardS3EncryptionService();
|
||||
final ConfigurationContext context = mock(ConfigurationContext.class);
|
||||
|
||||
runner.addControllerService(PutS3Object.ENCRYPTION_SERVICE.getName(), service);
|
||||
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
|
||||
runner.setProperty(PutS3Object.REGION, REGION);
|
||||
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
|
||||
runner.setProperty(PutS3Object.ENCRYPTION_SERVICE, service.getIdentifier());
|
||||
runner.setProperty(service, StandardS3EncryptionService.ENCRYPTION_STRATEGY, strategyName);
|
||||
runner.setProperty(service, StandardS3EncryptionService.ENCRYPTION_VALUE, keyIdOrMaterial);
|
||||
runner.setProperty(service, StandardS3EncryptionService.REGION, REGION);
|
||||
|
||||
when(context.getProperty(StandardS3EncryptionService.ENCRYPTION_STRATEGY)).thenReturn(new MockPropertyValue(strategyName));
|
||||
when(context.getProperty(StandardS3EncryptionService.ENCRYPTION_VALUE)).thenReturn(new MockPropertyValue(keyIdOrMaterial));
|
||||
when(context.getProperty(StandardS3EncryptionService.REGION)).thenReturn(new MockPropertyValue(REGION));
|
||||
if (strategyName != null) {
|
||||
final StandardS3EncryptionService service = new StandardS3EncryptionService();
|
||||
runner.addControllerService(PutS3Object.ENCRYPTION_SERVICE.getName(), service);
|
||||
runner.setProperty(PutS3Object.ENCRYPTION_SERVICE, service.getIdentifier());
|
||||
|
||||
service.onConfigured(context);
|
||||
runner.enableControllerService(service);
|
||||
runner.setProperty(service, StandardS3EncryptionService.ENCRYPTION_STRATEGY, strategyName);
|
||||
runner.setProperty(service, StandardS3EncryptionService.ENCRYPTION_VALUE, keyIdOrMaterial);
|
||||
runner.setProperty(service, StandardS3EncryptionService.KMS_REGION, REGION);
|
||||
|
||||
when(context.getProperty(StandardS3EncryptionService.ENCRYPTION_STRATEGY)).thenReturn(new MockPropertyValue(strategyName));
|
||||
when(context.getProperty(StandardS3EncryptionService.ENCRYPTION_VALUE)).thenReturn(new MockPropertyValue(keyIdOrMaterial));
|
||||
when(context.getProperty(StandardS3EncryptionService.KMS_REGION)).thenReturn(new MockPropertyValue(REGION));
|
||||
|
||||
service.onConfigured(context);
|
||||
runner.enableControllerService(service);
|
||||
}
|
||||
|
||||
return runner;
|
||||
}
|
||||
|
@ -0,0 +1,38 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.aws.s3.encryption;
|
||||
|
||||
import java.util.Base64;
|
||||
import java.util.Random;
|
||||
|
||||
final class S3EncryptionTestUtil {
|
||||
|
||||
private static Random RANDOM = new Random();
|
||||
|
||||
private S3EncryptionTestUtil() {
|
||||
}
|
||||
|
||||
static String createKey(int keySize) {
|
||||
if (keySize % 8 != 0) {
|
||||
throw new IllegalArgumentException("Invalid test data");
|
||||
}
|
||||
|
||||
byte[] keyMaterial = new byte[keySize / 8];
|
||||
RANDOM.nextBytes(keyMaterial);
|
||||
return new String(Base64.getEncoder().encode(keyMaterial));
|
||||
}
|
||||
}
|
@ -0,0 +1,98 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.aws.s3.encryption;
|
||||
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.apache.nifi.processors.aws.s3.encryption.S3EncryptionTestUtil.createKey;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class TestClientSideCEncryptionStrategyKeyValidation {
|
||||
|
||||
private ClientSideCEncryptionStrategy strategy;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
strategy = new ClientSideCEncryptionStrategy();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValid256BitKey() {
|
||||
String key = createKey(256);
|
||||
|
||||
ValidationResult result = strategy.validateKey(key);
|
||||
|
||||
assertTrue(result.isValid());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValid192BitKey() {
|
||||
String key = createKey(192);
|
||||
|
||||
ValidationResult result = strategy.validateKey(key);
|
||||
|
||||
assertTrue(result.isValid());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValid128BitKey() {
|
||||
String key = createKey(128);
|
||||
|
||||
ValidationResult result = strategy.validateKey(key);
|
||||
|
||||
assertTrue(result.isValid());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNotSupportedKeySize() {
|
||||
String key = createKey(512);
|
||||
|
||||
ValidationResult result = strategy.validateKey(key);
|
||||
|
||||
assertFalse(result.isValid());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNullKey() {
|
||||
String key = null;
|
||||
|
||||
ValidationResult result = strategy.validateKey(key);
|
||||
|
||||
assertFalse(result.isValid());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEmptyKey() {
|
||||
String key = "";
|
||||
|
||||
ValidationResult result = strategy.validateKey(key);
|
||||
|
||||
assertFalse(result.isValid());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNotBase64EncodedKey() {
|
||||
String key = "NotBase64EncodedKey";
|
||||
|
||||
ValidationResult result = strategy.validateKey(key);
|
||||
|
||||
assertFalse(result.isValid());
|
||||
}
|
||||
}
|
@ -33,7 +33,7 @@ public class TestS3EncryptionStrategies {
|
||||
|
||||
private String randomKeyMaterial = "";
|
||||
private String randomKeyId = "mock-key-id";
|
||||
private String region = "us-west-1";
|
||||
private String kmsRegion = "us-west-1";
|
||||
|
||||
private ObjectMetadata metadata = null;
|
||||
private PutObjectRequest putObjectRequest = null;
|
||||
@ -60,7 +60,7 @@ public class TestS3EncryptionStrategies {
|
||||
S3EncryptionStrategy strategy = new ClientSideKMSEncryptionStrategy();
|
||||
|
||||
// This shows that the strategy builds a client:
|
||||
Assert.assertNotNull(strategy.createEncryptionClient(null, null, region, randomKeyMaterial));
|
||||
Assert.assertNotNull(strategy.createEncryptionClient(null, null, kmsRegion, randomKeyMaterial));
|
||||
|
||||
// This shows that the strategy does not modify the metadata or any of the requests:
|
||||
Assert.assertNull(metadata.getSSEAlgorithm());
|
||||
@ -76,11 +76,11 @@ public class TestS3EncryptionStrategies {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientSideCMKEncryptionStrategy() {
|
||||
S3EncryptionStrategy strategy = new ClientSideCMKEncryptionStrategy();
|
||||
public void testClientSideCEncryptionStrategy() {
|
||||
S3EncryptionStrategy strategy = new ClientSideCEncryptionStrategy();
|
||||
|
||||
// This shows that the strategy builds a client:
|
||||
Assert.assertNotNull(strategy.createEncryptionClient(null, null, region, randomKeyMaterial));
|
||||
Assert.assertNotNull(strategy.createEncryptionClient(null, null, null, randomKeyMaterial));
|
||||
|
||||
// This shows that the strategy does not modify the metadata or any of the requests:
|
||||
Assert.assertNull(metadata.getSSEAlgorithm());
|
||||
@ -96,11 +96,11 @@ public class TestS3EncryptionStrategies {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServerSideCEKEncryptionStrategy() {
|
||||
S3EncryptionStrategy strategy = new ServerSideCEKEncryptionStrategy();
|
||||
public void testServerSideCEncryptionStrategy() {
|
||||
S3EncryptionStrategy strategy = new ServerSideCEncryptionStrategy();
|
||||
|
||||
// This shows that the strategy does *not* build a client:
|
||||
Assert.assertNull(strategy.createEncryptionClient(null, null, "", ""));
|
||||
Assert.assertNull(strategy.createEncryptionClient(null, null, null, ""));
|
||||
|
||||
// This shows that the strategy sets the SSE customer key as expected:
|
||||
strategy.configurePutObjectRequest(putObjectRequest, metadata, randomKeyMaterial);
|
||||
@ -130,7 +130,7 @@ public class TestS3EncryptionStrategies {
|
||||
S3EncryptionStrategy strategy = new ServerSideKMSEncryptionStrategy();
|
||||
|
||||
// This shows that the strategy does *not* build a client:
|
||||
Assert.assertNull(strategy.createEncryptionClient(null, null, "", ""));
|
||||
Assert.assertNull(strategy.createEncryptionClient(null, null, null, null));
|
||||
|
||||
// This shows that the strategy sets the SSE KMS key id as expected:
|
||||
strategy.configurePutObjectRequest(putObjectRequest, metadata, randomKeyId);
|
||||
@ -150,10 +150,14 @@ public class TestS3EncryptionStrategies {
|
||||
S3EncryptionStrategy strategy = new ServerSideS3EncryptionStrategy();
|
||||
|
||||
// This shows that the strategy does *not* build a client:
|
||||
Assert.assertNull(strategy.createEncryptionClient(null, null, "", ""));
|
||||
Assert.assertNull(strategy.createEncryptionClient(null, null, null, null));
|
||||
|
||||
// This shows that the strategy sets the SSE algorithm field as expected:
|
||||
strategy.configurePutObjectRequest(putObjectRequest, metadata, "");
|
||||
strategy.configurePutObjectRequest(putObjectRequest, metadata, null);
|
||||
Assert.assertEquals(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION, metadata.getSSEAlgorithm());
|
||||
|
||||
// Same for InitiateMultipartUploadRequest:
|
||||
strategy.configureInitiateMultipartUploadRequest(initUploadRequest, metadata, null);
|
||||
Assert.assertEquals(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION, metadata.getSSEAlgorithm());
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,80 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.aws.s3.encryption;
|
||||
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.apache.nifi.processors.aws.s3.encryption.S3EncryptionTestUtil.createKey;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class TestServerSideCEncryptionStrategyKeyValidation {
|
||||
|
||||
private ServerSideCEncryptionStrategy strategy;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
strategy = new ServerSideCEncryptionStrategy();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValid256BitKey() {
|
||||
String key = createKey(256);
|
||||
|
||||
ValidationResult result = strategy.validateKey(key);
|
||||
|
||||
assertTrue(result.isValid());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNotSupportedKeySize() {
|
||||
String key = createKey(512);
|
||||
|
||||
ValidationResult result = strategy.validateKey(key);
|
||||
|
||||
assertFalse(result.isValid());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNullKey() {
|
||||
String key = null;
|
||||
|
||||
ValidationResult result = strategy.validateKey(key);
|
||||
|
||||
assertFalse(result.isValid());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEmptyKey() {
|
||||
String key = "";
|
||||
|
||||
ValidationResult result = strategy.validateKey(key);
|
||||
|
||||
assertFalse(result.isValid());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNotBase64EncodedKey() {
|
||||
String key = "NotBase64EncodedKey";
|
||||
|
||||
ValidationResult result = strategy.validateKey(key);
|
||||
|
||||
assertFalse(result.isValid());
|
||||
}
|
||||
}
|
@ -23,6 +23,7 @@ import com.amazonaws.services.s3.model.PutObjectRequest;
|
||||
import com.amazonaws.services.s3.model.UploadPartRequest;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.processors.aws.s3.AmazonS3EncryptionService;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.util.MockPropertyValue;
|
||||
import org.junit.Assert;
|
||||
@ -37,26 +38,26 @@ public class TestStandardS3EncryptionService {
|
||||
private ConfigurationContext context;
|
||||
private String strategyName;
|
||||
private String keyIdOrMaterial;
|
||||
private String region;
|
||||
private String kmsRegion;
|
||||
|
||||
@Before
|
||||
public void setup() throws InitializationException {
|
||||
service = new StandardS3EncryptionService();
|
||||
context = Mockito.mock(ConfigurationContext.class);
|
||||
|
||||
strategyName = StandardS3EncryptionService.STRATEGY_NAME_NONE;
|
||||
strategyName = AmazonS3EncryptionService.STRATEGY_NAME_NONE;
|
||||
keyIdOrMaterial = "test-key-id";
|
||||
region = "us-west-1";
|
||||
kmsRegion = "us-west-1";
|
||||
|
||||
Mockito.when(context.getProperty(StandardS3EncryptionService.ENCRYPTION_STRATEGY)).thenReturn(new MockPropertyValue(strategyName));
|
||||
Mockito.when(context.getProperty(StandardS3EncryptionService.ENCRYPTION_VALUE)).thenReturn(new MockPropertyValue(keyIdOrMaterial));
|
||||
Mockito.when(context.getProperty(StandardS3EncryptionService.REGION)).thenReturn(new MockPropertyValue(region));
|
||||
Mockito.when(context.getProperty(StandardS3EncryptionService.KMS_REGION)).thenReturn(new MockPropertyValue(kmsRegion));
|
||||
service.onConfigured(context);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServiceProperties() {
|
||||
Assert.assertEquals(service.getRegion(), region);
|
||||
Assert.assertEquals(service.getKmsRegion(), kmsRegion);
|
||||
Assert.assertEquals(service.getStrategyName(), strategyName);
|
||||
}
|
||||
|
||||
@ -97,6 +98,6 @@ public class TestStandardS3EncryptionService {
|
||||
|
||||
Assert.assertEquals(properties.get(0).getName(), StandardS3EncryptionService.ENCRYPTION_STRATEGY.getName());
|
||||
Assert.assertEquals(properties.get(1).getName(), StandardS3EncryptionService.ENCRYPTION_VALUE.getName());
|
||||
Assert.assertEquals(properties.get(2).getName(), StandardS3EncryptionService.REGION.getName());
|
||||
Assert.assertEquals(properties.get(2).getName(), StandardS3EncryptionService.KMS_REGION.getName());
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,307 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.aws.s3.encryption;
|
||||
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.util.NoOpProcessor;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.apache.nifi.processors.aws.s3.AmazonS3EncryptionService.STRATEGY_NAME_CSE_C;
|
||||
import static org.apache.nifi.processors.aws.s3.AmazonS3EncryptionService.STRATEGY_NAME_CSE_KMS;
|
||||
import static org.apache.nifi.processors.aws.s3.AmazonS3EncryptionService.STRATEGY_NAME_NONE;
|
||||
import static org.apache.nifi.processors.aws.s3.AmazonS3EncryptionService.STRATEGY_NAME_SSE_C;
|
||||
import static org.apache.nifi.processors.aws.s3.AmazonS3EncryptionService.STRATEGY_NAME_SSE_KMS;
|
||||
import static org.apache.nifi.processors.aws.s3.AmazonS3EncryptionService.STRATEGY_NAME_SSE_S3;
|
||||
import static org.apache.nifi.processors.aws.s3.encryption.S3EncryptionTestUtil.createKey;
|
||||
|
||||
public class TestStandardS3EncryptionServiceValidation {
|
||||
|
||||
private TestRunner runner;
|
||||
private StandardS3EncryptionService service;
|
||||
|
||||
@Before
|
||||
public void setUp() throws InitializationException {
|
||||
runner = TestRunners.newTestRunner(NoOpProcessor.class);
|
||||
service = new StandardS3EncryptionService();
|
||||
runner.addControllerService("s3-encryption-service", service);
|
||||
}
|
||||
|
||||
|
||||
// NoOpEncryptionStrategy
|
||||
|
||||
@Test
|
||||
public void testValidNoOpEncryptionStrategy() {
|
||||
configureService(STRATEGY_NAME_NONE, null);
|
||||
|
||||
runner.assertValid(service);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidNoOpEncryptionStrategyBecauseKeyIdOrMaterialSpecified() {
|
||||
configureService(STRATEGY_NAME_NONE, "key-id");
|
||||
|
||||
runner.assertNotValid(service);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidNoOpEncryptionStrategyBecauseKeyIdOrMaterialSpecifiedAsEmptyString() {
|
||||
configureService(STRATEGY_NAME_NONE, "");
|
||||
|
||||
runner.assertNotValid(service);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidNoOpEncryptionStrategyBecauseKeyIdOrMaterialSpecifiedUsingEL() {
|
||||
configureService(STRATEGY_NAME_NONE, "${key-id-var}");
|
||||
|
||||
runner.assertNotValid(service);
|
||||
}
|
||||
|
||||
|
||||
// ServerSideS3EncryptionStrategy
|
||||
|
||||
@Test
|
||||
public void testValidServerSideS3EncryptionStrategy() {
|
||||
configureService(STRATEGY_NAME_SSE_S3, null);
|
||||
|
||||
runner.assertValid(service);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidServerSideS3EncryptionStrategyBecauseKeyIdOrMaterialSpecified() {
|
||||
configureService(STRATEGY_NAME_SSE_S3, "key-id");
|
||||
|
||||
runner.assertNotValid(service);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidServerSideS3EncryptionStrategyBecauseKeyIdOrMaterialSpecifiedAsEmptyString() {
|
||||
configureService(STRATEGY_NAME_SSE_S3, "");
|
||||
|
||||
runner.assertNotValid(service);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidServerSideS3EncryptionStrategyBecauseKeyIdOrMaterialSpecifiedUsingEL() {
|
||||
configureService(STRATEGY_NAME_SSE_S3, "${key-id-var}");
|
||||
|
||||
runner.assertNotValid(service);
|
||||
}
|
||||
|
||||
|
||||
// ServerSideKMSEncryptionStrategy
|
||||
|
||||
@Test
|
||||
public void testValidServerSideKMSEncryptionStrategy() {
|
||||
configureService(STRATEGY_NAME_SSE_KMS, "key-id");
|
||||
|
||||
runner.assertValid(service);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidServerSideKMSEncryptionStrategyUsingEL() {
|
||||
configureService(STRATEGY_NAME_SSE_KMS, "${key-id-var}");
|
||||
configureVariable("key-id-var", "key-id");
|
||||
|
||||
runner.assertValid(service);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidServerSideKMSEncryptionStrategyBecauseKeyIdNotSpecified() {
|
||||
configureService(STRATEGY_NAME_SSE_KMS, null);
|
||||
|
||||
runner.assertNotValid(service);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidServerSideKMSEncryptionStrategyBecauseKeyIdSpecifiedAsEmptyString() {
|
||||
configureService(STRATEGY_NAME_SSE_KMS, "");
|
||||
|
||||
runner.assertNotValid(service);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidServerSideKMSEncryptionStrategyBecauseKeyIdEvaluatedToNull() {
|
||||
configureService(STRATEGY_NAME_SSE_KMS, "${key-id-var}");
|
||||
|
||||
runner.assertNotValid(service);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidServerSideKMSEncryptionStrategyBecauseKeyIdEvaluatedToEmptyString() {
|
||||
configureService(STRATEGY_NAME_SSE_KMS, "${key-id-var}");
|
||||
configureVariable("key-id-var", "");
|
||||
|
||||
runner.assertNotValid(service);
|
||||
}
|
||||
|
||||
|
||||
// ServerSideCEncryptionStrategy
|
||||
|
||||
@Test
|
||||
public void testValidServerSideCEncryptionStrategy() {
|
||||
configureService(STRATEGY_NAME_SSE_C, createKey(256));
|
||||
|
||||
runner.assertValid(service);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidServerSideCEncryptionStrategyUsingEL() {
|
||||
configureService(STRATEGY_NAME_SSE_C, "${key-material-var}");
|
||||
configureVariable("key-material-var", createKey(256));
|
||||
|
||||
runner.assertValid(service);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidServerSideCEncryptionStrategyBecauseKeyMaterialNotSpecified() {
|
||||
configureService(STRATEGY_NAME_SSE_C, null);
|
||||
|
||||
runner.assertNotValid(service);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidServerSideCEncryptionStrategyBecauseKeyMaterialSpecifiedAsEmptyString() {
|
||||
configureService(STRATEGY_NAME_SSE_C, "");
|
||||
|
||||
runner.assertNotValid(service);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidServerSideCEncryptionStrategyBecauseKeyMaterialEvaluatedToNull() {
|
||||
configureService(STRATEGY_NAME_SSE_C, "${key-material-var}");
|
||||
|
||||
runner.assertNotValid(service);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidServerSideCEncryptionStrategyBecauseKeyMaterialEvaluatedToEmptyString() {
|
||||
configureService(STRATEGY_NAME_SSE_C, "${key-material-var}");
|
||||
configureVariable("key-material-var", "");
|
||||
|
||||
runner.assertNotValid(service);
|
||||
}
|
||||
|
||||
|
||||
// ClientSideKMSEncryptionStrategy
|
||||
|
||||
@Test
|
||||
public void testValidClientSideKMSEncryptionStrategy() {
|
||||
configureService(STRATEGY_NAME_CSE_KMS, "key-id");
|
||||
|
||||
runner.assertValid(service);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidClientSideKMSEncryptionStrategyUsingEL() {
|
||||
configureService(STRATEGY_NAME_CSE_KMS, "${key-id-var}");
|
||||
configureVariable("key-id-var", "key-id");
|
||||
|
||||
runner.assertValid(service);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidClientSideKMSEncryptionStrategyBecauseKeyIdNotSpecified() {
|
||||
configureService(STRATEGY_NAME_CSE_KMS, null);
|
||||
|
||||
runner.assertNotValid(service);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidClientSideKMSEncryptionStrategyBecauseKeyIdSpecifiedAsEmptyString() {
|
||||
configureService(STRATEGY_NAME_CSE_KMS, "");
|
||||
|
||||
runner.assertNotValid(service);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidClientSideKMSEncryptionStrategyBecauseKeyIdEvaluatedToNull() {
|
||||
configureService(STRATEGY_NAME_CSE_KMS, "${key-id-var}");
|
||||
|
||||
runner.assertNotValid(service);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidClientSideKMSEncryptionStrategyBecauseKeyIdEvaluatedToEmptyString() {
|
||||
configureService(STRATEGY_NAME_CSE_KMS, "${key-id-var}");
|
||||
configureVariable("key-id-var", "");
|
||||
|
||||
runner.assertNotValid(service);
|
||||
}
|
||||
|
||||
|
||||
// ClientSideCEncryptionStrategy
|
||||
|
||||
@Test
|
||||
public void testValidClientSideCEncryptionStrategy() {
|
||||
configureService(STRATEGY_NAME_CSE_C, createKey(256));
|
||||
|
||||
runner.assertValid(service);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidClientSideCEncryptionStrategyUsingEL() {
|
||||
configureService(STRATEGY_NAME_CSE_C, "${key-material-var}");
|
||||
configureVariable("key-material-var", createKey(256));
|
||||
|
||||
runner.assertValid(service);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidClientSideCEncryptionStrategyBecauseKeyMaterialNotSpecified() {
|
||||
configureService(STRATEGY_NAME_CSE_C, null);
|
||||
|
||||
runner.assertNotValid(service);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidClientSideCEncryptionStrategyBecauseKeyMaterialSpecifiedAsEmptyString() {
|
||||
configureService(STRATEGY_NAME_CSE_C, "");
|
||||
|
||||
runner.assertNotValid(service);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidClientSideCEncryptionStrategyBecauseKeyMaterialEvaluatedToNull() {
|
||||
configureService(STRATEGY_NAME_CSE_C, "${key-material-var}");
|
||||
|
||||
runner.assertNotValid(service);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidClientSideCEncryptionStrategyBecauseKeyMaterialEvaluatedToEmptyString() {
|
||||
configureService(STRATEGY_NAME_CSE_C, "${key-material-var}");
|
||||
configureVariable("key-material-var", "");
|
||||
|
||||
runner.assertNotValid(service);
|
||||
}
|
||||
|
||||
|
||||
private void configureService(String encryptionStrategy, String keyIdOrMaterial) {
|
||||
runner.setProperty(service, StandardS3EncryptionService.ENCRYPTION_STRATEGY, encryptionStrategy);
|
||||
if (keyIdOrMaterial != null) {
|
||||
runner.setProperty(service, StandardS3EncryptionService.ENCRYPTION_VALUE, keyIdOrMaterial);
|
||||
}
|
||||
}
|
||||
|
||||
private void configureVariable(String name, String value) {
|
||||
runner.setVariable(name, value);
|
||||
}
|
||||
}
|
@ -31,6 +31,13 @@ import org.apache.nifi.controller.ControllerService;
|
||||
*/
|
||||
public interface AmazonS3EncryptionService extends ControllerService {
|
||||
|
||||
String STRATEGY_NAME_NONE = "NONE";
|
||||
String STRATEGY_NAME_SSE_S3 = "SSE_S3";
|
||||
String STRATEGY_NAME_SSE_KMS = "SSE_KMS";
|
||||
String STRATEGY_NAME_SSE_C = "SSE_C";
|
||||
String STRATEGY_NAME_CSE_KMS = "CSE_KMS";
|
||||
String STRATEGY_NAME_CSE_C = "CSE_C";
|
||||
|
||||
/**
|
||||
* Configure a {@link PutObjectRequest} for encryption.
|
||||
* @param request the request to configure.
|
||||
@ -69,12 +76,17 @@ public interface AmazonS3EncryptionService extends ControllerService {
|
||||
AmazonS3Client createEncryptionClient(AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration);
|
||||
|
||||
/**
|
||||
* @return The region associated with the service, as a String.
|
||||
* @return The KMS region associated with the service, as a String.
|
||||
*/
|
||||
String getRegion();
|
||||
String getKmsRegion();
|
||||
|
||||
/**
|
||||
* @return The name of the encryption strategy associated with the service.
|
||||
*/
|
||||
String getStrategyName();
|
||||
|
||||
/**
|
||||
* @return The display name of the encryption strategy associated with the service.
|
||||
*/
|
||||
String getStrategyDisplayName();
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user