NIFI-8501 Added Azure blob client side encryption

This closes #5078

Signed-off-by: Joey Frazee <jfrazee@apache.org>
This commit is contained in:
Guillaume Schaer 2021-05-17 01:05:51 +02:00 committed by Joey Frazee
parent 54624bc26d
commit ace27e5f69
12 changed files with 742 additions and 17 deletions

View File

@ -22,6 +22,7 @@
<properties>
<azure-eventhubs.version>3.2.1</azure-eventhubs.version>
<azure-eventhubs-eph.version>3.2.1</azure-eventhubs-eph.version>
<azure-keyvault.version>1.2.4</azure-keyvault.version>
</properties>
<dependencies>
<dependency>
@ -83,6 +84,11 @@
<artifactId>azure-eventhubs</artifactId>
<version>${azure-eventhubs.version}</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-keyvault</artifactId>
<version>${azure-keyvault.version}</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-eph</artifactId>

View File

@ -16,13 +16,21 @@
*/
package org.apache.nifi.processors.azure;
import com.microsoft.azure.keyvault.cryptography.SymmetricKey;
import com.microsoft.azure.storage.blob.BlobEncryptionPolicy;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionMethod;
import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionUtils;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import java.util.Arrays;
@ -85,4 +93,24 @@ public abstract class AbstractAzureBlobProcessor extends AbstractProcessor {
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
protected BlobRequestOptions createBlobRequestOptions(ProcessContext context) throws DecoderException {
final String cseKeyTypeValue = context.getProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE).getValue();
final AzureBlobClientSideEncryptionMethod cseKeyType = AzureBlobClientSideEncryptionMethod.valueOf(cseKeyTypeValue);
final String cseKeyId = context.getProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID).getValue();
final String cseSymmetricKeyHex = context.getProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX).getValue();
BlobRequestOptions blobRequestOptions = new BlobRequestOptions();
if (cseKeyType == AzureBlobClientSideEncryptionMethod.SYMMETRIC) {
byte[] keyBytes = Hex.decodeHex(cseSymmetricKeyHex.toCharArray());
SymmetricKey key = new SymmetricKey(cseKeyId, keyBytes);
BlobEncryptionPolicy policy = new BlobEncryptionPolicy(key, null);
blobRequestOptions.setEncryptionPolicy(policy);
}
return blobRequestOptions;
}
}

View File

