diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index 624bc50878..197ab20e76 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -87,6 +87,14 @@
com.azure
azure-storage-blob
+
+ com.azure
+ azure-storage-blob-cryptography
+
+
+ com.azure
+ azure-security-keyvault-keys
+
com.microsoft.azure
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/ClientSideEncryptionSupport.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/ClientSideEncryptionSupport.java
new file mode 100644
index 0000000000..af3c90e3ce
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/ClientSideEncryptionSupport.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure;
+
+import com.azure.core.cryptography.AsyncKeyEncryptionKey;
+import com.azure.security.keyvault.keys.cryptography.KeyEncryptionKeyClientBuilder;
+import com.azure.security.keyvault.keys.cryptography.models.KeyWrapAlgorithm;
+import com.azure.security.keyvault.keys.models.JsonWebKey;
+import com.azure.security.keyvault.keys.models.KeyOperation;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.specialized.cryptography.EncryptedBlobClientBuilder;
+import com.azure.storage.blob.specialized.cryptography.EncryptionVersion;
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.ClientSideEncryptionMethod;
+import org.apache.nifi.util.StringUtils;
+
+import javax.crypto.spec.SecretKeySpec;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+
+public interface ClientSideEncryptionSupport {
+ List KEY_OPERATIONS = Arrays.asList(KeyOperation.WRAP_KEY, KeyOperation.UNWRAP_KEY);
+
+ PropertyDescriptor CSE_KEY_TYPE = new PropertyDescriptor.Builder()
+ .name("Client-Side Encryption Key Type")
+ .displayName("Client-Side Encryption Key Type")
+ .required(true)
+ .allowableValues(ClientSideEncryptionMethod.class)
+ .defaultValue(ClientSideEncryptionMethod.NONE.getValue())
+ .description("Specifies the key type to use for client-side encryption.")
+ .build();
+
+ PropertyDescriptor CSE_KEY_ID = new PropertyDescriptor.Builder()
+ .name("Client-Side Encryption Key ID")
+ .displayName("Client-Side Encryption Key ID")
+ .description("Specifies the ID of the key to use for client-side encryption.")
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .dependsOn(CSE_KEY_TYPE, ClientSideEncryptionMethod.LOCAL)
+ .build();
+
+ PropertyDescriptor CSE_LOCAL_KEY = new PropertyDescriptor.Builder()
+ .name("Client-Side Encryption Local Key")
+ .displayName("Client-Side Encryption Local Key")
+ .description("When using local client-side encryption, this is the raw key, encoded in hexadecimal")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .dependsOn(CSE_KEY_TYPE, ClientSideEncryptionMethod.LOCAL)
+ .sensitive(true)
+ .build();
+
+ default Collection validateClientSideEncryptionProperties(ValidationContext validationContext) {
+ final List validationResults = new ArrayList<>();
+ final String cseKeyTypeValue = validationContext.getProperty(CSE_KEY_TYPE).getValue();
+ final ClientSideEncryptionMethod cseKeyType = ClientSideEncryptionMethod.valueOf(cseKeyTypeValue);
+ final String cseKeyId = validationContext.getProperty(CSE_KEY_ID).getValue();
+ final String cseLocalKey = validationContext.getProperty(CSE_LOCAL_KEY).getValue();
+ if (cseKeyType != ClientSideEncryptionMethod.NONE && StringUtils.isBlank(cseKeyId)) {
+ validationResults.add(new ValidationResult.Builder().subject(CSE_KEY_ID.getDisplayName())
+ .explanation("Key ID must be set when client-side encryption is enabled").build());
+ }
+ if (ClientSideEncryptionMethod.LOCAL == cseKeyType) {
+ validationResults.addAll(validateLocalKey(cseLocalKey));
+ }
+ return validationResults;
+ }
+
+ default List validateLocalKey(String keyHex) {
+ final List validationResults = new ArrayList<>();
+ if (StringUtils.isBlank(keyHex)) {
+ validationResults.add(new ValidationResult.Builder().subject(CSE_LOCAL_KEY.getDisplayName())
+ .explanation("Key must be set when client-side encryption is enabled").build());
+ } else {
+ try {
+ final byte[] keyBytes = Hex.decodeHex(keyHex);
+ if (getKeyWrapAlgorithm(keyBytes).isEmpty()) {
+ validationResults.add(new ValidationResult.Builder().subject(CSE_LOCAL_KEY.getDisplayName())
+ .explanation(String.format("Key size in bits must be one of [128, 192, 256, 384, 512] instead of [%d]", keyBytes.length * 8)).build());
+ }
+ } catch (DecoderException e) {
+ validationResults.add(new ValidationResult.Builder().subject(CSE_LOCAL_KEY.getDisplayName())
+ .explanation("Key must be a valid hexadecimal string").build());
+ } catch (IllegalArgumentException e) {
+ validationResults.add(new ValidationResult.Builder().subject(CSE_LOCAL_KEY.getDisplayName())
+ .explanation(e.getMessage()).build());
+ }
+ }
+
+ return validationResults;
+ }
+
+ default boolean isClientSideEncryptionEnabled(PropertyContext context) {
+ final String cseKeyTypeValue = context.getProperty(CSE_KEY_TYPE).getValue();
+ final ClientSideEncryptionMethod cseKeyType = ClientSideEncryptionMethod.valueOf(cseKeyTypeValue);
+ return cseKeyType != ClientSideEncryptionMethod.NONE;
+ }
+
+ default BlobClient getEncryptedBlobClient(PropertyContext context, BlobContainerClient containerClient, String blobName) throws DecoderException {
+ final String cseKeyId = context.getProperty(CSE_KEY_ID).getValue();
+ final String cseLocalKeyHex = context.getProperty(CSE_LOCAL_KEY).getValue();
+ final BlobClient blobClient = containerClient.getBlobClient(blobName);
+ final byte[] keyBytes = Hex.decodeHex(cseLocalKeyHex);
+ JsonWebKey localKey = JsonWebKey.fromAes(new SecretKeySpec(keyBytes, "AES"), KEY_OPERATIONS)
+ .setId(cseKeyId);
+ AsyncKeyEncryptionKey akek = new KeyEncryptionKeyClientBuilder()
+ .buildAsyncKeyEncryptionKey(localKey).block();
+ final String keyWrapAlgorithm = getKeyWrapAlgorithm(keyBytes).orElseThrow(() -> new IllegalArgumentException("Failed to derive key wrap algorithm"));
+
+ return new EncryptedBlobClientBuilder(EncryptionVersion.V2)
+ .key(akek, keyWrapAlgorithm)
+ .blobClient(blobClient)
+ .buildEncryptedBlobClient();
+ }
+
+ default Optional getKeyWrapAlgorithm(byte[] keyBytes) {
+ final int keySize128 = 16;
+ final int keySize192 = 24;
+ final int keySize256 = 32;
+ final int keySize384 = 48;
+ final int keySize512 = 64;
+ switch (keyBytes.length) {
+ case keySize128:
+ return Optional.of(KeyWrapAlgorithm.A128KW.toString());
+ case keySize192:
+ return Optional.of(KeyWrapAlgorithm.A192KW.toString());
+ case keySize256:
+ case keySize384:
+ case keySize512:
+ // Default to longest allowed key length for wrap
+ return Optional.of(KeyWrapAlgorithm.A256KW.toString());
+ default:
+ return Optional.empty();
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java
index 0a12de3051..775c9d7ab0 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java
@@ -28,6 +28,8 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
@@ -36,9 +38,12 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12;
+import org.apache.nifi.processors.azure.ClientSideEncryptionSupport;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -76,7 +81,7 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR
@WritesAttribute(attribute = ATTR_NAME_LANG, description = ATTR_DESCRIPTION_LANG),
@WritesAttribute(attribute = ATTR_NAME_TIMESTAMP, description = ATTR_DESCRIPTION_TIMESTAMP),
@WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH)})
-public class FetchAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
+public class FetchAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 implements ClientSideEncryptionSupport {
public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(AzureStorageUtils.CONTAINER)
@@ -114,9 +119,19 @@ public class FetchAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
BLOB_NAME,
RANGE_START,
RANGE_LENGTH,
- AzureStorageUtils.PROXY_CONFIGURATION_SERVICE
+ AzureStorageUtils.PROXY_CONFIGURATION_SERVICE,
+ CSE_KEY_TYPE,
+ CSE_KEY_ID,
+ CSE_LOCAL_KEY
));
+ @Override
+ protected Collection customValidate(ValidationContext validationContext) {
+ final List results = new ArrayList<>(super.customValidate(validationContext));
+ results.addAll(validateClientSideEncryptionProperties(validationContext));
+ return results;
+ }
+
@Override
protected List getSupportedPropertyDescriptors() {
return PROPERTIES;
@@ -139,9 +154,14 @@ public class FetchAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
try {
BlobServiceClient storageClient = getStorageClient();
BlobContainerClient containerClient = storageClient.getBlobContainerClient(containerName);
- BlobClient blobClient = containerClient.getBlobClient(blobName);
+ final BlobClient blobClient;
+ if (isClientSideEncryptionEnabled(context)) {
+ blobClient = getEncryptedBlobClient(context, containerClient, blobName);
+ } else{
+ blobClient = containerClient.getBlobClient(blobName);
+ }
- flowFile = session.write(flowFile, os -> blobClient.downloadWithResponse(os, new BlobRange(rangeStart, rangeLength), null, null, false, null, null));
+ flowFile = session.write(flowFile, os -> blobClient.downloadStreamWithResponse(os, new BlobRange(rangeStart, rangeLength), null, null, false, null, null));
Map attributes = createBlobAttributesMap(blobClient);
flowFile = session.putAllAttributes(flowFile, attributes);
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
index 3b7896e5c3..a0aeb4e297 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
@@ -35,6 +35,8 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
@@ -42,11 +44,14 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12;
+import org.apache.nifi.processors.azure.ClientSideEncryptionSupport;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.services.azure.storage.AzureStorageConflictResolutionStrategy;
import java.io.InputStream;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -93,7 +98,7 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR
@WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH),
@WritesAttribute(attribute = ATTR_NAME_ERROR_CODE, description = ATTR_DESCRIPTION_ERROR_CODE),
@WritesAttribute(attribute = ATTR_NAME_IGNORED, description = ATTR_DESCRIPTION_IGNORED)})
-public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
+public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 implements ClientSideEncryptionSupport {
public static final PropertyDescriptor CREATE_CONTAINER = new PropertyDescriptor.Builder()
.name("create-container")
@@ -124,9 +129,19 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
CREATE_CONTAINER,
CONFLICT_RESOLUTION,
BLOB_NAME,
- AzureStorageUtils.PROXY_CONFIGURATION_SERVICE
+ AzureStorageUtils.PROXY_CONFIGURATION_SERVICE,
+ CSE_KEY_TYPE,
+ CSE_KEY_ID,
+ CSE_LOCAL_KEY
));
+ @Override
+ protected Collection customValidate(ValidationContext validationContext) {
+ final List results = new ArrayList<>(super.customValidate(validationContext));
+ results.addAll(validateClientSideEncryptionProperties(validationContext));
+ return results;
+ }
+
@Override
protected List getSupportedPropertyDescriptors() {
return PROPERTIES;
@@ -151,7 +166,13 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
containerClient.create();
}
- BlobClient blobClient = containerClient.getBlobClient(blobName);
+ final BlobClient blobClient;
+ if (isClientSideEncryptionEnabled(context)) {
+ blobClient = getEncryptedBlobClient(context, containerClient, blobName);
+ } else {
+ blobClient = containerClient.getBlobClient(blobName);
+ }
+
final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
Map attributes = new HashMap<>();
applyStandardBlobAttributes(attributes, blobClient);
@@ -207,4 +228,5 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
attributes.put(ATTR_NAME_LANG, null);
attributes.put(ATTR_NAME_MIME_TYPE, APPLICATION_OCTET_STREAM);
}
+
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ClientSideEncryptionMethod.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ClientSideEncryptionMethod.java
new file mode 100644
index 0000000000..f75754732d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/ClientSideEncryptionMethod.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * Enumeration capturing essential information about the various client-side
+ * encryption methods supported by Azure
+ */
+public enum ClientSideEncryptionMethod implements DescribedValue {
+
+ NONE("Client-Side Encryption disabled"),
+ LOCAL("Client-Side Encryption enabled using local key");
+
+ private final String description;
+
+ ClientSideEncryptionMethod(String description) {
+ this.description = description;
+ }
+
+ @Override
+ public String getValue() {
+ return this.name();
+ }
+
+ @Override
+ public String getDisplayName() {
+ return this.name();
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ @Override
+ public String toString() {
+ return description;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java
index 9d2af5d068..c9716e80d3 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java
@@ -16,12 +16,19 @@
*/
package org.apache.nifi.processors.azure.storage;
+import com.azure.core.cryptography.AsyncKeyEncryptionKey;
+import com.azure.security.keyvault.keys.cryptography.KeyEncryptionKeyClientBuilder;
+import com.azure.security.keyvault.keys.models.JsonWebKey;
+import com.azure.security.keyvault.keys.models.KeyOperation;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.models.BlobType;
+import com.azure.storage.blob.specialized.cryptography.EncryptedBlobClientBuilder;
+import com.azure.storage.blob.specialized.cryptography.EncryptionVersion;
import com.azure.storage.common.StorageSharedKeyCredential;
+import org.apache.commons.codec.binary.Hex;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12;
import org.apache.nifi.processors.azure.AzureServiceEndpoints;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
@@ -33,10 +40,12 @@ import org.apache.nifi.util.MockFlowFile;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import javax.crypto.spec.SecretKeySpec;
import java.io.ByteArrayInputStream;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@@ -47,6 +56,13 @@ public abstract class AbstractAzureBlobStorage_v12IT extends AbstractAzureStorag
protected static final String BLOB_NAME = "blob1";
protected static final byte[] BLOB_DATA = "0123456789".getBytes(StandardCharsets.UTF_8);
+ protected static final String KEY_ID_VALUE = "key:id";
+ protected static final String KEY_64B_VALUE = "1234567890ABCDEF";
+ protected static final String KEY_128B_VALUE = KEY_64B_VALUE + KEY_64B_VALUE;
+ protected static final String KEY_192B_VALUE = KEY_128B_VALUE + KEY_64B_VALUE;
+ protected static final String KEY_256B_VALUE = KEY_128B_VALUE + KEY_128B_VALUE;
+ protected static final String KEY_384B_VALUE = KEY_256B_VALUE + KEY_128B_VALUE;
+ protected static final String KEY_512B_VALUE = KEY_256B_VALUE + KEY_256B_VALUE;
protected static final String EL_CONTAINER_NAME = "az.containername";
protected static final String EL_BLOB_NAME = "az.blobname";
@@ -128,6 +144,26 @@ public abstract class AbstractAzureBlobStorage_v12IT extends AbstractAzureStorag
return blobClient;
}
+ protected BlobClient uploadBlobWithCSE(String blobName, byte[] blobData, String hexKey, String keyId, String keyWrapAlgorithm) throws Exception {
+ BlobClient blobClient = containerClient.getBlobClient(blobName);
+ byte[] keyBytes = Hex.decodeHex(hexKey.toCharArray());
+ JsonWebKey localKey = JsonWebKey.fromAes(new SecretKeySpec(keyBytes, "AES"),
+ Arrays.asList(KeyOperation.WRAP_KEY, KeyOperation.UNWRAP_KEY))
+ .setId(keyId);
+ AsyncKeyEncryptionKey akek = new KeyEncryptionKeyClientBuilder()
+ .buildAsyncKeyEncryptionKey(localKey).block();
+ BlobClient encryptedBlobClient = new EncryptedBlobClientBuilder(EncryptionVersion.V2)
+ .key(akek, keyWrapAlgorithm)
+ .blobClient(blobClient)
+ .buildEncryptedBlobClient();
+ encryptedBlobClient.upload(new ByteArrayInputStream(blobData), blobData.length);
+
+ // waiting for the blob to be available
+ Thread.sleep(1000);
+
+ return encryptedBlobClient;
+ }
+
protected Map initCommonExpressionLanguageAttributes() {
Map attributes = new HashMap<>();
attributes.put(EL_CONTAINER_NAME, getContainerName());
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage_v12.java
index 270948fb8e..7f7e91bea4 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage_v12.java
@@ -16,7 +16,10 @@
*/
package org.apache.nifi.processors.azure.storage;
+import com.azure.security.keyvault.keys.cryptography.models.KeyWrapAlgorithm;
import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.azure.ClientSideEncryptionSupport;
+import org.apache.nifi.processors.azure.storage.utils.ClientSideEncryptionMethod;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile;
@@ -53,6 +56,19 @@ public class ITFetchAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT
assertSuccess(BLOB_NAME, BLOB_DATA);
}
+ @Test
+ public void testFetchBlobWithCSE() throws Exception {
+ runner.setProperty(ClientSideEncryptionSupport.CSE_KEY_TYPE, ClientSideEncryptionMethod.LOCAL.name());
+ runner.setProperty(ClientSideEncryptionSupport.CSE_KEY_ID, KEY_ID_VALUE);
+ runner.setProperty(ClientSideEncryptionSupport.CSE_LOCAL_KEY, KEY_128B_VALUE);
+ uploadBlobWithCSE(BLOB_NAME, BLOB_DATA, KEY_128B_VALUE, KEY_ID_VALUE, KeyWrapAlgorithm.A128KW.toString());
+
+ runProcessor();
+ //cannot validate blob size as azure api does not expose unencrypted data length
+ assertFlowFile(BLOB_NAME, BLOB_DATA, null);
+ assertProvenanceEvents();
+ }
+
@Test
public void testFetchBlobWithSimpleNameUsingProxyConfigurationService() throws Exception {
uploadBlob(BLOB_NAME, BLOB_DATA);
@@ -186,14 +202,15 @@ public class ITFetchAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT
assertProvenanceEvents();
}
- private void assertFlowFile(String blobName, byte[] blobData, int originalLength) throws Exception {
+ private void assertFlowFile(String blobName, byte[] blobData, Integer originalLength) throws Exception {
runner.assertAllFlowFilesTransferred(FetchAzureBlobStorage_v12.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchAzureBlobStorage_v12.REL_SUCCESS).get(0);
assertFlowFileCommonBlobAttributes(flowFile, getContainerName(), blobName);
- assertFlowFileResultBlobAttributes(flowFile, originalLength);
-
+ if(originalLength != null) {
+ assertFlowFileResultBlobAttributes(flowFile, originalLength);
+ }
flowFile.assertContentEquals(blobData);
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java
index e3481d194c..78e2a65719 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java
@@ -20,7 +20,9 @@ import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.models.BlobErrorCode;
import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.azure.ClientSideEncryptionSupport;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.apache.nifi.processors.azure.storage.utils.ClientSideEncryptionMethod;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.services.azure.storage.AzureStorageConflictResolutionStrategy;
@@ -40,8 +42,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
+
public static class ITProcessor extends PutAzureBlobStorage_v12 {
public boolean blobMetadataApplied = false;
+
@Override
protected void applyBlobMetadata(Map attributes, BlobClient blobClient) {
super.applyBlobMetadata(attributes, blobClient);
@@ -190,6 +194,60 @@ public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
assertSuccess(getContainerName(), BLOB_NAME, BLOB_DATA);
}
+ @Test
+ public void testPutBlob64BLocalCSE() {
+ runner.setProperty(ClientSideEncryptionSupport.CSE_KEY_TYPE, ClientSideEncryptionMethod.LOCAL.name());
+ runner.setProperty(ClientSideEncryptionSupport.CSE_KEY_ID, KEY_ID_VALUE);
+ runner.setProperty(ClientSideEncryptionSupport.CSE_LOCAL_KEY, KEY_64B_VALUE);
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void testPutBlob128BLocalCSE() throws Exception {
+ runner.setProperty(ClientSideEncryptionSupport.CSE_KEY_TYPE, ClientSideEncryptionMethod.LOCAL.name());
+ runner.setProperty(ClientSideEncryptionSupport.CSE_KEY_ID, KEY_ID_VALUE);
+ runner.setProperty(ClientSideEncryptionSupport.CSE_LOCAL_KEY, KEY_128B_VALUE);
+ runProcessor(BLOB_DATA);
+ assertSuccessForCSE(getContainerName(), BLOB_NAME, BLOB_DATA);
+ }
+
+ @Test
+ public void testPutBlob192BLocalCSE() throws Exception {
+ runner.setProperty(ClientSideEncryptionSupport.CSE_KEY_TYPE, ClientSideEncryptionMethod.LOCAL.name());
+ runner.setProperty(ClientSideEncryptionSupport.CSE_KEY_ID, KEY_ID_VALUE);
+ runner.setProperty(ClientSideEncryptionSupport.CSE_LOCAL_KEY, KEY_192B_VALUE);
+ runProcessor(BLOB_DATA);
+ assertSuccessForCSE(getContainerName(), BLOB_NAME, BLOB_DATA);
+ }
+
+ @Test
+ public void testPutBlob256BLocalCSE() throws Exception {
+ runner.setProperty(ClientSideEncryptionSupport.CSE_KEY_TYPE, ClientSideEncryptionMethod.LOCAL.name());
+ runner.setProperty(ClientSideEncryptionSupport.CSE_KEY_ID, KEY_ID_VALUE);
+ runner.setProperty(ClientSideEncryptionSupport.CSE_LOCAL_KEY, KEY_256B_VALUE);
+ runProcessor(BLOB_DATA);
+ assertSuccessForCSE(getContainerName(), BLOB_NAME, BLOB_DATA);
+ }
+
+ @Test
+ public void testPutBlob384BLocalCSE() throws Exception {
+ runner.setProperty(ClientSideEncryptionSupport.CSE_KEY_TYPE, ClientSideEncryptionMethod.LOCAL.name());
+ runner.setProperty(ClientSideEncryptionSupport.CSE_KEY_ID, KEY_ID_VALUE);
+ runner.setProperty(ClientSideEncryptionSupport.CSE_LOCAL_KEY, KEY_384B_VALUE);
+ runProcessor(BLOB_DATA);
+ assertSuccessForCSE(getContainerName(), BLOB_NAME, BLOB_DATA);
+ }
+
+ @Test
+ public void testPutBlob512BLocalCSE() throws Exception {
+ runner.setProperty(ClientSideEncryptionSupport.CSE_KEY_TYPE, ClientSideEncryptionMethod.LOCAL.name());
+ runner.setProperty(ClientSideEncryptionSupport.CSE_KEY_ID, KEY_ID_VALUE);
+ runner.setProperty(ClientSideEncryptionSupport.CSE_LOCAL_KEY, KEY_512B_VALUE);
+ runProcessor(BLOB_DATA);
+ assertSuccessForCSE(getContainerName(), BLOB_NAME, BLOB_DATA);
+ }
+
+
private void runProcessor(byte[] data) {
runProcessor(data, Collections.emptyMap());
}
@@ -207,6 +265,13 @@ public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
return flowFile;
}
+ private MockFlowFile assertSuccessForCSE(String containerName, String blobName, byte[] blobData) throws Exception {
+ MockFlowFile flowFile = assertFlowFile(containerName, blobName, blobData);
+ assertAzureBlobExists(containerName, blobName);
+ assertProvenanceEvents();
+ return flowFile;
+ }
+
private MockFlowFile assertIgnored(String containerName, String blobName) throws Exception {
MockFlowFile flowFile = assertFlowFile(containerName, blobName, null);
assertProvenanceEvents();
@@ -222,16 +287,21 @@ public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
if (blobData != null) {
assertFlowFileResultBlobAttributes(flowFile, blobData.length);
flowFile.assertContentEquals(blobData);
+ flowFile.assertAttributeEquals("azure.length", String.valueOf(blobData.length));
}
return flowFile;
}
private void assertAzureBlob(String containerName, String blobName, byte[] blobData) {
+ BlobClient blobClient = assertAzureBlobExists(containerName, blobName);
+ assertEquals(blobData.length, blobClient.getProperties().getBlobSize());
+ }
+
+ private BlobClient assertAzureBlobExists(String containerName, String blobName) {
BlobContainerClient containerClient = getStorageClient().getBlobContainerClient(containerName);
BlobClient blobClient = containerClient.getBlobClient(blobName);
-
assertTrue(blobClient.exists());
- assertEquals(blobData.length, blobClient.getProperties().getBlobSize());
+ return blobClient;
}
private void assertProvenanceEvents() {
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestClientSideEncryptionSupport.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestClientSideEncryptionSupport.java
new file mode 100644
index 0000000000..2664cc81e9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestClientSideEncryptionSupport.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processors.azure.storage.utils.ClientSideEncryptionMethod;
+import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.MockValidationContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collection;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestClientSideEncryptionSupport {
+ private static final String KEY_ID_VALUE = "key:id";
+ private static final String KEY_64B_VALUE = "1234567890ABCDEF";
+ private static final String KEY_128B_VALUE = KEY_64B_VALUE + KEY_64B_VALUE;
+ private static final String KEY_192B_VALUE = KEY_128B_VALUE + KEY_64B_VALUE;
+ private static final String KEY_256B_VALUE = KEY_128B_VALUE + KEY_128B_VALUE;
+ private static final String KEY_384B_VALUE = KEY_256B_VALUE + KEY_128B_VALUE;
+ private static final String KEY_512B_VALUE = KEY_256B_VALUE + KEY_256B_VALUE;
+
+ private MockProcessContext processContext;
+ private MockValidationContext validationContext;
+ private PutAzureBlobStorage_v12 putAzureBlobStorage_v12;
+
+ @BeforeEach
+ public void setUp() {
+ putAzureBlobStorage_v12 = new PutAzureBlobStorage_v12();
+ processContext = new MockProcessContext(putAzureBlobStorage_v12);
+ validationContext = new MockValidationContext(processContext);
+ }
+
+ @Test
+ public void testNoCesConfiguredOnProcessor() {
+ configureProcessorProperties(ClientSideEncryptionMethod.NONE, null, null);
+
+ Collection result = putAzureBlobStorage_v12.validateClientSideEncryptionProperties(validationContext);
+
+ assertValid(result);
+ }
+
+ @Test
+ public void testLocalCesNoKeyIdOnProcessor() {
+ configureProcessorProperties(ClientSideEncryptionMethod.LOCAL, null, KEY_128B_VALUE);
+
+ Collection result = putAzureBlobStorage_v12.validateClientSideEncryptionProperties(validationContext);
+
+ assertNotValid(result);
+ }
+
+ @Test
+ public void testLocalCesNoKeyOnProcessor() {
+ configureProcessorProperties(ClientSideEncryptionMethod.LOCAL, KEY_ID_VALUE, null);
+
+ Collection result = putAzureBlobStorage_v12.validateClientSideEncryptionProperties(validationContext);
+
+ assertNotValid(result);
+ }
+
+ @Test
+ public void testLocalCesInvalidHexKeyOnProcessor() {
+ configureProcessorProperties(ClientSideEncryptionMethod.LOCAL, KEY_ID_VALUE, "ZZ");
+
+ Collection result = putAzureBlobStorage_v12.validateClientSideEncryptionProperties(validationContext);
+
+ assertNotValid(result);
+ }
+
+ @Test
+ public void testLocalCesInvalidKeyLengthOnProcessor() {
+ configureProcessorProperties(ClientSideEncryptionMethod.LOCAL, KEY_ID_VALUE, KEY_64B_VALUE);
+
+ Collection result = putAzureBlobStorage_v12.validateClientSideEncryptionProperties(validationContext);
+
+ assertNotValid(result);
+ assertContains(result, "Key size in bits must be one of [128, 192, 256, 384, 512] instead of [64]");
+ }
+
+ @Test
+ public void testLocalCes128BitKeyOnProcessor() {
+ configureProcessorProperties(ClientSideEncryptionMethod.LOCAL, KEY_ID_VALUE, KEY_128B_VALUE);
+
+ Collection result = putAzureBlobStorage_v12.validateClientSideEncryptionProperties(validationContext);
+
+ assertValid(result);
+ }
+
+ @Test
+ public void testLocalCes192BitKeyOnProcessor() {
+ configureProcessorProperties(ClientSideEncryptionMethod.LOCAL, KEY_ID_VALUE, KEY_192B_VALUE);
+
+ Collection result = putAzureBlobStorage_v12.validateClientSideEncryptionProperties(validationContext);
+
+ assertValid(result);
+ }
+
+ @Test
+ public void testLocalCes256BitKeyOnProcessor() {
+ configureProcessorProperties(ClientSideEncryptionMethod.LOCAL, KEY_ID_VALUE, KEY_256B_VALUE);
+
+ Collection result = putAzureBlobStorage_v12.validateClientSideEncryptionProperties(validationContext);
+
+ assertValid(result);
+ }
+
+ @Test
+ public void testLocalCes384BitKeyOnProcessor() {
+ configureProcessorProperties(ClientSideEncryptionMethod.LOCAL, KEY_ID_VALUE, KEY_384B_VALUE);
+
+ Collection result = putAzureBlobStorage_v12.validateClientSideEncryptionProperties(validationContext);
+
+ assertValid(result);
+ }
+
+ @Test
+ public void testLocalCes512BitKeyOnProcessor() {
+ configureProcessorProperties(ClientSideEncryptionMethod.LOCAL, KEY_ID_VALUE, KEY_512B_VALUE);
+
+ Collection result = putAzureBlobStorage_v12.validateClientSideEncryptionProperties(validationContext);
+
+ assertValid(result);
+ }
+
+ private void configureProcessorProperties(ClientSideEncryptionMethod keyType, String keyId, String localKeyHex) {
+ if (keyType != null) {
+ processContext.setProperty(putAzureBlobStorage_v12.CSE_KEY_TYPE, keyType.getValue());
+ }
+ if (keyId != null) {
+ processContext.setProperty(putAzureBlobStorage_v12.CSE_KEY_ID, keyId);
+ }
+ if (localKeyHex != null) {
+ processContext.setProperty(putAzureBlobStorage_v12.CSE_LOCAL_KEY, localKeyHex);
+ }
+ }
+
+ private void assertValid(Collection result) {
+ assertTrue(result.isEmpty(), "There should be no validation error");
+ }
+
+ private void assertNotValid(Collection result) {
+ assertFalse(result.isEmpty(), "There should be validation error");
+ }
+
+ private void assertContains(Collection result, String explaination) {
+ assertFalse(result.isEmpty(), "There should be validation error");
+ assertTrue(result.stream().filter(v -> v.getExplanation().contains(explaination)).findFirst().isPresent());
+ }
+}
\ No newline at end of file