NIFI-11228 Removed deprecated Azure Blob Storage Processors

This closes #7234

Signed-off-by: Nandor Soma Abonyi <nsabonyi@apache.org>
This commit is contained in:
exceptionfactory 2023-05-09 09:46:19 -05:00 committed by Nandor Soma Abonyi
parent fd2138b8cf
commit f7e36a07ac
No known key found for this signature in database
GPG Key ID: AFFFD8C3A1A88ED7
21 changed files with 7 additions and 2213 deletions

View File

@ -271,15 +271,6 @@ class TestRuntimeManifest {
assertNotNull(routeOnAttributeDef.getDynamicProperties().get(0).getValue());
assertNotNull(routeOnAttributeDef.getDynamicProperties().get(0).getExpressionLanguageScope());
// Verify DeleteAzureBlobStorage is deprecated
final ProcessorDefinition deleteAzureBlobDef = getProcessorDefinition(bundles, "nifi-azure-nar",
"org.apache.nifi.processors.azure.storage.DeleteAzureBlobStorage");
assertNotNull(deleteAzureBlobDef.getDeprecated());
assertTrue(deleteAzureBlobDef.getDeprecated().booleanValue());
assertNotNull(deleteAzureBlobDef.getDeprecationReason());
assertNotNull(deleteAzureBlobDef.getDeprecationAlternatives());
assertFalse(deleteAzureBlobDef.getDeprecationAlternatives().isEmpty());
// Verify SplitJson has @SystemResourceConsiderations
final ProcessorDefinition splitJsonDef = getProcessorDefinition(bundles, "nifi-standard-nar",
"org.apache.nifi.processors.standard.SplitJson");

View File

@ -19,9 +19,6 @@
</parent>
<artifactId>nifi-azure-processors</artifactId>
<packaging>jar</packaging>
<properties>
<azure-keyvault.version>1.2.6</azure-keyvault.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
@ -96,11 +93,6 @@
<artifactId>azure-security-keyvault-keys</artifactId>
</dependency>
<!-- Legacy Microsoft Azure Libraries -->
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-keyvault</artifactId>
<version>${azure-keyvault.version}</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-storage</artifactId>
@ -113,6 +105,10 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>

View File

@ -1,116 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure;
import com.microsoft.azure.keyvault.cryptography.SymmetricKey;
import com.microsoft.azure.storage.blob.BlobEncryptionPolicy;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionMethod;
import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionUtils;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public abstract class AbstractAzureBlobProcessor extends AbstractProcessor {
public static final PropertyDescriptor BLOB = new PropertyDescriptor.Builder()
.name("blob")
.displayName("Blob")
.description("The filename of the blob")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.defaultValue("${azure.blobname}")
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All successfully processed FlowFiles are routed to this relationship")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("Unsuccessful operations will be transferred to the failure relationship.")
.build();
private static final List<PropertyDescriptor> PROPERTIES = Collections
.unmodifiableList(Arrays.asList(
AzureStorageUtils.CONTAINER,
AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE,
AzureStorageUtils.ACCOUNT_NAME,
AzureStorageUtils.ACCOUNT_KEY,
AzureStorageUtils.PROP_SAS_TOKEN,
AzureStorageUtils.ENDPOINT_SUFFIX,
BLOB,
AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));
private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(
AbstractAzureBlobProcessor.REL_SUCCESS,
AbstractAzureBlobProcessor.REL_FAILURE)));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final Collection<ValidationResult> results = AzureStorageUtils.validateCredentialProperties(validationContext);
AzureStorageUtils.validateProxySpec(validationContext, results);
return results;
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
protected BlobRequestOptions createBlobRequestOptions(ProcessContext context) throws DecoderException {
final String cseKeyTypeValue = context.getProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE).getValue();
final AzureBlobClientSideEncryptionMethod cseKeyType = AzureBlobClientSideEncryptionMethod.valueOf(cseKeyTypeValue);
final String cseKeyId = context.getProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID).getValue();
final String cseSymmetricKeyHex = context.getProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX).getValue();
BlobRequestOptions blobRequestOptions = new BlobRequestOptions();
if (cseKeyType == AzureBlobClientSideEncryptionMethod.SYMMETRIC) {
byte[] keyBytes = Hex.decodeHex(cseSymmetricKeyHex.toCharArray());
SymmetricKey key = new SymmetricKey(cseKeyId, keyBytes);
BlobEncryptionPolicy policy = new BlobEncryptionPolicy(key, null);
blobRequestOptions.setEncryptionPolicy(policy);
}
return blobRequestOptions;
}
}

View File

@ -1,107 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
@SeeAlso({ ListAzureBlobStorage.class, FetchAzureBlobStorage.class, PutAzureBlobStorage.class})
@CapabilityDescription("Deletes the provided blob from Azure Storage")
@InputRequirement(Requirement.INPUT_REQUIRED)
@DeprecationNotice(alternatives = DeleteAzureBlobStorage_v12.class, reason = "Processor depends on legacy Microsoft Azure SDK")
public class DeleteAzureBlobStorage extends AbstractAzureBlobProcessor {
private static final AllowableValue DELETE_SNAPSHOTS_NONE = new AllowableValue(DeleteSnapshotsOption.NONE.name(), "None", "Delete the blob only.");
private static final AllowableValue DELETE_SNAPSHOTS_ALSO = new AllowableValue(DeleteSnapshotsOption.INCLUDE_SNAPSHOTS.name(), "Include Snapshots", "Delete the blob and its snapshots.");
private static final AllowableValue DELETE_SNAPSHOTS_ONLY = new AllowableValue(DeleteSnapshotsOption.DELETE_SNAPSHOTS_ONLY.name(), "Delete Snapshots Only", "Delete only the blob's snapshots.");
private static final PropertyDescriptor DELETE_SNAPSHOTS_OPTION = new PropertyDescriptor.Builder()
.name("delete-snapshots-option")
.displayName("Delete Snapshots Option")
.description("Specifies the snapshot deletion options to be used when deleting a blob.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.allowableValues(DELETE_SNAPSHOTS_NONE, DELETE_SNAPSHOTS_ALSO, DELETE_SNAPSHOTS_ONLY)
.defaultValue(DELETE_SNAPSHOTS_NONE.getValue())
.required(true)
.build();
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
properties.add(DELETE_SNAPSHOTS_OPTION);
return properties;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if(flowFile == null) {
return;
}
final long startNanos = System.nanoTime();
final String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
final String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
final String deleteSnapshotOptions = context.getProperty(DELETE_SNAPSHOTS_OPTION).getValue();
try {
CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), flowFile);
CloudBlobContainer container = blobClient.getContainerReference(containerName);
CloudBlob blob = container.getBlockBlobReference(blobPath);
final OperationContext operationContext = new OperationContext();
AzureStorageUtils.setProxy(operationContext, context);
blob.deleteIfExists(DeleteSnapshotsOption.valueOf(deleteSnapshotOptions), null, null, operationContext);
session.transfer(flowFile, REL_SUCCESS);
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().invokeRemoteProcess(flowFile, blob.getSnapshotQualifiedUri().toString(), "Blob deleted");
} catch ( StorageException | URISyntaxException e) {
getLogger().error("Failed to delete the specified blob {} from Azure Storage. Routing to failure", new Object[]{blobPath}, e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
}
}

View File

