NIFI-11360 Added Client-Side Encryption for Azure Blob v12 Processors

This closes #7182

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
mkalavala 2023-04-17 16:49:49 -04:00 committed by exceptionfactory
parent e4cdb90a75
commit aad7b40bd9
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
9 changed files with 566 additions and 12 deletions

View File

@ -87,6 +87,14 @@
<groupId>com.azure</groupId> <groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId> <artifactId>azure-storage-blob</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob-cryptography</artifactId>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-security-keyvault-keys</artifactId>
</dependency>
<!-- Legacy Microsoft Azure Libraries --> <!-- Legacy Microsoft Azure Libraries -->
<dependency> <dependency>
<groupId>com.microsoft.azure</groupId> <groupId>com.microsoft.azure</groupId>

View File

@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure;
import com.azure.core.cryptography.AsyncKeyEncryptionKey;
import com.azure.security.keyvault.keys.cryptography.KeyEncryptionKeyClientBuilder;
import com.azure.security.keyvault.keys.cryptography.models.KeyWrapAlgorithm;
import com.azure.security.keyvault.keys.models.JsonWebKey;
import com.azure.security.keyvault.keys.models.KeyOperation;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.specialized.cryptography.EncryptedBlobClientBuilder;
import com.azure.storage.blob.specialized.cryptography.EncryptionVersion;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.ClientSideEncryptionMethod;
import org.apache.nifi.util.StringUtils;
import javax.crypto.spec.SecretKeySpec;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
public interface ClientSideEncryptionSupport {
List<KeyOperation> KEY_OPERATIONS = Arrays.asList(KeyOperation.WRAP_KEY, KeyOperation.UNWRAP_KEY);
PropertyDescriptor CSE_KEY_TYPE = new PropertyDescriptor.Builder()
.name("Client-Side Encryption Key Type")
.displayName("Client-Side Encryption Key Type")
.required(true)
.allowableValues(ClientSideEncryptionMethod.class)
.defaultValue(ClientSideEncryptionMethod.NONE.getValue())
.description("Specifies the key type to use for client-side encryption.")
.build();
PropertyDescriptor CSE_KEY_ID = new PropertyDescriptor.Builder()
.name("Client-Side Encryption Key ID")
.displayName("Client-Side Encryption Key ID")
.description("Specifies the ID of the key to use for client-side encryption.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.dependsOn(CSE_KEY_TYPE, ClientSideEncryptionMethod.LOCAL)
.build();
PropertyDescriptor CSE_LOCAL_KEY = new PropertyDescriptor.Builder()
.name("Client-Side Encryption Local Key")
.displayName("Client-Side Encryption Local Key")
.description("When using local client-side encryption, this is the raw key, encoded in hexadecimal")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.dependsOn(CSE_KEY_TYPE, ClientSideEncryptionMethod.LOCAL)
.sensitive(true)
.build();
default Collection<ValidationResult> validateClientSideEncryptionProperties(ValidationContext validationContext) {
final List<ValidationResult> validationResults = new ArrayList<>();
final String cseKeyTypeValue = validationContext.getProperty(CSE_KEY_TYPE).getValue();
final ClientSideEncryptionMethod cseKeyType = ClientSideEncryptionMethod.valueOf(cseKeyTypeValue);
final String cseKeyId = validationContext.getProperty(CSE_KEY_ID).getValue();
final String cseLocalKey = validationContext.getProperty(CSE_LOCAL_KEY).getValue();
if (cseKeyType != ClientSideEncryptionMethod.NONE && StringUtils.isBlank(cseKeyId)) {
validationResults.add(new ValidationResult.Builder().subject(CSE_KEY_ID.getDisplayName())
.explanation("Key ID must be set when client-side encryption is enabled").build());
}
if (ClientSideEncryptionMethod.LOCAL == cseKeyType) {
validationResults.addAll(validateLocalKey(cseLocalKey));
}
return validationResults;
}
default List<ValidationResult> validateLocalKey(String keyHex) {
final List<ValidationResult> validationResults = new ArrayList<>();
if (StringUtils.isBlank(keyHex)) {
validationResults.add(new ValidationResult.Builder().subject(CSE_LOCAL_KEY.getDisplayName())
.explanation("Key must be set when client-side encryption is enabled").build());
} else {
try {
final byte[] keyBytes = Hex.decodeHex(keyHex);
if (getKeyWrapAlgorithm(keyBytes).isEmpty()) {
validationResults.add(new ValidationResult.Builder().subject(CSE_LOCAL_KEY.getDisplayName())
.explanation(String.format("Key size in bits must be one of [128, 192, 256, 384, 512] instead of [%d]", keyBytes.length * 8)).build());
}
} catch (DecoderException e) {
validationResults.add(new ValidationResult.Builder().subject(CSE_LOCAL_KEY.getDisplayName())
.explanation("Key must be a valid hexadecimal string").build());
} catch (IllegalArgumentException e) {
validationResults.add(new ValidationResult.Builder().subject(CSE_LOCAL_KEY.getDisplayName())
.explanation(e.getMessage()).build());
}
}
return validationResults;
}
default boolean isClientSideEncryptionEnabled(PropertyContext context) {
final String cseKeyTypeValue = context.getProperty(CSE_KEY_TYPE).getValue();
final ClientSideEncryptionMethod cseKeyType = ClientSideEncryptionMethod.valueOf(cseKeyTypeValue);
return cseKeyType != ClientSideEncryptionMethod.NONE;
}
default BlobClient getEncryptedBlobClient(PropertyContext context, BlobContainerClient containerClient, String blobName) throws DecoderException {
final String cseKeyId = context.getProperty(CSE_KEY_ID).getValue();
final String cseLocalKeyHex = context.getProperty(CSE_LOCAL_KEY).getValue();
final BlobClient blobClient = containerClient.getBlobClient(blobName);
final byte[] keyBytes = Hex.decodeHex(cseLocalKeyHex);
JsonWebKey localKey = JsonWebKey.fromAes(new SecretKeySpec(keyBytes, "AES"), KEY_OPERATIONS)
.setId(cseKeyId);
AsyncKeyEncryptionKey akek = new KeyEncryptionKeyClientBuilder()
.buildAsyncKeyEncryptionKey(localKey).block();
final String keyWrapAlgorithm = getKeyWrapAlgorithm(keyBytes).orElseThrow(() -> new IllegalArgumentException("Failed to derive key wrap algorithm"));
return new EncryptedBlobClientBuilder(EncryptionVersion.V2)
.key(akek, keyWrapAlgorithm)
.blobClient(blobClient)
.buildEncryptedBlobClient();
}
default Optional<String> getKeyWrapAlgorithm(byte[] keyBytes) {
final int keySize128 = 16;
final int keySize192 = 24;
final int keySize256 = 32;
final int keySize384 = 48;
final int keySize512 = 64;
switch (keyBytes.length) {
case keySize128:
return Optional.of(KeyWrapAlgorithm.A128KW.toString());
case keySize192:
return Optional.of(KeyWrapAlgorithm.A192KW.toString());
case keySize256:
case keySize384:
case keySize512:
// Default to longest allowed key length for wrap
return Optional.of(KeyWrapAlgorithm.A256KW.toString());
default:
return Optional.empty();
}
}
}

View File

@ -28,6 +28,8 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.DataUnit;
@ -36,9 +38,12 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12; import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12;
import org.apache.nifi.processors.azure.ClientSideEncryptionSupport;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -76,7 +81,7 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR
@WritesAttribute(attribute = ATTR_NAME_LANG, description = ATTR_DESCRIPTION_LANG), @WritesAttribute(attribute = ATTR_NAME_LANG, description = ATTR_DESCRIPTION_LANG),
@WritesAttribute(attribute = ATTR_NAME_TIMESTAMP, description = ATTR_DESCRIPTION_TIMESTAMP), @WritesAttribute(attribute = ATTR_NAME_TIMESTAMP, description = ATTR_DESCRIPTION_TIMESTAMP),
@WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH)}) @WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH)})
public class FetchAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 { public class FetchAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 implements ClientSideEncryptionSupport {
public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder() public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(AzureStorageUtils.CONTAINER) .fromPropertyDescriptor(AzureStorageUtils.CONTAINER)
@ -114,9 +119,19 @@ public class FetchAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
BLOB_NAME, BLOB_NAME,
RANGE_START, RANGE_START,
RANGE_LENGTH, RANGE_LENGTH,
AzureStorageUtils.PROXY_CONFIGURATION_SERVICE AzureStorageUtils.PROXY_CONFIGURATION_SERVICE,
CSE_KEY_TYPE,
CSE_KEY_ID,
CSE_LOCAL_KEY
)); ));
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
results.addAll(validateClientSideEncryptionProperties(validationContext));
return results;
}
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES; return PROPERTIES;
@ -139,9 +154,14 @@ public class FetchAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
try { try {
BlobServiceClient storageClient = getStorageClient(); BlobServiceClient storageClient = getStorageClient();
BlobContainerClient containerClient = storageClient.getBlobContainerClient(containerName); BlobContainerClient containerClient = storageClient.getBlobContainerClient(containerName);
BlobClient blobClient = containerClient.getBlobClient(blobName); final BlobClient blobClient;
if (isClientSideEncryptionEnabled(context)) {
blobClient = getEncryptedBlobClient(context, containerClient, blobName);
} else{
blobClient = containerClient.getBlobClient(blobName);
}
flowFile = session.write(flowFile, os -> blobClient.downloadWithResponse(os, new BlobRange(rangeStart, rangeLength), null, null, false, null, null)); flowFile = session.write(flowFile, os -> blobClient.downloadStreamWithResponse(os, new BlobRange(rangeStart, rangeLength), null, null, false, null, null));
Map<String, String> attributes = createBlobAttributesMap(blobClient); Map<String, String> attributes = createBlobAttributesMap(blobClient);
flowFile = session.putAllAttributes(flowFile, attributes); flowFile = session.putAllAttributes(flowFile, attributes);

View File

@ -35,6 +35,8 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
@ -42,11 +44,14 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12; import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12;
import org.apache.nifi.processors.azure.ClientSideEncryptionSupport;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.services.azure.storage.AzureStorageConflictResolutionStrategy; import org.apache.nifi.services.azure.storage.AzureStorageConflictResolutionStrategy;
import java.io.InputStream; import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -93,7 +98,7 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR
@WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH), @WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH),
@WritesAttribute(attribute = ATTR_NAME_ERROR_CODE, description = ATTR_DESCRIPTION_ERROR_CODE), @WritesAttribute(attribute = ATTR_NAME_ERROR_CODE, description = ATTR_DESCRIPTION_ERROR_CODE),
@WritesAttribute(attribute = ATTR_NAME_IGNORED, description = ATTR_DESCRIPTION_IGNORED)}) @WritesAttribute(attribute = ATTR_NAME_IGNORED, description = ATTR_DESCRIPTION_IGNORED)})
public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 { public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 implements ClientSideEncryptionSupport {
public static final PropertyDescriptor CREATE_CONTAINER = new PropertyDescriptor.Builder() public static final PropertyDescriptor CREATE_CONTAINER = new PropertyDescriptor.Builder()
.name("create-container") .name("create-container")
@ -124,9 +129,19 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
CREATE_CONTAINER, CREATE_CONTAINER,
CONFLICT_RESOLUTION, CONFLICT_RESOLUTION,
BLOB_NAME, BLOB_NAME,
AzureStorageUtils.PROXY_CONFIGURATION_SERVICE AzureStorageUtils.PROXY_CONFIGURATION_SERVICE,
CSE_KEY_TYPE,
CSE_KEY_ID,
CSE_LOCAL_KEY
)); ));
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
results.addAll(validateClientSideEncryptionProperties(validationContext));
return results;
}
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES; return PROPERTIES;
@ -151,7 +166,13 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
containerClient.create(); containerClient.create();
} }
BlobClient blobClient = containerClient.getBlobClient(blobName); final BlobClient blobClient;
if (isClientSideEncryptionEnabled(context)) {
blobClient = getEncryptedBlobClient(context, containerClient, blobName);
} else {
blobClient = containerClient.getBlobClient(blobName);
}
final BlobRequestConditions blobRequestConditions = new BlobRequestConditions(); final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
Map<String, String> attributes = new HashMap<>(); Map<String, String> attributes = new HashMap<>();
applyStandardBlobAttributes(attributes, blobClient); applyStandardBlobAttributes(attributes, blobClient);
@ -207,4 +228,5 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
attributes.put(ATTR_NAME_LANG, null); attributes.put(ATTR_NAME_LANG, null);
attributes.put(ATTR_NAME_MIME_TYPE, APPLICATION_OCTET_STREAM); attributes.put(ATTR_NAME_MIME_TYPE, APPLICATION_OCTET_STREAM);
} }
} }

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.utils;
import org.apache.nifi.components.DescribedValue;
/**
* Enumeration capturing essential information about the various client-side
* encryption methods supported by Azure
*/
public enum ClientSideEncryptionMethod implements DescribedValue {
NONE("Client-Side Encryption disabled"),
LOCAL("Client-Side Encryption enabled using local key");
private final String description;
ClientSideEncryptionMethod(String description) {
this.description = description;
}
@Override
public String getValue() {
return this.name();
}
@Override
public String getDisplayName() {
return this.name();
}
public String getDescription() {
return description;
}
@Override
public String toString() {
return description;
}
}