@ -23,10 +23,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import com.microsoft.azure.storage.OperationContext;
import org.apache.commons.codec.DecoderException;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@ -35,6 +37,8 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
@ -43,12 +47,14 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionUtils;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
@CapabilityDescription("Retrieves contents of an Azure Storage Blob, writing the contents to the content of the FlowFile")
@ -79,11 +85,21 @@ public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
.required(false)
.build();
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
results.addAll(AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext));
return results;
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>(super.getSupportedPropertyDescriptors());
List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
properties.add(RANGE_START);
properties.add(RANGE_LENGTH);
properties.add(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE);
properties.add(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID);
properties.add(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX);
return properties;
}
@ -112,11 +128,13 @@ public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
final Map<String, String> attributes = new HashMap<>();
final CloudBlob blob = container.getBlockBlobReference(blobPath);
BlobRequestOptions blobRequestOptions = createBlobRequestOptions(context);
// TODO - we may be able do fancier things with ranges and
// distribution of download over threads, investigate
flowFile = session.write(flowFile, os -> {
try {
blob.downloadRange(rangeStart, rangeLength, os, null, null, operationContext);
blob.downloadRange(rangeStart, rangeLength, os, null, blobRequestOptions, operationContext);
} catch (StorageException e) {
storedException.set(e);
throw new IOException(e);
@ -133,7 +151,7 @@ public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
session.transfer(flowFile, REL_SUCCESS);
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().fetch(flowFile, blob.getSnapshotQualifiedUri().toString(), transferMillis);
} catch (IllegalArgumentException | URISyntaxException | StorageException | ProcessException e) {
} catch (IllegalArgumentException | URISyntaxException | StorageException | ProcessException | DecoderException e) {
if (e instanceof ProcessException && storedException.get() == null) {
throw (ProcessException) e;
} else {

View File

@ -26,11 +26,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.annotations.VisibleForTesting;
import com.microsoft.azure.storage.OperationContext;
import org.apache.commons.codec.DecoderException;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@ -39,6 +41,8 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
@ -46,6 +50,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionUtils;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import com.microsoft.azure.storage.StorageException;
@ -53,6 +58,7 @@ import com.microsoft.azure.storage.blob.BlobProperties;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
@SeeAlso({ ListAzureBlobStorage.class, FetchAzureBlobStorage.class, DeleteAzureBlobStorage.class })
@ -88,12 +94,22 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor {
"will fail if the container does not exist.")
.build();
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
results.addAll(AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext));
return results;
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
properties.remove(BLOB);
properties.add(BLOB_NAME);
properties.add(CREATE_CONTAINER);
properties.add(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE);
properties.add(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID);
properties.add(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX);
return properties;
}
@ -124,6 +140,8 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor {
final OperationContext operationContext = new OperationContext();
AzureStorageUtils.setProxy(operationContext, context);
BlobRequestOptions blobRequestOptions = createBlobRequestOptions(context);
final Map<String, String> attributes = new HashMap<>();
long length = flowFile.getSize();
session.read(flowFile, rawIn -> {
@ -142,7 +160,7 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor {
}
try {
uploadBlob(blob, operationContext, in);
uploadBlob(blob, operationContext, blobRequestOptions, in);
BlobProperties properties = blob.getProperties();
attributes.put("azure.container", containerName);
attributes.put("azure.primaryUri", blob.getSnapshotQualifiedUri().toString());
@ -163,7 +181,7 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor {
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, blob.getSnapshotQualifiedUri().toString(), transferMillis);
} catch (IllegalArgumentException | URISyntaxException | StorageException | ProcessException e) {
} catch (IllegalArgumentException | URISyntaxException | StorageException | ProcessException | DecoderException e) {
if (e instanceof ProcessException && storedException.get() == null) {
throw (ProcessException) e;
} else {
@ -177,8 +195,8 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor {
}
@VisibleForTesting
void uploadBlob(CloudBlob blob, OperationContext operationContext, InputStream in) throws StorageException, IOException {
blob.upload(in, -1, null, null, operationContext);
void uploadBlob(CloudBlob blob, OperationContext operationContext, BlobRequestOptions blobRequestOptions, InputStream in) throws StorageException, IOException {
blob.upload(in, -1, null, blobRequestOptions, operationContext);
}
// Used to help force Azure Blob SDK to write in blocks

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.utils;
/**
* Enumeration capturing essential information about the various client-side
* encryption methods supported by Azure
*/
public enum AzureBlobClientSideEncryptionMethod {
NONE("None", "The blobs sent to Azure are not encrypted."),
SYMMETRIC("Symmetric", "The blobs sent to Azure are encrypted using a symmetric algorithm.");
private final String cseName;
private final String description;
AzureBlobClientSideEncryptionMethod(String cseName, String description) {
this.cseName = cseName;
this.description = description;
}
public String getCseName() {
return cseName;
}
public String getDescription() {
return description;
}
@Override
public String toString() {
return description;
}
}

View File

@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.utils;
import com.microsoft.azure.keyvault.cryptography.SymmetricKey;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
public class AzureBlobClientSideEncryptionUtils {
private static final String DEFAULT_KEY_ID = "nifi";
public static final PropertyDescriptor CSE_KEY_TYPE = new PropertyDescriptor.Builder()
.name("cse-key-type")
.displayName("Client-Side Encryption Key Type")
.required(true)
.allowableValues(buildCseEncryptionMethodAllowableValues())
.defaultValue(AzureBlobClientSideEncryptionMethod.NONE.name())
.description("Specifies the key type to use for client-side encryption.")
.build();
public static final PropertyDescriptor CSE_KEY_ID = new PropertyDescriptor.Builder()
.name("cse-key-id")
.displayName("Client-Side Encryption Key ID")
.description("Specifies the ID of the key to use for client-side encryption.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.dependsOn(CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name())
.build();
public static final PropertyDescriptor CSE_SYMMETRIC_KEY_HEX = new PropertyDescriptor.Builder()
.name("cse-symmetric-key-hex")
.displayName("Symmetric Key")
.description("When using symmetric client-side encryption, this is the raw key, encoded in hexadecimal")
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.dependsOn(CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name())
.sensitive(true)
.build();
private static AllowableValue[] buildCseEncryptionMethodAllowableValues() {
return Arrays.stream(AzureBlobClientSideEncryptionMethod.values())
.map(v -> new AllowableValue(v.name(), v.name(), v.getDescription()))
.toArray(AllowableValue[]::new);
}
public static Collection<ValidationResult> validateClientSideEncryptionProperties(ValidationContext validationContext) {
final List<ValidationResult> validationResults = new ArrayList<>();
final String cseKeyTypeValue = validationContext.getProperty(CSE_KEY_TYPE).getValue();
final AzureBlobClientSideEncryptionMethod cseKeyType = AzureBlobClientSideEncryptionMethod.valueOf(cseKeyTypeValue);
final String cseKeyId = validationContext.getProperty(CSE_KEY_ID).getValue();
final String cseSymmetricKeyHex = validationContext.getProperty(CSE_SYMMETRIC_KEY_HEX).getValue();
if (cseKeyType != AzureBlobClientSideEncryptionMethod.NONE && StringUtils.isBlank(cseKeyId)) {
validationResults.add(new ValidationResult.Builder().subject(CSE_KEY_ID.getDisplayName())
.explanation("a key ID must be set when client-side encryption is enabled.").build());
}
if (cseKeyType == AzureBlobClientSideEncryptionMethod.SYMMETRIC) {
validationResults.addAll(validateSymmetricKey(cseSymmetricKeyHex));
}
return validationResults;
}
private static List<ValidationResult> validateSymmetricKey(String keyHex) {
final List<ValidationResult> validationResults = new ArrayList<>();
if (StringUtils.isBlank(keyHex)) {
validationResults.add(new ValidationResult.Builder().subject(CSE_SYMMETRIC_KEY_HEX.getDisplayName())
.explanation("a symmetric key must not be set when client-side encryption is enabled with symmetric encryption.").build());
} else {
byte[] keyBytes;
try {
keyBytes = Hex.decodeHex(keyHex.toCharArray());
new SymmetricKey(DEFAULT_KEY_ID, keyBytes);
} catch (DecoderException e) {
validationResults.add(new ValidationResult.Builder().subject(CSE_SYMMETRIC_KEY_HEX.getDisplayName())
.explanation("the symmetric key must be a valid hexadecimal string.").build());
} catch (IllegalArgumentException e) {
validationResults.add(new ValidationResult.Builder().subject(CSE_SYMMETRIC_KEY_HEX.getDisplayName())
.explanation(e.getMessage()).build());
}
}
return validationResults;
}
}

View File

@ -0,0 +1,249 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.StorageCredentials;
import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionMethod;
import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionUtils;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.file.FileUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.ComparisonFailure;
import org.junit.Test;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.fail;
public class ITAzureBlobStorageE2E {
private static final Properties CONFIG;
private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES";
static {
CONFIG = new Properties();
try {
final FileInputStream fis = new FileInputStream(CREDENTIALS_FILE);
try {
CONFIG.load(fis);
} catch (IOException e) {
fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
} finally {
FileUtils.closeQuietly(fis);
}
} catch (FileNotFoundException e) {
fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
}
}
protected static String getAccountName() {
return CONFIG.getProperty("accountName");
}
protected static String getAccountKey() {
return CONFIG.getProperty("accountKey");
}
protected static final String TEST_CONTAINER_NAME_PREFIX = "nifi-test-container";
protected static final String TEST_BLOB_NAME = "nifi-test-blob";
protected static final String TEST_FILE_CONTENT = "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890";
private static final String KEY_ID_VALUE = "key:id";
private static final String KEY_64B_VALUE = "1234567890ABCDEF";
private static final String KEY_128B_VALUE = KEY_64B_VALUE + KEY_64B_VALUE;
private static final String KEY_192B_VALUE = KEY_128B_VALUE + KEY_64B_VALUE;
private static final String KEY_256B_VALUE = KEY_128B_VALUE + KEY_128B_VALUE;
private static final String KEY_384B_VALUE = KEY_256B_VALUE + KEY_128B_VALUE;
private static final String KEY_512B_VALUE = KEY_256B_VALUE + KEY_256B_VALUE;
protected TestRunner putRunner;
protected TestRunner listRunner;
protected TestRunner fetchRunner;
protected CloudBlobContainer container;
@Before
public void setupRunners() throws Exception {
putRunner = TestRunners.newTestRunner(new PutAzureBlobStorage());
listRunner = TestRunners.newTestRunner(new ListAzureBlobStorage());
fetchRunner = TestRunners.newTestRunner(new FetchAzureBlobStorage());
String containerName = String.format("%s-%s", TEST_CONTAINER_NAME_PREFIX, UUID.randomUUID());
StorageCredentials storageCredentials = new StorageCredentialsAccountAndKey(getAccountName(), getAccountKey());
CloudStorageAccount storageAccount = new CloudStorageAccount(storageCredentials, true);
CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
container = blobClient.getContainerReference(containerName);
container.createIfNotExists();
setRunnerProperties(putRunner, containerName);
setRunnerProperties(listRunner, containerName);
setRunnerProperties(fetchRunner, containerName);
}
private void setRunnerProperties(TestRunner runner, String containerName) {
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, getAccountName());
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, getAccountKey());
runner.setProperty(AzureStorageUtils.CONTAINER, containerName);
}
@After
public void tearDownAzureContainer() throws Exception {
container.deleteIfExists();
}
@Test
public void AzureBlobStorageE2ENoCSE() throws Exception {
testE2E(AzureBlobClientSideEncryptionMethod.NONE.name(),
null,
null,
AzureBlobClientSideEncryptionMethod.NONE.name(),
null,
null
);
}
@Test
public void AzureBlobStorageE2E128BCSE() throws Exception {
testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_128B_VALUE,
AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_128B_VALUE
);
}
@Test
public void AzureBlobStorageE2E192BCSE() throws Exception {
testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_192B_VALUE,
AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_192B_VALUE
);
}
@Test
public void AzureBlobStorageE2E256BCSE() throws Exception {
testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_256B_VALUE,
AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_256B_VALUE
);
}
@Test
public void AzureBlobStorageE2E384BCSE() throws Exception {
testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_384B_VALUE,
AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_384B_VALUE
);
}
@Test
public void AzureBlobStorageE2E512BCSE() throws Exception {
testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_512B_VALUE,
AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_512B_VALUE
);
}
@Test(expected = ComparisonFailure.class)
public void AzureBlobStorageE2E128BCSENoDecryption() throws Exception {
testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_128B_VALUE,
AzureBlobClientSideEncryptionMethod.NONE.name(),
KEY_ID_VALUE,
KEY_128B_VALUE
);
}
private void testE2E(String encryptionKeyType, String encryptionKeyId, String encryptionKeyHex, String decryptionKeyType, String decryptionKeyId, String decryptionKeyHex) throws Exception {
putRunner.setProperty(PutAzureBlobStorage.BLOB, TEST_BLOB_NAME);
putRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, encryptionKeyType);
if (encryptionKeyId == null || encryptionKeyId.isEmpty() || encryptionKeyId.trim().isEmpty()) {
putRunner.removeProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID);
} else {
putRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, encryptionKeyId);
}
if (encryptionKeyHex == null || encryptionKeyHex.isEmpty() || encryptionKeyHex.trim().isEmpty()) {
putRunner.removeProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX);
} else {
putRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, encryptionKeyHex);
}
putRunner.assertValid();
putRunner.enqueue(TEST_FILE_CONTENT.getBytes());
putRunner.run();
putRunner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1);
Thread.sleep(ListAzureBlobStorage.LISTING_LAG_MILLIS.get(TimeUnit.SECONDS) * 2);
listRunner.assertValid();
listRunner.run();
listRunner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1);
MockFlowFile entry = listRunner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS).get(0);
entry.assertAttributeEquals("mime.type", "application/octet-stream");
fetchRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, decryptionKeyType);
if (decryptionKeyId == null || decryptionKeyId.isEmpty() || decryptionKeyId.trim().isEmpty()) {
fetchRunner.removeProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID);
} else {
fetchRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, decryptionKeyId);
}
if (decryptionKeyHex == null || decryptionKeyHex.isEmpty() || decryptionKeyHex.trim().isEmpty()) {
fetchRunner.removeProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX);
} else {
fetchRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, decryptionKeyHex);
}
fetchRunner.assertValid();
fetchRunner.enqueue(entry);
fetchRunner.run();
fetchRunner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor.REL_SUCCESS, 1);
MockFlowFile fetchedEntry = fetchRunner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS).get(0);
fetchedEntry.assertContentEquals(TEST_FILE_CONTENT);
}
}