@ -1,167 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import com.microsoft.azure.storage.OperationContext;
import org.apache.commons.codec.DecoderException;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionUtils;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
@CapabilityDescription("Retrieves contents of an Azure Storage Blob, writing the contents to the content of the FlowFile")
@SeeAlso({ ListAzureBlobStorage.class, PutAzureBlobStorage.class, DeleteAzureBlobStorage.class })
@InputRequirement(Requirement.INPUT_REQUIRED)
@WritesAttributes({
@WritesAttribute(attribute = "azure.length", description = "The length of the blob fetched")
})
@DeprecationNotice(alternatives = FetchAzureBlobStorage_v12.class, reason = "Processor depends on legacy Microsoft Azure SDK")
public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
public static final PropertyDescriptor RANGE_START = new PropertyDescriptor.Builder()
.name("range-start")
.displayName("Range Start")
.description("The byte position at which to start reading from the blob. An empty value or a value of " +
"zero will start reading at the beginning of the blob.")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.build();
public static final PropertyDescriptor RANGE_LENGTH = new PropertyDescriptor.Builder()
.name("range-length")
.displayName("Range Length")
.description("The number of bytes to download from the blob, starting from the Range Start. An empty " +
"value or a value that extends beyond the end of the blob will read to the end of the blob.")
.addValidator(StandardValidators.createDataSizeBoundsValidator(1, Long.MAX_VALUE))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.build();
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
results.addAll(AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext));
return results;
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
properties.add(RANGE_START);
properties.add(RANGE_LENGTH);
properties.add(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE);
properties.add(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID);
properties.add(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX);
return properties;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final long startNanos = System.nanoTime();
final String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
final String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
final long rangeStart = (context.getProperty(RANGE_START).isSet() ? context.getProperty(RANGE_START).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : 0L);
final Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ? context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : null);
AtomicReference<Exception> storedException = new AtomicReference<>();
try {
CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), flowFile);
CloudBlobContainer container = blobClient.getContainerReference(containerName);
final OperationContext operationContext = new OperationContext();
AzureStorageUtils.setProxy(operationContext, context);
final Map<String, String> attributes = new HashMap<>();
final CloudBlob blob = container.getBlockBlobReference(blobPath);
BlobRequestOptions blobRequestOptions = createBlobRequestOptions(context);
// TODO - we may be able do fancier things with ranges and
// distribution of download over threads, investigate
flowFile = session.write(flowFile, os -> {
try {
blob.downloadRange(rangeStart, rangeLength, os, null, blobRequestOptions, operationContext);
} catch (StorageException e) {
storedException.set(e);
throw new IOException(e);
}
});
long length = blob.getProperties().getLength();
attributes.put("azure.length", String.valueOf(length));
if (!attributes.isEmpty()) {
flowFile = session.putAllAttributes(flowFile, attributes);
}
session.transfer(flowFile, REL_SUCCESS);
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().fetch(flowFile, blob.getSnapshotQualifiedUri().toString(), transferMillis);
} catch (IllegalArgumentException | URISyntaxException | StorageException | ProcessException | DecoderException e) {
if (e instanceof ProcessException && storedException.get() == null) {
throw (ProcessException) e;
} else {
Exception failureException = Optional.ofNullable(storedException.get()).orElse(e);
getLogger().error("Failure to fetch Azure blob {}", new Object[]{blobPath}, failureException);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
}
}
}

View File

@ -1,255 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.ResultContinuation;
import com.microsoft.azure.storage.ResultSegment;
import com.microsoft.azure.storage.StorageUri;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobProperties;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.ListBlobItem;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.list.AbstractListProcessor;
import org.apache.nifi.processor.util.list.ListedEntityTracker;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.processors.azure.storage.utils.BlobInfo;
import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.serialization.record.RecordSchema;
import java.util.Optional;
@PrimaryNodeOnly
@TriggerSerially
@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
@SeeAlso({ FetchAzureBlobStorage.class, PutAzureBlobStorage.class, DeleteAzureBlobStorage.class })
@CapabilityDescription("Lists blobs in an Azure Storage container. Listing details are attached to an empty FlowFile for use with FetchAzureBlobStorage. " +
"This Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new Primary Node will pick up where the " +
"previous node left off without duplicating all of the data.")
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the Azure container"),
@WritesAttribute(attribute = "azure.blobname", description = "The name of the Azure blob"),
@WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"),
@WritesAttribute(attribute = "azure.secondaryUri", description = "Secondary location for blob content"),
@WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"),
@WritesAttribute(attribute = "azure.length", description = "Length of the blob"),
@WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob"),
@WritesAttribute(attribute = "mime.type", description = "MimeType of the content"),
@WritesAttribute(attribute = "lang", description = "Language code for the content"),
@WritesAttribute(attribute = "azure.blobtype", description = "This is the type of blob and can be either page or block type") })
@Stateful(scopes = { Scope.CLUSTER }, description = "After performing a listing of blobs, the timestamp of the newest blob is stored. " +
"This allows the Processor to list only blobs that have been added or modified after this date the next time that the Processor is run. State is " +
"stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected, the new node can pick up " +
"where the previous node left off, without duplicating the data.")
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
@DeprecationNotice(alternatives = ListAzureBlobStorage_v12.class, reason = "Processor depends on legacy Microsoft Azure SDK")
public class ListAzureBlobStorage extends AbstractListAzureProcessor<BlobInfo> {
private static final PropertyDescriptor PROP_PREFIX = new PropertyDescriptor.Builder()
.name("prefix")
.displayName("Prefix")
.description("Search prefix for listing")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(false)
.build();
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
LISTING_STRATEGY,
AbstractListProcessor.RECORD_WRITER,
AzureStorageUtils.CONTAINER,
AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE,
AzureStorageUtils.ACCOUNT_NAME,
AzureStorageUtils.ACCOUNT_KEY,
AzureStorageUtils.PROP_SAS_TOKEN,
AzureStorageUtils.ENDPOINT_SUFFIX,
PROP_PREFIX,
AzureStorageUtils.PROXY_CONFIGURATION_SERVICE,
ListedEntityTracker.TRACKING_STATE_CACHE,
ListedEntityTracker.TRACKING_TIME_WINDOW,
ListedEntityTracker.INITIAL_LISTING_TARGET,
MIN_AGE,
MAX_AGE,
MIN_SIZE,
MAX_SIZE
));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@Override
protected void customValidate(ValidationContext validationContext, Collection<ValidationResult> results) {
results.addAll(AzureStorageUtils.validateCredentialProperties(validationContext));
AzureStorageUtils.validateProxySpec(validationContext, results);
}
@Override
protected Map<String, String> createAttributes(BlobInfo entity, ProcessContext context) {
final Map<String, String> attributes = new HashMap<>();
attributes.put("azure.container", entity.getContainerName());
attributes.put("azure.etag", entity.getEtag());
attributes.put("azure.primaryUri", entity.getPrimaryUri());
attributes.put("azure.secondaryUri", entity.getSecondaryUri());
attributes.put("azure.blobname", entity.getBlobName());
attributes.put("filename", entity.getName());
attributes.put("azure.blobtype", entity.getBlobType());
attributes.put("azure.length", String.valueOf(entity.getLength()));
attributes.put("azure.timestamp", String.valueOf(entity.getTimestamp()));
attributes.put("mime.type", entity.getContentType());
attributes.put("lang", entity.getContentLanguage());
return attributes;
}
@Override
protected String getListingContainerName(final ProcessContext context) {
return String.format("Azure Blob Storage Container [%s]", getPath(context));
}
@Override
protected String getPath(final ProcessContext context) {
return context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue();
}
@Override
protected boolean isListingResetNecessary(final PropertyDescriptor property) {
// re-list if configuration changed, but not when security keys are rolled (not included in the condition)
return PROP_PREFIX.equals(property)
|| AzureStorageUtils.ACCOUNT_NAME.equals(property)
|| AzureStorageUtils.CONTAINER.equals(property)
|| AzureStorageUtils.PROP_SAS_TOKEN.equals(property);
}
@Override
protected Scope getStateScope(final PropertyContext context) {
return Scope.CLUSTER;
}
@Override
protected RecordSchema getRecordSchema() {
return BlobInfo.getRecordSchema();
}
@Override
protected String getDefaultTimePrecision() {
// User does not have to choose one.
// AUTO_DETECT can handle most cases, but it may incur longer latency
// when all listed files do not have SECOND part in their timestamps although Azure Blob Storage does support seconds.
return PRECISION_SECONDS.getValue();
}
@Override
protected List<BlobInfo> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode) throws IOException {
final String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue();
final String prefix = Optional.ofNullable(context.getProperty(PROP_PREFIX).evaluateAttributeExpressions().getValue()).orElse("");
final List<BlobInfo> listing = new ArrayList<>();
final long minimumTimestamp = minTimestamp == null ? 0 : minTimestamp;
try {
final CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), null);
final CloudBlobContainer container = blobClient.getContainerReference(containerName);
final OperationContext operationContext = new OperationContext();
AzureStorageUtils.setProxy(operationContext, context);
ResultContinuation continuationToken = null;
do {
final ResultSegment<ListBlobItem> result = container.listBlobsSegmented(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, continuationToken, null, operationContext);
continuationToken = result.getContinuationToken();
for (final ListBlobItem blob : result.getResults()) {
if (blob instanceof CloudBlob) {
final CloudBlob cloudBlob = (CloudBlob) blob;
final BlobProperties properties = cloudBlob.getProperties();
if (isFileInfoMatchesWithAgeAndSize(context, minimumTimestamp, properties.getLastModified().getTime(), properties.getLength())) {
final StorageUri uri = cloudBlob.getSnapshotQualifiedStorageUri();
final Builder builder = new BlobInfo.Builder()
.primaryUri(uri.getPrimaryUri().toString())
.blobName(cloudBlob.getName())
.containerName(containerName)
.contentType(properties.getContentType())
.contentLanguage(properties.getContentLanguage())
.etag(properties.getEtag())
.lastModifiedTime(properties.getLastModified().getTime())
.length(properties.getLength());
if (uri.getSecondaryUri() != null) {
builder.secondaryUri(uri.getSecondaryUri().toString());
}
if (blob instanceof CloudBlockBlob) {
builder.blobType(AzureStorageUtils.BLOCK);
} else {
builder.blobType(AzureStorageUtils.PAGE);
}
listing.add(builder.build());
}
}
}
} while (continuationToken != null);
} catch (final Throwable t) {
throw new IOException(ExceptionUtils.getRootCause(t));
}
return listing;
}
// Unfiltered listing is not supported - must provide a prefix
@Override
protected Integer countUnfilteredListing(final ProcessContext context) {
return null;
}
}