View File

@ -16,12 +16,19 @@
*/ */
package org.apache.nifi.processors.azure.storage; package org.apache.nifi.processors.azure.storage;
import com.azure.core.cryptography.AsyncKeyEncryptionKey;
import com.azure.security.keyvault.keys.cryptography.KeyEncryptionKeyClientBuilder;
import com.azure.security.keyvault.keys.models.JsonWebKey;
import com.azure.security.keyvault.keys.models.KeyOperation;
import com.azure.storage.blob.BlobClient; import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient; import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient; import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder; import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.models.BlobType; import com.azure.storage.blob.models.BlobType;
import com.azure.storage.blob.specialized.cryptography.EncryptedBlobClientBuilder;
import com.azure.storage.blob.specialized.cryptography.EncryptionVersion;
import com.azure.storage.common.StorageSharedKeyCredential; import com.azure.storage.common.StorageSharedKeyCredential;
import org.apache.commons.codec.binary.Hex;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12; import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12;
import org.apache.nifi.processors.azure.AzureServiceEndpoints; import org.apache.nifi.processors.azure.AzureServiceEndpoints;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
@ -33,10 +40,12 @@ import org.apache.nifi.util.MockFlowFile;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import javax.crypto.spec.SecretKeySpec;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.net.URLEncoder; import java.net.URLEncoder;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
@ -47,6 +56,13 @@ public abstract class AbstractAzureBlobStorage_v12IT extends AbstractAzureStorag
protected static final String BLOB_NAME = "blob1"; protected static final String BLOB_NAME = "blob1";
protected static final byte[] BLOB_DATA = "0123456789".getBytes(StandardCharsets.UTF_8); protected static final byte[] BLOB_DATA = "0123456789".getBytes(StandardCharsets.UTF_8);
protected static final String KEY_ID_VALUE = "key:id";
protected static final String KEY_64B_VALUE = "1234567890ABCDEF";
protected static final String KEY_128B_VALUE = KEY_64B_VALUE + KEY_64B_VALUE;
protected static final String KEY_192B_VALUE = KEY_128B_VALUE + KEY_64B_VALUE;
protected static final String KEY_256B_VALUE = KEY_128B_VALUE + KEY_128B_VALUE;
protected static final String KEY_384B_VALUE = KEY_256B_VALUE + KEY_128B_VALUE;
protected static final String KEY_512B_VALUE = KEY_256B_VALUE + KEY_256B_VALUE;
protected static final String EL_CONTAINER_NAME = "az.containername"; protected static final String EL_CONTAINER_NAME = "az.containername";
protected static final String EL_BLOB_NAME = "az.blobname"; protected static final String EL_BLOB_NAME = "az.blobname";
@ -128,6 +144,26 @@ public abstract class AbstractAzureBlobStorage_v12IT extends AbstractAzureStorag
return blobClient; return blobClient;
} }
protected BlobClient uploadBlobWithCSE(String blobName, byte[] blobData, String hexKey, String keyId, String keyWrapAlgorithm) throws Exception {
BlobClient blobClient = containerClient.getBlobClient(blobName);
byte[] keyBytes = Hex.decodeHex(hexKey.toCharArray());
JsonWebKey localKey = JsonWebKey.fromAes(new SecretKeySpec(keyBytes, "AES"),
Arrays.asList(KeyOperation.WRAP_KEY, KeyOperation.UNWRAP_KEY))
.setId(keyId);
AsyncKeyEncryptionKey akek = new KeyEncryptionKeyClientBuilder()
.buildAsyncKeyEncryptionKey(localKey).block();
BlobClient encryptedBlobClient = new EncryptedBlobClientBuilder(EncryptionVersion.V2)
.key(akek, keyWrapAlgorithm)
.blobClient(blobClient)
.buildEncryptedBlobClient();
encryptedBlobClient.upload(new ByteArrayInputStream(blobData), blobData.length);
// waiting for the blob to be available
Thread.sleep(1000);
return encryptedBlobClient;
}
protected Map<String, String> initCommonExpressionLanguageAttributes() { protected Map<String, String> initCommonExpressionLanguageAttributes() {
Map<String, String> attributes = new HashMap<>(); Map<String, String> attributes = new HashMap<>();
attributes.put(EL_CONTAINER_NAME, getContainerName()); attributes.put(EL_CONTAINER_NAME, getContainerName());

View File

@ -16,7 +16,10 @@
*/ */
package org.apache.nifi.processors.azure.storage; package org.apache.nifi.processors.azure.storage;
import com.azure.security.keyvault.keys.cryptography.models.KeyWrapAlgorithm;
import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.azure.ClientSideEncryptionSupport;
import org.apache.nifi.processors.azure.storage.utils.ClientSideEncryptionMethod;
import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
@ -53,6 +56,19 @@ public class ITFetchAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT
assertSuccess(BLOB_NAME, BLOB_DATA); assertSuccess(BLOB_NAME, BLOB_DATA);
} }
@Test
public void testFetchBlobWithCSE() throws Exception {
runner.setProperty(ClientSideEncryptionSupport.CSE_KEY_TYPE, ClientSideEncryptionMethod.LOCAL.name());
runner.setProperty(ClientSideEncryptionSupport.CSE_KEY_ID, KEY_ID_VALUE);
runner.setProperty(ClientSideEncryptionSupport.CSE_LOCAL_KEY, KEY_128B_VALUE);
uploadBlobWithCSE(BLOB_NAME, BLOB_DATA, KEY_128B_VALUE, KEY_ID_VALUE, KeyWrapAlgorithm.A128KW.toString());
runProcessor();
//cannot validate blob size as azure api does not expose unencrypted data length
assertFlowFile(BLOB_NAME, BLOB_DATA, null);
assertProvenanceEvents();
}
@Test @Test
public void testFetchBlobWithSimpleNameUsingProxyConfigurationService() throws Exception { public void testFetchBlobWithSimpleNameUsingProxyConfigurationService() throws Exception {
uploadBlob(BLOB_NAME, BLOB_DATA); uploadBlob(BLOB_NAME, BLOB_DATA);
@ -186,14 +202,15 @@ public class ITFetchAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT
assertProvenanceEvents(); assertProvenanceEvents();
} }
private void assertFlowFile(String blobName, byte[] blobData, int originalLength) throws Exception { private void assertFlowFile(String blobName, byte[] blobData, Integer originalLength) throws Exception {
runner.assertAllFlowFilesTransferred(FetchAzureBlobStorage_v12.REL_SUCCESS, 1); runner.assertAllFlowFilesTransferred(FetchAzureBlobStorage_v12.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchAzureBlobStorage_v12.REL_SUCCESS).get(0); MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchAzureBlobStorage_v12.REL_SUCCESS).get(0);
assertFlowFileCommonBlobAttributes(flowFile, getContainerName(), blobName); assertFlowFileCommonBlobAttributes(flowFile, getContainerName(), blobName);
assertFlowFileResultBlobAttributes(flowFile, originalLength); if(originalLength != null) {
assertFlowFileResultBlobAttributes(flowFile, originalLength);
}
flowFile.assertContentEquals(blobData); flowFile.assertContentEquals(blobData);
} }