View File

@ -20,7 +20,6 @@ import org.apache.nifi.processor.Processor;
import org.apache.nifi.util.MockFlowFile;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
public class ITListAzureBlobStorage extends AbstractAzureBlobStorageIT {
@ -34,11 +33,11 @@ public class ITListAzureBlobStorage extends AbstractAzureBlobStorageIT {
public void setUp() throws Exception {
uploadTestBlob();
Thread.sleep(ListAzureBlobStorage.LISTING_LAG_MILLIS.get(TimeUnit.SECONDS));
Thread.sleep(ListAzureBlobStorage.LISTING_LAG_MILLIS.get(TimeUnit.SECONDS) * 2);
}
@Test
public void testListBlobs() {
public void testListBlobs() throws Exception {
runner.assertValid();
runner.run(1);

View File

@ -18,6 +18,8 @@ package org.apache.nifi.processors.azure.storage;
import com.microsoft.azure.storage.blob.ListBlobItem;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionMethod;
import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionUtils;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.util.MockFlowFile;
import org.junit.Before;
@ -29,6 +31,16 @@ import static org.junit.Assert.assertTrue;
public class ITPutAzureBlobStorage extends AbstractAzureBlobStorageIT {
public static final String TEST_FILE_CONTENT = "0123456789";
private static final String KEY_ID_VALUE = "key:id";
private static final String KEY_64B_VALUE = "1234567890ABCDEF";
private static final String KEY_128B_VALUE = KEY_64B_VALUE + KEY_64B_VALUE;
private static final String KEY_192B_VALUE = KEY_128B_VALUE + KEY_64B_VALUE;
private static final String KEY_256B_VALUE = KEY_128B_VALUE + KEY_128B_VALUE;
private static final String KEY_384B_VALUE = KEY_256B_VALUE + KEY_128B_VALUE;
private static final String KEY_512B_VALUE = KEY_256B_VALUE + KEY_256B_VALUE;
@Override
protected Class<? extends Processor> getProcessorClass() {
return PutAzureBlobStorage.class;
@ -42,7 +54,75 @@ public class ITPutAzureBlobStorage extends AbstractAzureBlobStorageIT {
@Test
public void testPutBlob() throws Exception {
runner.assertValid();
runner.enqueue("0123456789".getBytes());
runner.enqueue(TEST_FILE_CONTENT.getBytes());
runner.run();
assertResult();
}
@Test
public void testPutBlob64BSymmetricCSE() {
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name());
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE);
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_64B_VALUE);
runner.assertNotValid();
}
@Test
public void testPutBlob128BSymmetricCSE() throws Exception {
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name());
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE);
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_128B_VALUE);
runner.assertValid();
runner.enqueue(TEST_FILE_CONTENT.getBytes());
runner.run();
assertResult();
}
@Test
public void testPutBlob192BSymmetricCSE() throws Exception {
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name());
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE);
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_192B_VALUE);
runner.assertValid();
runner.enqueue(TEST_FILE_CONTENT.getBytes());
runner.run();
assertResult();
}
@Test
public void testPutBlob256BSymmetricCSE() throws Exception {
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name());
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE);
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_256B_VALUE);
runner.assertValid();
runner.enqueue(TEST_FILE_CONTENT.getBytes());
runner.run();
assertResult();
}
@Test
public void testPutBlob384BSymmetricCSE() throws Exception {
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name());
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE);
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_384B_VALUE);
runner.assertValid();
runner.enqueue(TEST_FILE_CONTENT.getBytes());
runner.run();
assertResult();
}
@Test
public void testPutBlob512BSymmetricCSE() throws Exception {
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name());
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE);
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_512B_VALUE);
runner.assertValid();
runner.enqueue(TEST_FILE_CONTENT.getBytes());
runner.run();
assertResult();
@ -53,7 +133,7 @@ public class ITPutAzureBlobStorage extends AbstractAzureBlobStorageIT {
configureCredentialsService();
runner.assertValid();
runner.enqueue("0123456789".getBytes());
runner.enqueue(TEST_FILE_CONTENT.getBytes());
runner.run();
assertResult();
@ -64,7 +144,7 @@ public class ITPutAzureBlobStorage extends AbstractAzureBlobStorageIT {
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "invalid");
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "aW52YWxpZGludmFsaWQ=");
runner.assertValid();
runner.enqueue("test".getBytes());
runner.enqueue(TEST_FILE_CONTENT.getBytes());
runner.run();
runner.assertTransferCount(PutAzureBlobStorage.REL_FAILURE, 1);
@ -74,7 +154,7 @@ public class ITPutAzureBlobStorage extends AbstractAzureBlobStorageIT {
runner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1);
List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(PutAzureBlobStorage.REL_SUCCESS);
for (MockFlowFile flowFile : flowFilesForRelationship) {
flowFile.assertContentEquals("0123456789".getBytes());
flowFile.assertContentEquals(TEST_FILE_CONTENT.getBytes());
flowFile.assertAttributeEquals("azure.length", "10");
}