View File

@ -1,221 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.FilterInputStream;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import com.microsoft.azure.storage.OperationContext;
import org.apache.commons.codec.DecoderException;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionUtils;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobProperties;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
@SeeAlso({ ListAzureBlobStorage.class, FetchAzureBlobStorage.class, DeleteAzureBlobStorage.class })
@CapabilityDescription("Puts content into an Azure Storage Blob")
@InputRequirement(Requirement.INPUT_REQUIRED)
@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the Azure container"),
@WritesAttribute(attribute = "azure.blobname", description = "The name of the Azure blob"),
@WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"),
@WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"),
@WritesAttribute(attribute = "azure.length", description = "Length of the blob"),
@WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob")})
@DeprecationNotice(alternatives = PutAzureBlobStorage_v12.class, reason = "Processor depends on legacy Microsoft Azure SDK")
public class PutAzureBlobStorage extends AbstractAzureBlobProcessor {
public static final PropertyDescriptor BLOB_NAME = new PropertyDescriptor.Builder()
.name("blob")
.displayName("Blob")
.description("The filename of the blob")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.build();
public static final PropertyDescriptor CREATE_CONTAINER = new PropertyDescriptor.Builder()
.name("azure-create-container")
.displayName("Create Container")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues("true", "false")
.defaultValue("false")
.description("Specifies whether to check if the container exists and to automatically create it if it does not. " +
"Permission to list containers is required. If false, this check is not made, but the Put operation " +
"will fail if the container does not exist.")
.build();
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
results.addAll(AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext));
return results;
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
properties.remove(BLOB);
properties.add(BLOB_NAME);
properties.add(CREATE_CONTAINER);
properties.add(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE);
properties.add(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID);
properties.add(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX);
return properties;
}
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final long startNanos = System.nanoTime();
String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
String blobPath = context.getProperty(BLOB_NAME).evaluateAttributeExpressions(flowFile).getValue();
final boolean createContainer = context.getProperty(CREATE_CONTAINER).asBoolean();
AtomicReference<Exception> storedException = new AtomicReference<>();
try {
CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), flowFile);
CloudBlobContainer container = blobClient.getContainerReference(containerName);
if (createContainer)
container.createIfNotExists();
CloudBlob blob = container.getBlockBlobReference(blobPath);
final OperationContext operationContext = new OperationContext();
AzureStorageUtils.setProxy(operationContext, context);
BlobRequestOptions blobRequestOptions = createBlobRequestOptions(context);
final Map<String, String> attributes = new HashMap<>();
long length = flowFile.getSize();
session.read(flowFile, rawIn -> {
InputStream in = rawIn;
if (!(in instanceof BufferedInputStream)) {
// do not double-wrap
in = new BufferedInputStream(rawIn);
}
// If markSupported() is true and a file length is provided,
// Blobs are not uploaded in blocks resulting in OOME for large
// files. The UnmarkableInputStream wrapper class disables
// mark() and reset() to help force uploading files in chunks.
if (in.markSupported()) {
in = new UnmarkableInputStream(in);
}
try {
uploadBlob(blob, operationContext, blobRequestOptions, in);
BlobProperties properties = blob.getProperties();
attributes.put("azure.container", containerName);
attributes.put("azure.primaryUri", blob.getSnapshotQualifiedUri().toString());
attributes.put("azure.etag", properties.getEtag());
attributes.put("azure.length", String.valueOf(length));
attributes.put("azure.timestamp", String.valueOf(properties.getLastModified()));
} catch (StorageException | URISyntaxException | IOException e) {
storedException.set(e);
throw e instanceof IOException ? (IOException) e : new IOException(e);
}
});
if (!attributes.isEmpty()) {
flowFile = session.putAllAttributes(flowFile, attributes);
}
session.transfer(flowFile, REL_SUCCESS);
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, blob.getSnapshotQualifiedUri().toString(), transferMillis);
} catch (IllegalArgumentException | URISyntaxException | StorageException | ProcessException | DecoderException e) {
if (e instanceof ProcessException && storedException.get() == null) {
throw (ProcessException) e;
} else {
Exception failureException = Optional.ofNullable(storedException.get()).orElse(e);
getLogger().error("Failed to put Azure blob {}", new Object[]{blobPath}, failureException);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
}
}
void uploadBlob(CloudBlob blob, OperationContext operationContext, BlobRequestOptions blobRequestOptions, InputStream in) throws StorageException, IOException {
blob.upload(in, -1, null, blobRequestOptions, operationContext);
}
// Used to help force Azure Blob SDK to write in blocks
private static class UnmarkableInputStream extends FilterInputStream {
public UnmarkableInputStream(InputStream in) {
super(in);
}
@Override
public void mark(int readlimit) {
}
@Override
public void reset() throws IOException {
}
@Override
public boolean markSupported() {
return false;
}
}
}