View File

@ -20,7 +20,9 @@ import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient; import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.models.BlobErrorCode; import com.azure.storage.blob.models.BlobErrorCode;
import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.azure.ClientSideEncryptionSupport;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.processors.azure.storage.utils.ClientSideEncryptionMethod;
import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.services.azure.storage.AzureStorageConflictResolutionStrategy; import org.apache.nifi.services.azure.storage.AzureStorageConflictResolutionStrategy;
@ -40,8 +42,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT { public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
public static class ITProcessor extends PutAzureBlobStorage_v12 { public static class ITProcessor extends PutAzureBlobStorage_v12 {
public boolean blobMetadataApplied = false; public boolean blobMetadataApplied = false;
@Override @Override
protected void applyBlobMetadata(Map<String, String> attributes, BlobClient blobClient) { protected void applyBlobMetadata(Map<String, String> attributes, BlobClient blobClient) {
super.applyBlobMetadata(attributes, blobClient); super.applyBlobMetadata(attributes, blobClient);
@ -190,6 +194,60 @@ public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
assertSuccess(getContainerName(), BLOB_NAME, BLOB_DATA); assertSuccess(getContainerName(), BLOB_NAME, BLOB_DATA);
} }
@Test
public void testPutBlob64BLocalCSE() {
runner.setProperty(ClientSideEncryptionSupport.CSE_KEY_TYPE, ClientSideEncryptionMethod.LOCAL.name());
runner.setProperty(ClientSideEncryptionSupport.CSE_KEY_ID, KEY_ID_VALUE);
runner.setProperty(ClientSideEncryptionSupport.CSE_LOCAL_KEY, KEY_64B_VALUE);
runner.assertNotValid();
}
@Test
public void testPutBlob128BLocalCSE() throws Exception {
runner.setProperty(ClientSideEncryptionSupport.CSE_KEY_TYPE, ClientSideEncryptionMethod.LOCAL.name());
runner.setProperty(ClientSideEncryptionSupport.CSE_KEY_ID, KEY_ID_VALUE);
runner.setProperty(ClientSideEncryptionSupport.CSE_LOCAL_KEY, KEY_128B_VALUE);
runProcessor(BLOB_DATA);
assertSuccessForCSE(getContainerName(), BLOB_NAME, BLOB_DATA);
}
@Test
public void testPutBlob192BLocalCSE() throws Exception {
runner.setProperty(ClientSideEncryptionSupport.CSE_KEY_TYPE, ClientSideEncryptionMethod.LOCAL.name());
runner.setProperty(ClientSideEncryptionSupport.CSE_KEY_ID, KEY_ID_VALUE);
runner.setProperty(ClientSideEncryptionSupport.CSE_LOCAL_KEY, KEY_192B_VALUE);
runProcessor(BLOB_DATA);
assertSuccessForCSE(getContainerName(), BLOB_NAME, BLOB_DATA);
}
@Test
public void testPutBlob256BLocalCSE() throws Exception {
runner.setProperty(ClientSideEncryptionSupport.CSE_KEY_TYPE, ClientSideEncryptionMethod.LOCAL.name());
runner.setProperty(ClientSideEncryptionSupport.CSE_KEY_ID, KEY_ID_VALUE);
runner.setProperty(ClientSideEncryptionSupport.CSE_LOCAL_KEY, KEY_256B_VALUE);
runProcessor(BLOB_DATA);
assertSuccessForCSE(getContainerName(), BLOB_NAME, BLOB_DATA);
}
@Test
public void testPutBlob384BLocalCSE() throws Exception {
runner.setProperty(ClientSideEncryptionSupport.CSE_KEY_TYPE, ClientSideEncryptionMethod.LOCAL.name());
runner.setProperty(ClientSideEncryptionSupport.CSE_KEY_ID, KEY_ID_VALUE);
runner.setProperty(ClientSideEncryptionSupport.CSE_LOCAL_KEY, KEY_384B_VALUE);
runProcessor(BLOB_DATA);
assertSuccessForCSE(getContainerName(), BLOB_NAME, BLOB_DATA);
}
@Test
public void testPutBlob512BLocalCSE() throws Exception {
runner.setProperty(ClientSideEncryptionSupport.CSE_KEY_TYPE, ClientSideEncryptionMethod.LOCAL.name());
runner.setProperty(ClientSideEncryptionSupport.CSE_KEY_ID, KEY_ID_VALUE);
runner.setProperty(ClientSideEncryptionSupport.CSE_LOCAL_KEY, KEY_512B_VALUE);
runProcessor(BLOB_DATA);
assertSuccessForCSE(getContainerName(), BLOB_NAME, BLOB_DATA);
}
private void runProcessor(byte[] data) { private void runProcessor(byte[] data) {
runProcessor(data, Collections.emptyMap()); runProcessor(data, Collections.emptyMap());
} }
@ -207,6 +265,13 @@ public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
return flowFile; return flowFile;
} }
private MockFlowFile assertSuccessForCSE(String containerName, String blobName, byte[] blobData) throws Exception {
MockFlowFile flowFile = assertFlowFile(containerName, blobName, blobData);
assertAzureBlobExists(containerName, blobName);
assertProvenanceEvents();
return flowFile;
}
private MockFlowFile assertIgnored(String containerName, String blobName) throws Exception { private MockFlowFile assertIgnored(String containerName, String blobName) throws Exception {
MockFlowFile flowFile = assertFlowFile(containerName, blobName, null); MockFlowFile flowFile = assertFlowFile(containerName, blobName, null);
assertProvenanceEvents(); assertProvenanceEvents();
@ -222,16 +287,21 @@ public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
if (blobData != null) { if (blobData != null) {
assertFlowFileResultBlobAttributes(flowFile, blobData.length); assertFlowFileResultBlobAttributes(flowFile, blobData.length);
flowFile.assertContentEquals(blobData); flowFile.assertContentEquals(blobData);
flowFile.assertAttributeEquals("azure.length", String.valueOf(blobData.length));
} }
return flowFile; return flowFile;
} }
private void assertAzureBlob(String containerName, String blobName, byte[] blobData) { private void assertAzureBlob(String containerName, String blobName, byte[] blobData) {
BlobClient blobClient = assertAzureBlobExists(containerName, blobName);
assertEquals(blobData.length, blobClient.getProperties().getBlobSize());
}
private BlobClient assertAzureBlobExists(String containerName, String blobName) {
BlobContainerClient containerClient = getStorageClient().getBlobContainerClient(containerName); BlobContainerClient containerClient = getStorageClient().getBlobContainerClient(containerName);
BlobClient blobClient = containerClient.getBlobClient(blobName); BlobClient blobClient = containerClient.getBlobClient(blobName);
assertTrue(blobClient.exists()); assertTrue(blobClient.exists());
assertEquals(blobData.length, blobClient.getProperties().getBlobSize()); return blobClient;
} }
private void assertProvenanceEvents() { private void assertProvenanceEvents() {

View File

@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processors.azure.storage.utils.ClientSideEncryptionMethod;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.MockValidationContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Collection;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestClientSideEncryptionSupport {
private static final String KEY_ID_VALUE = "key:id";
private static final String KEY_64B_VALUE = "1234567890ABCDEF";
private static final String KEY_128B_VALUE = KEY_64B_VALUE + KEY_64B_VALUE;
private static final String KEY_192B_VALUE = KEY_128B_VALUE + KEY_64B_VALUE;
private static final String KEY_256B_VALUE = KEY_128B_VALUE + KEY_128B_VALUE;
private static final String KEY_384B_VALUE = KEY_256B_VALUE + KEY_128B_VALUE;
private static final String KEY_512B_VALUE = KEY_256B_VALUE + KEY_256B_VALUE;
private MockProcessContext processContext;
private MockValidationContext validationContext;
private PutAzureBlobStorage_v12 putAzureBlobStorage_v12;
@BeforeEach
public void setUp() {
putAzureBlobStorage_v12 = new PutAzureBlobStorage_v12();
processContext = new MockProcessContext(putAzureBlobStorage_v12);
validationContext = new MockValidationContext(processContext);
}
@Test
public void testNoCesConfiguredOnProcessor() {
configureProcessorProperties(ClientSideEncryptionMethod.NONE, null, null);
Collection<ValidationResult> result = putAzureBlobStorage_v12.validateClientSideEncryptionProperties(validationContext);
assertValid(result);
}
@Test
public void testLocalCesNoKeyIdOnProcessor() {
configureProcessorProperties(ClientSideEncryptionMethod.LOCAL, null, KEY_128B_VALUE);
Collection<ValidationResult> result = putAzureBlobStorage_v12.validateClientSideEncryptionProperties(validationContext);
assertNotValid(result);
}
@Test
public void testLocalCesNoKeyOnProcessor() {
configureProcessorProperties(ClientSideEncryptionMethod.LOCAL, KEY_ID_VALUE, null);
Collection<ValidationResult> result = putAzureBlobStorage_v12.validateClientSideEncryptionProperties(validationContext);
assertNotValid(result);
}
@Test
public void testLocalCesInvalidHexKeyOnProcessor() {
configureProcessorProperties(ClientSideEncryptionMethod.LOCAL, KEY_ID_VALUE, "ZZ");
Collection<ValidationResult> result = putAzureBlobStorage_v12.validateClientSideEncryptionProperties(validationContext);
assertNotValid(result);
}
@Test
public void testLocalCesInvalidKeyLengthOnProcessor() {
configureProcessorProperties(ClientSideEncryptionMethod.LOCAL, KEY_ID_VALUE, KEY_64B_VALUE);
Collection<ValidationResult> result = putAzureBlobStorage_v12.validateClientSideEncryptionProperties(validationContext);
assertNotValid(result);
assertContains(result, "Key size in bits must be one of [128, 192, 256, 384, 512] instead of [64]");
}
@Test
public void testLocalCes128BitKeyOnProcessor() {
configureProcessorProperties(ClientSideEncryptionMethod.LOCAL, KEY_ID_VALUE, KEY_128B_VALUE);
Collection<ValidationResult> result = putAzureBlobStorage_v12.validateClientSideEncryptionProperties(validationContext);
assertValid(result);
}
@Test
public void testLocalCes192BitKeyOnProcessor() {
configureProcessorProperties(ClientSideEncryptionMethod.LOCAL, KEY_ID_VALUE, KEY_192B_VALUE);
Collection<ValidationResult> result = putAzureBlobStorage_v12.validateClientSideEncryptionProperties(validationContext);
assertValid(result);
}
@Test
public void testLocalCes256BitKeyOnProcessor() {
configureProcessorProperties(ClientSideEncryptionMethod.LOCAL, KEY_ID_VALUE, KEY_256B_VALUE);
Collection<ValidationResult> result = putAzureBlobStorage_v12.validateClientSideEncryptionProperties(validationContext);
assertValid(result);
}
@Test
public void testLocalCes384BitKeyOnProcessor() {
configureProcessorProperties(ClientSideEncryptionMethod.LOCAL, KEY_ID_VALUE, KEY_384B_VALUE);
Collection<ValidationResult> result = putAzureBlobStorage_v12.validateClientSideEncryptionProperties(validationContext);
assertValid(result);
}
@Test
public void testLocalCes512BitKeyOnProcessor() {
configureProcessorProperties(ClientSideEncryptionMethod.LOCAL, KEY_ID_VALUE, KEY_512B_VALUE);
Collection<ValidationResult> result = putAzureBlobStorage_v12.validateClientSideEncryptionProperties(validationContext);
assertValid(result);
}
private void configureProcessorProperties(ClientSideEncryptionMethod keyType, String keyId, String localKeyHex) {
if (keyType != null) {
processContext.setProperty(putAzureBlobStorage_v12.CSE_KEY_TYPE, keyType.getValue());
}
if (keyId != null) {
processContext.setProperty(putAzureBlobStorage_v12.CSE_KEY_ID, keyId);
}
if (localKeyHex != null) {
processContext.setProperty(putAzureBlobStorage_v12.CSE_LOCAL_KEY, localKeyHex);
}
}
private void assertValid(Collection<ValidationResult> result) {
assertTrue(result.isEmpty(), "There should be no validation error");
}
private void assertNotValid(Collection<ValidationResult> result) {
assertFalse(result.isEmpty(), "There should be validation error");
}
private void assertContains(Collection<ValidationResult> result, String explaination) {
assertFalse(result.isEmpty(), "There should be validation error");
assertTrue(result.stream().filter(v -> v.getExplanation().contains(explaination)).findFirst().isPresent());
}
}