View File

@ -32,7 +32,7 @@ public class TestPutAzureBlobStorage {
@Test
public void testIOExceptionDuringUploadTransfersToFailure() throws Exception {
PutAzureBlobStorage processor = Mockito.spy(new PutAzureBlobStorage());
doThrow(IOException.class).when(processor).uploadBlob(any(), any(), any());
doThrow(IOException.class).when(processor).uploadBlob(any(), any(), any(), any());
TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutAzureBlobStorage.BLOB, "test");

View File

@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.utils;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.azure.storage.PutAzureBlobStorage;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.MockValidationContext;
import org.junit.Before;
import org.junit.Test;
import java.util.Collection;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
public class TestAzureBlobClientSideEncryptionUtils {
private static final String KEY_ID_VALUE = "key:id";
private static final String KEY_64B_VALUE = "1234567890ABCDEF";
private static final String KEY_128B_VALUE = KEY_64B_VALUE + KEY_64B_VALUE;
private static final String KEY_192B_VALUE = KEY_128B_VALUE + KEY_64B_VALUE;
private static final String KEY_256B_VALUE = KEY_128B_VALUE + KEY_128B_VALUE;
private static final String KEY_384B_VALUE = KEY_256B_VALUE + KEY_128B_VALUE;
private static final String KEY_512B_VALUE = KEY_256B_VALUE + KEY_256B_VALUE;
private MockProcessContext processContext;
private MockValidationContext validationContext;
@Before
public void setUp() {
Processor processor = new PutAzureBlobStorage();
processContext = new MockProcessContext(processor);
validationContext = new MockValidationContext(processContext);
}
@Test
public void testNoCesConfiguredOnProcessor() {
configureProcessorProperties("NONE", null,null);
Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
assertValid(result);
}
@Test
public void testSymmetricCesNoKeyIdOnProcessor() {
configureProcessorProperties("SYMMETRIC", null, KEY_128B_VALUE);
Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
assertNotValid(result);
}
@Test
public void testSymmetricCesNoKeyOnProcessor() {
configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE,null);
Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
assertNotValid(result);
}
@Test
public void testSymmetricCesInvalidHexKeyOnProcessor() {
configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE,"ZZ");
Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
assertNotValid(result);
}
@Test
public void testSymmetricCes64BitKeyOnProcessor() {
configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE, KEY_64B_VALUE);
Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
assertNotValid(result);
}
@Test
public void testSymmetricCes128BitKeyOnProcessor() {
configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE, KEY_128B_VALUE);
Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
assertValid(result);
}
@Test
public void testSymmetricCes192BitKeyOnProcessor() {
configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE, KEY_192B_VALUE);
Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
assertValid(result);
}
@Test
public void testSymmetricCes256BitKeyOnProcessor() {
configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE, KEY_256B_VALUE);
Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
assertValid(result);
}
@Test
public void testSymmetricCes384BitKeyOnProcessor() {
configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE, KEY_384B_VALUE);
Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
assertValid(result);
}
@Test
public void testSymmetricCes512BitKeyOnProcessor() {
configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE, KEY_512B_VALUE);
Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
assertValid(result);
}
private void configureProcessorProperties(String keyType, String keyId, String symmetricKeyHex) {
if (keyType != null) {
processContext.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, keyType);
}
if (keyId != null) {
processContext.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, keyId);
}
if (symmetricKeyHex != null) {
processContext.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, symmetricKeyHex);
}
}
private void assertValid(Collection<ValidationResult> result) {
assertTrue("There should be no validation error", result.isEmpty());
}
private void assertNotValid(Collection<ValidationResult> result) {
assertFalse("There should be validation error", result.isEmpty());
}
}

View File

@ -116,10 +116,10 @@ public class TestAzureStorageUtilsGetStorageCredentialsDetails {
private void configureProcessorProperties(String accountName, String accountKey, String sasToken) {
if (accountName != null) {
processContext.setProperty(AzureStorageUtils.ACCOUNT_NAME, ACCOUNT_NAME_VALUE);
processContext.setProperty(AzureStorageUtils.ACCOUNT_NAME, accountName);
}
if (accountKey != null) {
processContext.setProperty(AzureStorageUtils.ACCOUNT_KEY, ACCOUNT_KEY_VALUE);
processContext.setProperty(AzureStorageUtils.ACCOUNT_KEY, accountKey);
}
if (sasToken != null) {
processContext.setProperty(AzureStorageUtils.PROP_SAS_TOKEN, sasToken);