View File

@ -1,118 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.utils;
import com.microsoft.azure.keyvault.cryptography.SymmetricKey;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
public class AzureBlobClientSideEncryptionUtils {
private static final String DEFAULT_KEY_ID = "nifi";
public static final PropertyDescriptor CSE_KEY_TYPE = new PropertyDescriptor.Builder()
.name("cse-key-type")
.displayName("Client-Side Encryption Key Type")
.required(true)
.allowableValues(buildCseEncryptionMethodAllowableValues())
.defaultValue(AzureBlobClientSideEncryptionMethod.NONE.name())
.description("Specifies the key type to use for client-side encryption.")
.build();
public static final PropertyDescriptor CSE_KEY_ID = new PropertyDescriptor.Builder()
.name("cse-key-id")
.displayName("Client-Side Encryption Key ID")
.description("Specifies the ID of the key to use for client-side encryption.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.dependsOn(CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name())
.build();
public static final PropertyDescriptor CSE_SYMMETRIC_KEY_HEX = new PropertyDescriptor.Builder()
.name("cse-symmetric-key-hex")
.displayName("Symmetric Key")
.description("When using symmetric client-side encryption, this is the raw key, encoded in hexadecimal")
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.dependsOn(CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name())
.sensitive(true)
.build();
private static AllowableValue[] buildCseEncryptionMethodAllowableValues() {
return Arrays.stream(AzureBlobClientSideEncryptionMethod.values())
.map(v -> new AllowableValue(v.name(), v.name(), v.getDescription()))
.toArray(AllowableValue[]::new);
}
public static Collection<ValidationResult> validateClientSideEncryptionProperties(ValidationContext validationContext) {
final List<ValidationResult> validationResults = new ArrayList<>();
final String cseKeyTypeValue = validationContext.getProperty(CSE_KEY_TYPE).getValue();
final AzureBlobClientSideEncryptionMethod cseKeyType = AzureBlobClientSideEncryptionMethod.valueOf(cseKeyTypeValue);
final String cseKeyId = validationContext.getProperty(CSE_KEY_ID).getValue();
final String cseSymmetricKeyHex = validationContext.getProperty(CSE_SYMMETRIC_KEY_HEX).getValue();
if (cseKeyType != AzureBlobClientSideEncryptionMethod.NONE && StringUtils.isBlank(cseKeyId)) {
validationResults.add(new ValidationResult.Builder().subject(CSE_KEY_ID.getDisplayName())
.explanation("a key ID must be set when client-side encryption is enabled.").build());
}
if (cseKeyType == AzureBlobClientSideEncryptionMethod.SYMMETRIC) {
validationResults.addAll(validateSymmetricKey(cseSymmetricKeyHex));
}
return validationResults;
}
private static List<ValidationResult> validateSymmetricKey(String keyHex) {
final List<ValidationResult> validationResults = new ArrayList<>();
if (StringUtils.isBlank(keyHex)) {
validationResults.add(new ValidationResult.Builder().subject(CSE_SYMMETRIC_KEY_HEX.getDisplayName())
.explanation("a symmetric key must not be set when client-side encryption is enabled with symmetric encryption.").build());
} else {
byte[] keyBytes;
try {
keyBytes = Hex.decodeHex(keyHex.toCharArray());
new SymmetricKey(DEFAULT_KEY_ID, keyBytes);
} catch (DecoderException e) {
validationResults.add(new ValidationResult.Builder().subject(CSE_SYMMETRIC_KEY_HEX.getDisplayName())
.explanation("the symmetric key must be a valid hexadecimal string.").build());
} catch (IllegalArgumentException e) {
validationResults.add(new ValidationResult.Builder().subject(CSE_SYMMETRIC_KEY_HEX.getDisplayName())
.explanation(e.getMessage()).build());
}
}
return validationResults;
}
}

View File

@ -32,7 +32,6 @@ import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageCredentials;
import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.PropertyDescriptor;
@ -41,7 +40,6 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
@ -206,18 +204,6 @@ public final class AzureStorageUtils {
// do not instantiate
}
/**
* Create CloudBlobClient instance.
* @param flowFile An incoming FlowFile can be used for NiFi Expression Language evaluation to derive
* Account Name, Account Key or SAS Token. This can be null if not available.
*/
public static CloudBlobClient createCloudBlobClient(ProcessContext context, ComponentLog logger, FlowFile flowFile) throws URISyntaxException {
final AzureStorageCredentialsDetails storageCredentialsDetails = getStorageCredentialsDetails(context, flowFile);
final CloudStorageAccount cloudStorageAccount = getCloudStorageAccount(storageCredentialsDetails);
final CloudBlobClient cloudBlobClient = cloudStorageAccount.createCloudBlobClient();
return cloudBlobClient;
}
public static CloudStorageAccount getCloudStorageAccount(final AzureStorageCredentialsDetails storageCredentialsDetails) throws URISyntaxException {
final CloudStorageAccount cloudStorageAccount;
if (storageCredentialsDetails instanceof AzureStorageEmulatorCredentialsDetails) {

View File

@ -15,10 +15,6 @@
org.apache.nifi.processors.azure.eventhub.PutAzureEventHub
org.apache.nifi.processors.azure.eventhub.GetAzureEventHub
org.apache.nifi.processors.azure.eventhub.ConsumeAzureEventHub
org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage
org.apache.nifi.processors.azure.storage.ListAzureBlobStorage
org.apache.nifi.processors.azure.storage.PutAzureBlobStorage
org.apache.nifi.processors.azure.storage.DeleteAzureBlobStorage
org.apache.nifi.processors.azure.storage.queue.PutAzureQueueStorage
org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage
org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage

View File

@ -1,72 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import static org.apache.nifi.processors.azure.AzureServiceEndpoints.DEFAULT_BLOB_ENDPOINT_SUFFIX;
public abstract class AbstractAzureBlobStorageIT extends AbstractAzureStorageIT {
protected static final String TEST_CONTAINER_NAME_PREFIX = "nifi-test-container";
protected static final String TEST_BLOB_NAME = "nifi-test-blob";
protected static final String TEST_FILE_NAME = "nifi-test-file";
protected static final String TEST_FILE_CONTENT = "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890";
protected CloudBlobContainer container;
@Override
protected String getDefaultEndpointSuffix() {
return DEFAULT_BLOB_ENDPOINT_SUFFIX;
}
@BeforeEach
public void setUpAzureBlobStorageIT() throws Exception {
String containerName = String.format("%s-%s", TEST_CONTAINER_NAME_PREFIX, UUID.randomUUID());
CloudBlobClient blobClient = getStorageAccount().createCloudBlobClient();
container = blobClient.getContainerReference(containerName);
container.createIfNotExists();
runner.setProperty(AzureStorageUtils.CONTAINER, containerName);
}
@AfterEach
public void tearDownAzureBlobStorageIT() throws Exception {
container.deleteIfExists();
}
protected void uploadTestBlob() throws Exception {
uploadTestBlob(TEST_BLOB_NAME, TEST_FILE_CONTENT);
}
protected void uploadTestBlob(final String blobName, final String fileContent) throws Exception {
CloudBlob blob = container.getBlockBlobReference(blobName);
byte[] buf = fileContent.getBytes(StandardCharsets.UTF_8);
InputStream in = new ByteArrayInputStream(buf);
blob.upload(in, buf.length);
}
}

View File

@ -1,245 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.StorageCredentials;
import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionMethod;
import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionUtils;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.file.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
public class ITAzureBlobStorageE2E {
private static final Properties CONFIG;
private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES";
static {
CONFIG = new Properties();
try {
final FileInputStream fis = new FileInputStream(CREDENTIALS_FILE);
assertDoesNotThrow(() -> CONFIG.load(fis),
"Could not open credentials file " + CREDENTIALS_FILE);
FileUtils.closeQuietly(fis);
} catch (FileNotFoundException e) {
fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
}
}
protected static String getAccountName() {
return CONFIG.getProperty("accountName");
}
protected static String getAccountKey() {
return CONFIG.getProperty("accountKey");
}
protected static final String TEST_CONTAINER_NAME_PREFIX = "nifi-test-container";
protected static final String TEST_BLOB_NAME = "nifi-test-blob";
protected static final String TEST_FILE_CONTENT = "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890";
private static final String KEY_ID_VALUE = "key:id";
private static final String KEY_64B_VALUE = "1234567890ABCDEF";
private static final String KEY_128B_VALUE = KEY_64B_VALUE + KEY_64B_VALUE;
private static final String KEY_192B_VALUE = KEY_128B_VALUE + KEY_64B_VALUE;
private static final String KEY_256B_VALUE = KEY_128B_VALUE + KEY_128B_VALUE;
private static final String KEY_384B_VALUE = KEY_256B_VALUE + KEY_128B_VALUE;
private static final String KEY_512B_VALUE = KEY_256B_VALUE + KEY_256B_VALUE;
protected TestRunner putRunner;
protected TestRunner listRunner;
protected TestRunner fetchRunner;
protected CloudBlobContainer container;
@BeforeEach
public void setupRunners() throws Exception {
putRunner = TestRunners.newTestRunner(new PutAzureBlobStorage());
listRunner = TestRunners.newTestRunner(new ListAzureBlobStorage());
fetchRunner = TestRunners.newTestRunner(new FetchAzureBlobStorage());
String containerName = String.format("%s-%s", TEST_CONTAINER_NAME_PREFIX, UUID.randomUUID());
StorageCredentials storageCredentials = new StorageCredentialsAccountAndKey(getAccountName(), getAccountKey());
CloudStorageAccount storageAccount = new CloudStorageAccount(storageCredentials, true);
CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
container = blobClient.getContainerReference(containerName);
container.createIfNotExists();
setRunnerProperties(putRunner, containerName);
setRunnerProperties(listRunner, containerName);
setRunnerProperties(fetchRunner, containerName);
}
private void setRunnerProperties(TestRunner runner, String containerName) {
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, getAccountName());
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, getAccountKey());
runner.setProperty(AzureStorageUtils.CONTAINER, containerName);
}
@AfterEach
public void tearDownAzureContainer() throws Exception {
container.deleteIfExists();
}
@Test
public void AzureBlobStorageE2ENoCSE() throws Exception {
testE2E(AzureBlobClientSideEncryptionMethod.NONE.name(),
null,
null,
AzureBlobClientSideEncryptionMethod.NONE.name(),
null,
null
);
}
@Test
public void AzureBlobStorageE2E128BCSE() throws Exception {
testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_128B_VALUE,
AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_128B_VALUE
);
}
@Test
public void AzureBlobStorageE2E192BCSE() throws Exception {
testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_192B_VALUE,
AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_192B_VALUE
);
}
@Test
public void AzureBlobStorageE2E256BCSE() throws Exception {
testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_256B_VALUE,
AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_256B_VALUE
);
}
@Test
public void AzureBlobStorageE2E384BCSE() throws Exception {
testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_384B_VALUE,
AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_384B_VALUE
);
}
@Test
public void AzureBlobStorageE2E512BCSE() throws Exception {
testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_512B_VALUE,
AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_512B_VALUE
);
}
@Test
public void AzureBlobStorageE2E128BCSENoDecryption() {
assertThrows(Exception.class, () -> testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(),
KEY_ID_VALUE,
KEY_128B_VALUE,
AzureBlobClientSideEncryptionMethod.NONE.name(),
KEY_ID_VALUE,
KEY_128B_VALUE
));
}
private void testE2E(String encryptionKeyType, String encryptionKeyId, String encryptionKeyHex, String decryptionKeyType, String decryptionKeyId, String decryptionKeyHex) throws Exception {
putRunner.setProperty(PutAzureBlobStorage.BLOB, TEST_BLOB_NAME);
putRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, encryptionKeyType);
if (encryptionKeyId == null || encryptionKeyId.isEmpty() || encryptionKeyId.trim().isEmpty()) {
putRunner.removeProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID);
} else {
putRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, encryptionKeyId);
}
if (encryptionKeyHex == null || encryptionKeyHex.isEmpty() || encryptionKeyHex.trim().isEmpty()) {
putRunner.removeProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX);
} else {
putRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, encryptionKeyHex);
}
putRunner.assertValid();
putRunner.enqueue(TEST_FILE_CONTENT.getBytes());
putRunner.run();
putRunner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1);
Thread.sleep(ListAzureBlobStorage.LISTING_LAG_MILLIS.get(TimeUnit.SECONDS) * 2);
listRunner.assertValid();
listRunner.run();
listRunner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1);
MockFlowFile entry = listRunner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS).get(0);
entry.assertAttributeEquals("mime.type", "application/octet-stream");
fetchRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, decryptionKeyType);
if (decryptionKeyId == null || decryptionKeyId.isEmpty() || decryptionKeyId.trim().isEmpty()) {
fetchRunner.removeProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID);
} else {
fetchRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, decryptionKeyId);
}
if (decryptionKeyHex == null || decryptionKeyHex.isEmpty() || decryptionKeyHex.trim().isEmpty()) {
fetchRunner.removeProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX);
} else {
fetchRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, decryptionKeyHex);
}
fetchRunner.assertValid();
fetchRunner.enqueue(entry);
fetchRunner.run();
fetchRunner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor.REL_SUCCESS, 1);
MockFlowFile fetchedEntry = fetchRunner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS).get(0);
fetchedEntry.assertContentEquals(TEST_FILE_CONTENT);
}
}

View File

@ -1,66 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage;
import com.microsoft.azure.storage.blob.ListBlobItem;
import org.apache.nifi.processor.Processor;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertFalse;
public class ITDeleteAzureBlobStorage extends AbstractAzureBlobStorageIT {
@Override
protected Class<? extends Processor> getProcessorClass() {
return DeleteAzureBlobStorage.class;
}
@BeforeEach
public void setUp() throws Exception {
runner.setProperty(DeleteAzureBlobStorage.BLOB, TEST_BLOB_NAME);
uploadTestBlob();
}
@Test
public void testDeleteBlob() {
runner.assertValid();
runner.enqueue(new byte[0]);
runner.run(1);
assertResult();
}
@Test
public void testDeleteBlobUsingCredentialsService() throws Exception {
configureCredentialsService();
runner.assertValid();
runner.enqueue(new byte[0]);
runner.run(1);
assertResult();
}
private void assertResult() {
runner.assertAllFlowFilesTransferred(DeleteAzureBlobStorage.REL_SUCCESS);
Iterable<ListBlobItem> blobs = container.listBlobs(TEST_BLOB_NAME);
assertFalse(blobs.iterator().hasNext());
}
}

View File

@ -1,138 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
import org.apache.nifi.util.MockFlowFile;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.List;
public class ITFetchAzureBlobStorage extends AbstractAzureBlobStorageIT {
@Override
protected Class<? extends Processor> getProcessorClass() {
return FetchAzureBlobStorage.class;
}
@BeforeEach
public void setUp() throws Exception {
runner.setProperty(FetchAzureBlobStorage.BLOB, TEST_BLOB_NAME);
uploadTestBlob();
}
@Test
public void testFetchBlob() throws Exception {
runner.assertValid();
runner.enqueue(new byte[0]);
runner.run();
assertResult();
}
@Test
public void testFetchBlobWithRangeZeroOne() throws Exception {
runner.setProperty(FetchAzureBlobStorage.RANGE_START, "0B");
runner.setProperty(FetchAzureBlobStorage.RANGE_LENGTH, "1B");
runner.assertValid();
runner.enqueue(new byte[0]);
runner.run();
assertResult(TEST_FILE_CONTENT.substring(0, 1));
}
@Test
public void testFetchBlobWithRangeOneOne() throws Exception {
runner.setProperty(FetchAzureBlobStorage.RANGE_START, "1B");
runner.setProperty(FetchAzureBlobStorage.RANGE_LENGTH, "1B");
runner.assertValid();
runner.enqueue(new byte[0]);
runner.run();
assertResult(TEST_FILE_CONTENT.substring(1, 2));
}
@Test
public void testFetchBlobWithRangeTwentyThreeTwentySix() throws Exception {
runner.setProperty(FetchAzureBlobStorage.RANGE_START, "23B");
runner.setProperty(FetchAzureBlobStorage.RANGE_LENGTH, "3B");
runner.assertValid();
runner.enqueue(new byte[0]);
runner.run();
assertResult(TEST_FILE_CONTENT.substring(23, 26));
}
@Test
public void testFetchBlobWithRangeLengthGreater() throws Exception {
runner.setProperty(FetchAzureBlobStorage.RANGE_START, "0B");
runner.setProperty(FetchAzureBlobStorage.RANGE_LENGTH, "1KB");
runner.assertValid();
runner.enqueue(new byte[0]);
runner.run();
assertResult(TEST_FILE_CONTENT);
}
@Test
public void testFetchBlobWithRangeLengthUnset() throws Exception {
runner.setProperty(FetchAzureBlobStorage.RANGE_START, "0B");
runner.assertValid();
runner.enqueue(new byte[0]);
runner.run();
assertResult(TEST_FILE_CONTENT);
}
@Test
public void testFetchBlobWithRangeStartOutOfRange() throws Exception {
runner.setProperty(FetchAzureBlobStorage.RANGE_START, String.format("%sB", TEST_FILE_CONTENT.length() + 1));
runner.setProperty(FetchAzureBlobStorage.RANGE_LENGTH, "1B");
runner.assertValid();
runner.enqueue(new byte[0]);
runner.run();
runner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor.REL_FAILURE, 1);
}
@Test
public void testFetchBlobUsingCredentialService() throws Exception {
configureCredentialsService();
runner.assertValid();
runner.enqueue(new byte[0]);
runner.run();
assertResult();
}
private void assertResult() throws Exception {
assertResult(TEST_FILE_CONTENT);
}
private void assertResult(final String expectedContent) throws Exception {
runner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor.REL_SUCCESS, 1);
List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(FetchAzureBlobStorage.REL_SUCCESS);
for (MockFlowFile flowFile : flowFilesForRelationship) {
flowFile.assertContentEquals(expectedContent);
flowFile.assertAttributeEquals("azure.length", String.valueOf(TEST_FILE_CONTENT.length()));
}
}
}

View File

@ -1,129 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.util.MockFlowFile;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.stream.StreamSupport;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class ITListAzureBlobStorage extends AbstractAzureBlobStorageIT {
@Override
protected Class<? extends Processor> getProcessorClass() {
return ListAzureBlobStorage.class;
}
@BeforeEach
public void setUp() throws Exception {
uploadTestBlob();
waitForUpload();
}
@Test
public void testListBlobs() throws Exception {
runner.assertValid();
runner.run(1);
assertResult();
}
@Test
public void testListBlobsUsingCredentialService() throws Exception {
configureCredentialsService();
runner.assertValid();
runner.run(1);
assertResult();
}
@Test
public void testListWithMinAge() throws Exception {
runner.setProperty(ListAzureBlobStorage.MIN_AGE, "1 hour");
runner.assertValid();
runner.run(1);
runner.assertTransferCount(ListAzureBlobStorage.REL_SUCCESS, 0);
}
@Test
public void testListWithMaxAge() throws Exception {
runner.setProperty(ListAzureBlobStorage.MAX_AGE, "1 hour");
runner.assertValid();
runner.run(1);
assertResult(TEST_FILE_CONTENT);
}
@Test
public void testListWithMinSize() throws Exception {
uploadTestBlob("nifi-test-blob2", "Test");
waitForUpload();
assertListCount();
runner.setProperty(ListAzureBlobStorage.MIN_SIZE, "5 B");
runner.assertValid();
runner.run(1);
assertResult(TEST_FILE_CONTENT);
}
@Test
public void testListWithMaxSize() throws Exception {
uploadTestBlob("nifi-test-blob2", "Test");
waitForUpload();
assertListCount();
runner.setProperty(ListAzureBlobStorage.MAX_SIZE, "5 B");
runner.assertValid();
runner.run(1);
assertResult("Test");
}
private void waitForUpload() throws InterruptedException {
Thread.sleep(ListAzureBlobStorage.LISTING_LAG_MILLIS.get(TimeUnit.SECONDS) * 2);
}
private void assertResult() {
assertResult(TEST_FILE_CONTENT);
}
private void assertResult(final String content) {
runner.assertTransferCount(ListAzureBlobStorage.REL_SUCCESS, 1);
runner.assertAllFlowFilesTransferred(ListAzureBlobStorage.REL_SUCCESS, 1);
for (MockFlowFile entry : runner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS)) {
entry.assertAttributeEquals("azure.length", String.valueOf(content.getBytes(StandardCharsets.UTF_8).length));
entry.assertAttributeEquals("mime.type", "application/octet-stream");
}
}
private void assertListCount() {
final long listCount = StreamSupport.stream(container.listBlobs().spliterator(), false).count();
assertEquals(2, listCount, "There should be 2 uploaded files but found only " + listCount);
}
}

View File

@ -1,165 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage;
import com.microsoft.azure.storage.blob.ListBlobItem;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionMethod;
import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionUtils;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.util.MockFlowFile;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.nio.charset.StandardCharsets;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ITPutAzureBlobStorage extends AbstractAzureBlobStorageIT {
public static final String TEST_FILE_CONTENT = "0123456789";
private static final String KEY_ID_VALUE = "key:id";
private static final String KEY_64B_VALUE = "1234567890ABCDEF";
private static final String KEY_128B_VALUE = KEY_64B_VALUE + KEY_64B_VALUE;
private static final String KEY_192B_VALUE = KEY_128B_VALUE + KEY_64B_VALUE;
private static final String KEY_256B_VALUE = KEY_128B_VALUE + KEY_128B_VALUE;
private static final String KEY_384B_VALUE = KEY_256B_VALUE + KEY_128B_VALUE;
private static final String KEY_512B_VALUE = KEY_256B_VALUE + KEY_256B_VALUE;
@Override
protected Class<? extends Processor> getProcessorClass() {
return PutAzureBlobStorage.class;
}
@BeforeEach
public void setUp() {
runner.setProperty(PutAzureBlobStorage.BLOB, TEST_BLOB_NAME);
}
@Test
public void testPutBlob() throws Exception {
runner.assertValid();
runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8));
runner.run();
assertResult();
}
@Test
public void testPutBlob64BSymmetricCSE() {
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name());
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE);
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_64B_VALUE);
runner.assertNotValid();
}
@Test
public void testPutBlob128BSymmetricCSE() throws Exception {
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name());
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE);
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_128B_VALUE);
runner.assertValid();
runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8));
runner.run();
assertResult();
}
@Test
public void testPutBlob192BSymmetricCSE() throws Exception {
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name());
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE);
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_192B_VALUE);
runner.assertValid();
runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8));
runner.run();
assertResult();
}
@Test
public void testPutBlob256BSymmetricCSE() throws Exception {
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name());
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE);
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_256B_VALUE);
runner.assertValid();
runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8));
runner.run();
assertResult();
}
@Test
public void testPutBlob384BSymmetricCSE() throws Exception {
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name());
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE);
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_384B_VALUE);
runner.assertValid();
runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8));
runner.run();
assertResult();
}
@Test
public void testPutBlob512BSymmetricCSE() throws Exception {
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name());
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE);
runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_512B_VALUE);
runner.assertValid();
runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8));
runner.run();
assertResult();
}
@Test
public void testPutBlobUsingCredentialsService() throws Exception {
configureCredentialsService();
runner.assertValid();
runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8));
runner.run();
assertResult();
}
@Test
public void testInvalidCredentialsRoutesToFailure() {
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "invalid");
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "aW52YWxpZGludmFsaWQ=");
runner.assertValid();
runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8));
runner.run();
runner.assertTransferCount(PutAzureBlobStorage.REL_FAILURE, 1);
}
private void assertResult() throws Exception {
runner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1);
List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(PutAzureBlobStorage.REL_SUCCESS);
for (MockFlowFile flowFile : flowFilesForRelationship) {
flowFile.assertContentEquals(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8));
flowFile.assertAttributeEquals("azure.length", "10");
}
Iterable<ListBlobItem> blobs = container.listBlobs(TEST_BLOB_NAME);
assertTrue(blobs.iterator().hasNext());
}
}

View File

@ -1,48 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.io.IOException;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
public class TestPutAzureBlobStorage {
@Test
public void testIOExceptionDuringUploadTransfersToFailure() throws Exception {
PutAzureBlobStorage processor = Mockito.spy(new PutAzureBlobStorage());
doThrow(IOException.class).when(processor).uploadBlob(any(), any(), any(), any());
TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutAzureBlobStorage.BLOB, "test");
runner.setProperty(AzureStorageUtils.CONTAINER, "test");
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "test");
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "test");
runner.enqueue("test data");
runner.run();
runner.assertTransferCount(PutAzureBlobStorage.REL_FAILURE, 1);
}
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.processors.azure.storage.queue;
import com.microsoft.azure.storage.StorageException;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
@ -25,8 +24,6 @@ import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
@ -39,7 +36,7 @@ public class TestPutAzureQueueStorage {
private final TestRunner runner = TestRunners.newTestRunner(PutAzureQueueStorage.class);
@Test
public void testInvalidTTLAndVisibilityDelay() throws StorageException, URISyntaxException, InvalidKeyException {
public void testInvalidTTLAndVisibilityDelay() {
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "dummy-storage");
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "dummy-key");

View File

@ -1,160 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.utils;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.azure.storage.PutAzureBlobStorage;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.MockValidationContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Collection;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestAzureBlobClientSideEncryptionUtils {
private static final String KEY_ID_VALUE = "key:id";
private static final String KEY_64B_VALUE = "1234567890ABCDEF";
private static final String KEY_128B_VALUE = KEY_64B_VALUE + KEY_64B_VALUE;
private static final String KEY_192B_VALUE = KEY_128B_VALUE + KEY_64B_VALUE;
private static final String KEY_256B_VALUE = KEY_128B_VALUE + KEY_128B_VALUE;
private static final String KEY_384B_VALUE = KEY_256B_VALUE + KEY_128B_VALUE;
private static final String KEY_512B_VALUE = KEY_256B_VALUE + KEY_256B_VALUE;
private MockProcessContext processContext;
private MockValidationContext validationContext;
@BeforeEach
public void setUp() {
Processor processor = new PutAzureBlobStorage();
processContext = new MockProcessContext(processor);
validationContext = new MockValidationContext(processContext);
}
@Test
public void testNoCesConfiguredOnProcessor() {
configureProcessorProperties("NONE", null,null);
Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
assertValid(result);
}
@Test
public void testSymmetricCesNoKeyIdOnProcessor() {
configureProcessorProperties("SYMMETRIC", null, KEY_128B_VALUE);
Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
assertNotValid(result);
}
@Test
public void testSymmetricCesNoKeyOnProcessor() {
configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE,null);
Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
assertNotValid(result);
}
@Test
public void testSymmetricCesInvalidHexKeyOnProcessor() {
configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE,"ZZ");
Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
assertNotValid(result);
}
@Test
public void testSymmetricCes64BitKeyOnProcessor() {
configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE, KEY_64B_VALUE);
Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
assertNotValid(result);
}
@Test
public void testSymmetricCes128BitKeyOnProcessor() {
configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE, KEY_128B_VALUE);
Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
assertValid(result);
}
@Test
public void testSymmetricCes192BitKeyOnProcessor() {
configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE, KEY_192B_VALUE);
Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
assertValid(result);
}
@Test
public void testSymmetricCes256BitKeyOnProcessor() {
configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE, KEY_256B_VALUE);
Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
assertValid(result);
}
@Test
public void testSymmetricCes384BitKeyOnProcessor() {
configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE, KEY_384B_VALUE);
Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
assertValid(result);
}
@Test
public void testSymmetricCes512BitKeyOnProcessor() {
configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE, KEY_512B_VALUE);
Collection<ValidationResult> result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext);
assertValid(result);
}
private void configureProcessorProperties(String keyType, String keyId, String symmetricKeyHex) {
if (keyType != null) {
processContext.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, keyType);
}
if (keyId != null) {
processContext.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, keyId);
}
if (symmetricKeyHex != null) {
processContext.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, symmetricKeyHex);
}
}
private void assertValid(Collection<ValidationResult> result) {
assertTrue(result.isEmpty(), "There should be no validation error");
}
private void assertNotValid(Collection<ValidationResult> result) {
assertFalse(result.isEmpty(), "There should be validation error");
}
}

View File

@ -1,165 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.utils;
import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
import com.microsoft.azure.storage.core.Base64;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.azure.storage.ListAzureBlobStorage;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails;
import org.apache.nifi.util.MockConfigurationContext;
import org.apache.nifi.util.MockProcessContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestAzureStorageUtilsGetStorageCredentialsDetails {
private static final String CREDENTIALS_SERVICE_VALUE = "CredentialsService";
private static final String ACCOUNT_NAME_VALUE = "AccountName";
private static final String ACCOUNT_KEY_VALUE = Base64.encode("AccountKey".getBytes());
private static final String SAS_TOKEN_VALUE = "SasToken";
private MockProcessContext processContext;
@BeforeEach
public void setUp() {
Processor processor = new ListAzureBlobStorage();
processContext = new MockProcessContext(processor);
}
@Test
public void testAccountNameAndAccountKeyConfiguredOnProcessor() {
configureProcessorProperties(ACCOUNT_NAME_VALUE, ACCOUNT_KEY_VALUE, null);
AzureStorageCredentialsDetails storageCredentialsDetails = AzureStorageUtils.getStorageCredentialsDetails(processContext, null);
assertStorageCredentialsDetailsAccountNameAndAccountKey(storageCredentialsDetails);
}
@Test
public void testAccountNameAndSasTokenConfiguredOnProcessor() {
configureProcessorProperties(ACCOUNT_NAME_VALUE, null, SAS_TOKEN_VALUE);
AzureStorageCredentialsDetails storageCredentialsDetails = AzureStorageUtils.getStorageCredentialsDetails(processContext, null);
assertStorageCredentialsDetailsAccountNameAndSasToken(storageCredentialsDetails);
}
@Test
public void testAccountNameAndAccountKeyConfiguredOnControllerService() {
configureControllerService(ACCOUNT_NAME_VALUE, ACCOUNT_KEY_VALUE, null);
AzureStorageCredentialsDetails storageCredentialsDetails = AzureStorageUtils.getStorageCredentialsDetails(processContext, null);
assertStorageCredentialsDetailsAccountNameAndAccountKey(storageCredentialsDetails);
}
@Test
public void testAccountNameAndSasTokenConfiguredOnControllerService() {
configureControllerService(ACCOUNT_NAME_VALUE, null, SAS_TOKEN_VALUE);
AzureStorageCredentialsDetails storageCredentialsDetails = AzureStorageUtils.getStorageCredentialsDetails(processContext, null);
assertStorageCredentialsDetailsAccountNameAndSasToken(storageCredentialsDetails);
}
@Test
public void testAccountNameMissingConfiguredOnProcessor() {
configureProcessorProperties(null, ACCOUNT_KEY_VALUE, null);
assertThrows(IllegalArgumentException.class, () -> AzureStorageUtils.getStorageCredentialsDetails(processContext, null));
}
@Test
public void testAccountKeyAndSasTokenMissingConfiguredOnProcessor() {
configureProcessorProperties(ACCOUNT_NAME_VALUE, null, null);
assertThrows(IllegalArgumentException.class, () -> AzureStorageUtils.getStorageCredentialsDetails(processContext, null));
}
@Test
public void testAccountNameMissingConfiguredOnControllerService() {
configureControllerService(null, ACCOUNT_KEY_VALUE, null);
assertThrows(IllegalArgumentException.class, () -> AzureStorageUtils.getStorageCredentialsDetails(processContext, null));
}
@Test
public void testAccountKeyAndSasTokenMissingConfiguredOnControllerService() {
configureControllerService(ACCOUNT_NAME_VALUE, null, null);
assertThrows(IllegalArgumentException.class, () -> AzureStorageUtils.getStorageCredentialsDetails(processContext, null));
}
private void configureProcessorProperties(String accountName, String accountKey, String sasToken) {
if (accountName != null) {
processContext.setProperty(AzureStorageUtils.ACCOUNT_NAME, accountName);
}
if (accountKey != null) {
processContext.setProperty(AzureStorageUtils.ACCOUNT_KEY, accountKey);
}
if (sasToken != null) {
processContext.setProperty(AzureStorageUtils.PROP_SAS_TOKEN, sasToken);
}
}
private void configureControllerService(String accountName, String accountKey, String sasToken) {
AzureStorageCredentialsControllerService credentialsService = new AzureStorageCredentialsControllerService();
Map<PropertyDescriptor, String> properties = new HashMap<>();
if (accountName != null) {
properties.put(AzureStorageUtils.ACCOUNT_NAME, accountName);
}
if (accountKey != null) {
properties.put(AzureStorageUtils.ACCOUNT_KEY, accountKey);
}
if (sasToken != null) {
properties.put(AzureStorageUtils.PROP_SAS_TOKEN, sasToken);
}
MockConfigurationContext configurationContext = new MockConfigurationContext(properties, null);
credentialsService.onEnabled(configurationContext);
processContext.addControllerService(credentialsService, CREDENTIALS_SERVICE_VALUE);
processContext.setProperty(AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE, CREDENTIALS_SERVICE_VALUE);
}
private void assertStorageCredentialsDetailsAccountNameAndAccountKey(AzureStorageCredentialsDetails storageCredentialsDetails) {
assertEquals(ACCOUNT_NAME_VALUE, storageCredentialsDetails.getStorageAccountName());
assertTrue(storageCredentialsDetails.getStorageCredentials() instanceof StorageCredentialsAccountAndKey);
StorageCredentialsAccountAndKey storageCredentials = (StorageCredentialsAccountAndKey) storageCredentialsDetails.getStorageCredentials();
assertEquals(ACCOUNT_NAME_VALUE, storageCredentials.getAccountName());
assertEquals(ACCOUNT_KEY_VALUE, storageCredentials.exportBase64EncodedKey());
}
private void assertStorageCredentialsDetailsAccountNameAndSasToken(AzureStorageCredentialsDetails storageCredentialsDetails) {
assertEquals(ACCOUNT_NAME_VALUE, storageCredentialsDetails.getStorageAccountName());
assertTrue(storageCredentialsDetails.getStorageCredentials() instanceof StorageCredentialsSharedAccessSignature);
StorageCredentialsSharedAccessSignature storageCredentials = (StorageCredentialsSharedAccessSignature) storageCredentialsDetails.getStorageCredentials();
assertEquals(SAS_TOKEN_VALUE, storageCredentials.getToken());
}
}

View File

@ -18,7 +18,7 @@ package org.apache.nifi.processors.azure.storage.utils;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.azure.storage.ListAzureBlobStorage;
import org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.MockValidationContext;
import org.junit.jupiter.api.BeforeEach;
@ -41,7 +41,7 @@ public class TestAzureStorageUtilsValidateCredentialProperties {
@BeforeEach
public void setUp() {
Processor processor = new ListAzureBlobStorage();
Processor processor = new GetAzureQueueStorage();
processContext = new MockProcessContext(processor);
validationContext = new MockValidationContext(processContext);
}