From f7e36a07acb09a6aced0149450757445b76a06fb Mon Sep 17 00:00:00 2001 From: exceptionfactory Date: Tue, 9 May 2023 09:46:19 -0500 Subject: [PATCH] NIFI-11228 Removed deprecated Azure Blob Storage Processors This closes #7234 Signed-off-by: Nandor Soma Abonyi --- .../runtime/manifest/TestRuntimeManifest.java | 9 - .../nifi-azure-processors/pom.xml | 12 +- .../azure/AbstractAzureBlobProcessor.java | 116 -------- .../azure/storage/DeleteAzureBlobStorage.java | 107 -------- .../azure/storage/FetchAzureBlobStorage.java | 167 ------------ .../azure/storage/ListAzureBlobStorage.java | 255 ------------------ .../azure/storage/PutAzureBlobStorage.java | 221 --------------- .../AzureBlobClientSideEncryptionUtils.java | 118 -------- .../storage/utils/AzureStorageUtils.java | 14 - .../org.apache.nifi.processor.Processor | 4 - .../storage/AbstractAzureBlobStorageIT.java | 72 ----- .../azure/storage/ITAzureBlobStorageE2E.java | 245 ----------------- .../storage/ITDeleteAzureBlobStorage.java | 66 ----- .../storage/ITFetchAzureBlobStorage.java | 138 ---------- .../azure/storage/ITListAzureBlobStorage.java | 129 --------- .../azure/storage/ITPutAzureBlobStorage.java | 165 ------------ .../storage/TestPutAzureBlobStorage.java | 48 ---- .../queue/TestPutAzureQueueStorage.java | 5 +- ...estAzureBlobClientSideEncryptionUtils.java | 160 ----------- ...rageUtilsGetStorageCredentialsDetails.java | 165 ------------ ...rageUtilsValidateCredentialProperties.java | 4 +- 21 files changed, 7 insertions(+), 2213 deletions(-) delete mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java delete mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage.java delete mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java delete mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java delete mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java delete mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureBlobClientSideEncryptionUtils.java delete mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorageIT.java delete mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITAzureBlobStorageE2E.java delete mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureBlobStorage.java delete mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java delete mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java delete mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java delete mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureBlobStorage.java delete mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureBlobClientSideEncryptionUtils.java delete mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsGetStorageCredentialsDetails.java diff --git a/nifi-manifest/nifi-runtime-manifest-test/src/test/java/org/apache/nifi/runtime/manifest/TestRuntimeManifest.java b/nifi-manifest/nifi-runtime-manifest-test/src/test/java/org/apache/nifi/runtime/manifest/TestRuntimeManifest.java index d1ccd7a929..817a891596 100644 --- a/nifi-manifest/nifi-runtime-manifest-test/src/test/java/org/apache/nifi/runtime/manifest/TestRuntimeManifest.java +++ b/nifi-manifest/nifi-runtime-manifest-test/src/test/java/org/apache/nifi/runtime/manifest/TestRuntimeManifest.java @@ -271,15 +271,6 @@ class TestRuntimeManifest { assertNotNull(routeOnAttributeDef.getDynamicProperties().get(0).getValue()); assertNotNull(routeOnAttributeDef.getDynamicProperties().get(0).getExpressionLanguageScope()); - // Verify DeleteAzureBlobStorage is deprecated - final ProcessorDefinition deleteAzureBlobDef = getProcessorDefinition(bundles, "nifi-azure-nar", - "org.apache.nifi.processors.azure.storage.DeleteAzureBlobStorage"); - assertNotNull(deleteAzureBlobDef.getDeprecated()); - assertTrue(deleteAzureBlobDef.getDeprecated().booleanValue()); - assertNotNull(deleteAzureBlobDef.getDeprecationReason()); - assertNotNull(deleteAzureBlobDef.getDeprecationAlternatives()); - assertFalse(deleteAzureBlobDef.getDeprecationAlternatives().isEmpty()); - // Verify SplitJson has @SystemResourceConsiderations final ProcessorDefinition splitJsonDef = getProcessorDefinition(bundles, "nifi-standard-nar", "org.apache.nifi.processors.standard.SplitJson"); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml index 197ab20e76..87888cac8c 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml @@ -19,9 +19,6 @@ nifi-azure-processors jar - - 1.2.6 - org.apache.nifi @@ -96,11 +93,6 @@ azure-security-keyvault-keys - - com.microsoft.azure - azure-keyvault - ${azure-keyvault.version} - com.microsoft.azure azure-storage @@ -113,6 +105,10 @@ commons-io commons-io + + commons-codec + commons-codec + com.github.ben-manes.caffeine caffeine diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java deleted file mode 100644 index 9e2b855cc7..0000000000 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.azure; - -import com.microsoft.azure.keyvault.cryptography.SymmetricKey; -import com.microsoft.azure.storage.blob.BlobEncryptionPolicy; -import com.microsoft.azure.storage.blob.BlobRequestOptions; -import org.apache.commons.codec.DecoderException; -import org.apache.commons.codec.binary.Hex; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionMethod; -import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionUtils; -import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; - -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -public abstract class AbstractAzureBlobProcessor extends AbstractProcessor { - - public static final PropertyDescriptor BLOB = new PropertyDescriptor.Builder() - .name("blob") - .displayName("Blob") - .description("The filename of the blob") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .required(true) - .defaultValue("${azure.blobname}") - .build(); - - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("All successfully processed FlowFiles are routed to this relationship") - .build(); - public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("Unsuccessful operations will be transferred to the failure relationship.") - .build(); - - private static final List PROPERTIES = Collections - .unmodifiableList(Arrays.asList( - AzureStorageUtils.CONTAINER, - AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE, - AzureStorageUtils.ACCOUNT_NAME, - AzureStorageUtils.ACCOUNT_KEY, - AzureStorageUtils.PROP_SAS_TOKEN, - AzureStorageUtils.ENDPOINT_SUFFIX, - BLOB, - AzureStorageUtils.PROXY_CONFIGURATION_SERVICE)); - - private static final Set RELATIONSHIPS = Collections.unmodifiableSet( - new HashSet<>(Arrays.asList( - AbstractAzureBlobProcessor.REL_SUCCESS, - AbstractAzureBlobProcessor.REL_FAILURE))); - - @Override - protected List getSupportedPropertyDescriptors() { - return PROPERTIES; - } - - @Override - protected Collection customValidate(ValidationContext validationContext) { - final Collection results = AzureStorageUtils.validateCredentialProperties(validationContext); - AzureStorageUtils.validateProxySpec(validationContext, results); - return results; - } - - @Override - public Set getRelationships() { - return RELATIONSHIPS; - } - - protected BlobRequestOptions createBlobRequestOptions(ProcessContext context) throws DecoderException { - final String cseKeyTypeValue = context.getProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE).getValue(); - final AzureBlobClientSideEncryptionMethod cseKeyType = AzureBlobClientSideEncryptionMethod.valueOf(cseKeyTypeValue); - - final String cseKeyId = context.getProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID).getValue(); - - final String cseSymmetricKeyHex = context.getProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX).getValue(); - - BlobRequestOptions blobRequestOptions = new BlobRequestOptions(); - - if (cseKeyType == AzureBlobClientSideEncryptionMethod.SYMMETRIC) { - byte[] keyBytes = Hex.decodeHex(cseSymmetricKeyHex.toCharArray()); - SymmetricKey key = new SymmetricKey(cseKeyId, keyBytes); - BlobEncryptionPolicy policy = new BlobEncryptionPolicy(key, null); - blobRequestOptions.setEncryptionPolicy(policy); - } - - return blobRequestOptions; - } -} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage.java deleted file mode 100644 index 3e4b236299..0000000000 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.azure.storage; - -import com.microsoft.azure.storage.OperationContext; -import com.microsoft.azure.storage.StorageException; -import com.microsoft.azure.storage.blob.CloudBlob; -import com.microsoft.azure.storage.blob.CloudBlobClient; -import com.microsoft.azure.storage.blob.CloudBlobContainer; -import com.microsoft.azure.storage.blob.DeleteSnapshotsOption; -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.DeprecationNotice; -import org.apache.nifi.annotation.documentation.SeeAlso; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.components.AllowableValue; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor; -import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; - -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; - -@Tags({ "azure", "microsoft", "cloud", "storage", "blob" }) -@SeeAlso({ ListAzureBlobStorage.class, FetchAzureBlobStorage.class, PutAzureBlobStorage.class}) -@CapabilityDescription("Deletes the provided blob from Azure Storage") -@InputRequirement(Requirement.INPUT_REQUIRED) -@DeprecationNotice(alternatives = DeleteAzureBlobStorage_v12.class, reason = "Processor depends on legacy Microsoft Azure SDK") -public class DeleteAzureBlobStorage extends AbstractAzureBlobProcessor { - - private static final AllowableValue DELETE_SNAPSHOTS_NONE = new AllowableValue(DeleteSnapshotsOption.NONE.name(), "None", "Delete the blob only."); - - private static final AllowableValue DELETE_SNAPSHOTS_ALSO = new AllowableValue(DeleteSnapshotsOption.INCLUDE_SNAPSHOTS.name(), "Include Snapshots", "Delete the blob and its snapshots."); - - private static final AllowableValue DELETE_SNAPSHOTS_ONLY = new AllowableValue(DeleteSnapshotsOption.DELETE_SNAPSHOTS_ONLY.name(), "Delete Snapshots Only", "Delete only the blob's snapshots."); - - private static final PropertyDescriptor DELETE_SNAPSHOTS_OPTION = new PropertyDescriptor.Builder() - .name("delete-snapshots-option") - .displayName("Delete Snapshots Option") - .description("Specifies the snapshot deletion options to be used when deleting a blob.") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .allowableValues(DELETE_SNAPSHOTS_NONE, DELETE_SNAPSHOTS_ALSO, DELETE_SNAPSHOTS_ONLY) - .defaultValue(DELETE_SNAPSHOTS_NONE.getValue()) - .required(true) - .build(); - - @Override - public List getSupportedPropertyDescriptors() { - List properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); - properties.add(DELETE_SNAPSHOTS_OPTION); - return properties; - } - - @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); - - if(flowFile == null) { - return; - } - - final long startNanos = System.nanoTime(); - final String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue(); - final String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue(); - final String deleteSnapshotOptions = context.getProperty(DELETE_SNAPSHOTS_OPTION).getValue(); - - try { - CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), flowFile); - CloudBlobContainer container = blobClient.getContainerReference(containerName); - CloudBlob blob = container.getBlockBlobReference(blobPath); - - final OperationContext operationContext = new OperationContext(); - AzureStorageUtils.setProxy(operationContext, context); - blob.deleteIfExists(DeleteSnapshotsOption.valueOf(deleteSnapshotOptions), null, null, operationContext); - session.transfer(flowFile, REL_SUCCESS); - - final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); - session.getProvenanceReporter().invokeRemoteProcess(flowFile, blob.getSnapshotQualifiedUri().toString(), "Blob deleted"); - } catch ( StorageException | URISyntaxException e) { - getLogger().error("Failed to delete the specified blob {} from Azure Storage. Routing to failure", new Object[]{blobPath}, e); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); - } - } -} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java deleted file mode 100644 index c4567edecd..0000000000 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.azure.storage; - -import java.io.IOException; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Collection; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import com.microsoft.azure.storage.OperationContext; -import org.apache.commons.codec.DecoderException; -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; -import org.apache.nifi.annotation.behavior.WritesAttribute; -import org.apache.nifi.annotation.behavior.WritesAttributes; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.DeprecationNotice; -import org.apache.nifi.annotation.documentation.SeeAlso; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.DataUnit; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor; -import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionUtils; -import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; - -import com.microsoft.azure.storage.StorageException; -import com.microsoft.azure.storage.blob.CloudBlob; -import com.microsoft.azure.storage.blob.CloudBlobClient; -import com.microsoft.azure.storage.blob.CloudBlobContainer; -import com.microsoft.azure.storage.blob.BlobRequestOptions; - -@Tags({ "azure", "microsoft", "cloud", "storage", "blob" }) -@CapabilityDescription("Retrieves contents of an Azure Storage Blob, writing the contents to the content of the FlowFile") -@SeeAlso({ ListAzureBlobStorage.class, PutAzureBlobStorage.class, DeleteAzureBlobStorage.class }) -@InputRequirement(Requirement.INPUT_REQUIRED) -@WritesAttributes({ - @WritesAttribute(attribute = "azure.length", description = "The length of the blob fetched") -}) -@DeprecationNotice(alternatives = FetchAzureBlobStorage_v12.class, reason = "Processor depends on legacy Microsoft Azure SDK") -public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor { - - public static final PropertyDescriptor RANGE_START = new PropertyDescriptor.Builder() - .name("range-start") - .displayName("Range Start") - .description("The byte position at which to start reading from the blob. An empty value or a value of " + - "zero will start reading at the beginning of the blob.") - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .required(false) - .build(); - - public static final PropertyDescriptor RANGE_LENGTH = new PropertyDescriptor.Builder() - .name("range-length") - .displayName("Range Length") - .description("The number of bytes to download from the blob, starting from the Range Start. An empty " + - "value or a value that extends beyond the end of the blob will read to the end of the blob.") - .addValidator(StandardValidators.createDataSizeBoundsValidator(1, Long.MAX_VALUE)) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .required(false) - .build(); - - @Override - protected Collection customValidate(ValidationContext validationContext) { - final List results = new ArrayList<>(super.customValidate(validationContext)); - results.addAll(AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext)); - return results; - } - - @Override - public List getSupportedPropertyDescriptors() { - List properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); - properties.add(RANGE_START); - properties.add(RANGE_LENGTH); - properties.add(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE); - properties.add(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID); - properties.add(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX); - return properties; - } - - @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } - - final long startNanos = System.nanoTime(); - - final String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue(); - final String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue(); - final long rangeStart = (context.getProperty(RANGE_START).isSet() ? context.getProperty(RANGE_START).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : 0L); - final Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ? context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : null); - - AtomicReference storedException = new AtomicReference<>(); - try { - CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), flowFile); - CloudBlobContainer container = blobClient.getContainerReference(containerName); - - final OperationContext operationContext = new OperationContext(); - AzureStorageUtils.setProxy(operationContext, context); - - final Map attributes = new HashMap<>(); - final CloudBlob blob = container.getBlockBlobReference(blobPath); - - BlobRequestOptions blobRequestOptions = createBlobRequestOptions(context); - - // TODO - we may be able do fancier things with ranges and - // distribution of download over threads, investigate - flowFile = session.write(flowFile, os -> { - try { - blob.downloadRange(rangeStart, rangeLength, os, null, blobRequestOptions, operationContext); - } catch (StorageException e) { - storedException.set(e); - throw new IOException(e); - } - }); - - long length = blob.getProperties().getLength(); - attributes.put("azure.length", String.valueOf(length)); - - if (!attributes.isEmpty()) { - flowFile = session.putAllAttributes(flowFile, attributes); - } - - session.transfer(flowFile, REL_SUCCESS); - final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); - session.getProvenanceReporter().fetch(flowFile, blob.getSnapshotQualifiedUri().toString(), transferMillis); - } catch (IllegalArgumentException | URISyntaxException | StorageException | ProcessException | DecoderException e) { - if (e instanceof ProcessException && storedException.get() == null) { - throw (ProcessException) e; - } else { - Exception failureException = Optional.ofNullable(storedException.get()).orElse(e); - getLogger().error("Failure to fetch Azure blob {}", new Object[]{blobPath}, failureException); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); - } - } - } -} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java deleted file mode 100644 index f2d0f6bbc3..0000000000 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java +++ /dev/null @@ -1,255 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.azure.storage; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import com.microsoft.azure.storage.OperationContext; -import com.microsoft.azure.storage.ResultContinuation; -import com.microsoft.azure.storage.ResultSegment; -import com.microsoft.azure.storage.StorageUri; -import com.microsoft.azure.storage.blob.BlobListingDetails; -import com.microsoft.azure.storage.blob.BlobProperties; -import com.microsoft.azure.storage.blob.CloudBlob; -import com.microsoft.azure.storage.blob.CloudBlobClient; -import com.microsoft.azure.storage.blob.CloudBlobContainer; -import com.microsoft.azure.storage.blob.CloudBlockBlob; -import com.microsoft.azure.storage.blob.ListBlobItem; -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; -import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; -import org.apache.nifi.annotation.behavior.Stateful; -import org.apache.nifi.annotation.behavior.TriggerSerially; -import org.apache.nifi.annotation.behavior.WritesAttribute; -import org.apache.nifi.annotation.behavior.WritesAttributes; -import org.apache.nifi.annotation.configuration.DefaultSchedule; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.DeprecationNotice; -import org.apache.nifi.annotation.documentation.SeeAlso; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.state.Scope; -import org.apache.nifi.context.PropertyContext; -import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processor.util.list.AbstractListProcessor; -import org.apache.nifi.processor.util.list.ListedEntityTracker; -import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; -import org.apache.nifi.processors.azure.storage.utils.BlobInfo; -import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder; -import org.apache.nifi.scheduling.SchedulingStrategy; -import org.apache.nifi.serialization.record.RecordSchema; - -import java.util.Optional; - -@PrimaryNodeOnly -@TriggerSerially -@Tags({ "azure", "microsoft", "cloud", "storage", "blob" }) -@SeeAlso({ FetchAzureBlobStorage.class, PutAzureBlobStorage.class, DeleteAzureBlobStorage.class }) -@CapabilityDescription("Lists blobs in an Azure Storage container. Listing details are attached to an empty FlowFile for use with FetchAzureBlobStorage. " + - "This Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new Primary Node will pick up where the " + - "previous node left off without duplicating all of the data.") -@InputRequirement(Requirement.INPUT_FORBIDDEN) -@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the Azure container"), - @WritesAttribute(attribute = "azure.blobname", description = "The name of the Azure blob"), - @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"), - @WritesAttribute(attribute = "azure.secondaryUri", description = "Secondary location for blob content"), - @WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"), - @WritesAttribute(attribute = "azure.length", description = "Length of the blob"), - @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob"), - @WritesAttribute(attribute = "mime.type", description = "MimeType of the content"), - @WritesAttribute(attribute = "lang", description = "Language code for the content"), - @WritesAttribute(attribute = "azure.blobtype", description = "This is the type of blob and can be either page or block type") }) -@Stateful(scopes = { Scope.CLUSTER }, description = "After performing a listing of blobs, the timestamp of the newest blob is stored. " + - "This allows the Processor to list only blobs that have been added or modified after this date the next time that the Processor is run. State is " + - "stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected, the new node can pick up " + - "where the previous node left off, without duplicating the data.") -@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min") -@DeprecationNotice(alternatives = ListAzureBlobStorage_v12.class, reason = "Processor depends on legacy Microsoft Azure SDK") -public class ListAzureBlobStorage extends AbstractListAzureProcessor { - - private static final PropertyDescriptor PROP_PREFIX = new PropertyDescriptor.Builder() - .name("prefix") - .displayName("Prefix") - .description("Search prefix for listing") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .required(false) - .build(); - - private static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList( - LISTING_STRATEGY, - AbstractListProcessor.RECORD_WRITER, - AzureStorageUtils.CONTAINER, - AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE, - AzureStorageUtils.ACCOUNT_NAME, - AzureStorageUtils.ACCOUNT_KEY, - AzureStorageUtils.PROP_SAS_TOKEN, - AzureStorageUtils.ENDPOINT_SUFFIX, - PROP_PREFIX, - AzureStorageUtils.PROXY_CONFIGURATION_SERVICE, - ListedEntityTracker.TRACKING_STATE_CACHE, - ListedEntityTracker.TRACKING_TIME_WINDOW, - ListedEntityTracker.INITIAL_LISTING_TARGET, - MIN_AGE, - MAX_AGE, - MIN_SIZE, - MAX_SIZE - )); - - @Override - protected List getSupportedPropertyDescriptors() { - return PROPERTIES; - } - - @Override - protected void customValidate(ValidationContext validationContext, Collection results) { - results.addAll(AzureStorageUtils.validateCredentialProperties(validationContext)); - AzureStorageUtils.validateProxySpec(validationContext, results); - } - - @Override - protected Map createAttributes(BlobInfo entity, ProcessContext context) { - final Map attributes = new HashMap<>(); - attributes.put("azure.container", entity.getContainerName()); - attributes.put("azure.etag", entity.getEtag()); - attributes.put("azure.primaryUri", entity.getPrimaryUri()); - attributes.put("azure.secondaryUri", entity.getSecondaryUri()); - attributes.put("azure.blobname", entity.getBlobName()); - attributes.put("filename", entity.getName()); - attributes.put("azure.blobtype", entity.getBlobType()); - attributes.put("azure.length", String.valueOf(entity.getLength())); - attributes.put("azure.timestamp", String.valueOf(entity.getTimestamp())); - attributes.put("mime.type", entity.getContentType()); - attributes.put("lang", entity.getContentLanguage()); - - return attributes; - } - - @Override - protected String getListingContainerName(final ProcessContext context) { - return String.format("Azure Blob Storage Container [%s]", getPath(context)); - } - - @Override - protected String getPath(final ProcessContext context) { - return context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue(); - } - - @Override - protected boolean isListingResetNecessary(final PropertyDescriptor property) { - // re-list if configuration changed, but not when security keys are rolled (not included in the condition) - return PROP_PREFIX.equals(property) - || AzureStorageUtils.ACCOUNT_NAME.equals(property) - || AzureStorageUtils.CONTAINER.equals(property) - || AzureStorageUtils.PROP_SAS_TOKEN.equals(property); - } - - @Override - protected Scope getStateScope(final PropertyContext context) { - return Scope.CLUSTER; - } - - @Override - protected RecordSchema getRecordSchema() { - return BlobInfo.getRecordSchema(); - } - - @Override - protected String getDefaultTimePrecision() { - // User does not have to choose one. - // AUTO_DETECT can handle most cases, but it may incur longer latency - // when all listed files do not have SECOND part in their timestamps although Azure Blob Storage does support seconds. - return PRECISION_SECONDS.getValue(); - } - - @Override - protected List performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode) throws IOException { - final String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue(); - final String prefix = Optional.ofNullable(context.getProperty(PROP_PREFIX).evaluateAttributeExpressions().getValue()).orElse(""); - final List listing = new ArrayList<>(); - final long minimumTimestamp = minTimestamp == null ? 0 : minTimestamp; - - try { - final CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), null); - final CloudBlobContainer container = blobClient.getContainerReference(containerName); - - final OperationContext operationContext = new OperationContext(); - AzureStorageUtils.setProxy(operationContext, context); - - ResultContinuation continuationToken = null; - - do { - final ResultSegment result = container.listBlobsSegmented(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, continuationToken, null, operationContext); - continuationToken = result.getContinuationToken(); - - for (final ListBlobItem blob : result.getResults()) { - if (blob instanceof CloudBlob) { - final CloudBlob cloudBlob = (CloudBlob) blob; - final BlobProperties properties = cloudBlob.getProperties(); - - if (isFileInfoMatchesWithAgeAndSize(context, minimumTimestamp, properties.getLastModified().getTime(), properties.getLength())) { - final StorageUri uri = cloudBlob.getSnapshotQualifiedStorageUri(); - - final Builder builder = new BlobInfo.Builder() - .primaryUri(uri.getPrimaryUri().toString()) - .blobName(cloudBlob.getName()) - .containerName(containerName) - .contentType(properties.getContentType()) - .contentLanguage(properties.getContentLanguage()) - .etag(properties.getEtag()) - .lastModifiedTime(properties.getLastModified().getTime()) - .length(properties.getLength()); - - if (uri.getSecondaryUri() != null) { - builder.secondaryUri(uri.getSecondaryUri().toString()); - } - - if (blob instanceof CloudBlockBlob) { - builder.blobType(AzureStorageUtils.BLOCK); - } else { - builder.blobType(AzureStorageUtils.PAGE); - } - listing.add(builder.build()); - } - } - } - } while (continuationToken != null); - } catch (final Throwable t) { - throw new IOException(ExceptionUtils.getRootCause(t)); - } - return listing; - } - - // Unfiltered listing is not supported - must provide a prefix - @Override - protected Integer countUnfilteredListing(final ProcessContext context) { - return null; - } -} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java deleted file mode 100644 index 6cd779f381..0000000000 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java +++ /dev/null @@ -1,221 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.azure.storage; - -import java.io.BufferedInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.FilterInputStream; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Collection; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import com.microsoft.azure.storage.OperationContext; -import org.apache.commons.codec.DecoderException; -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; -import org.apache.nifi.annotation.behavior.WritesAttribute; -import org.apache.nifi.annotation.behavior.WritesAttributes; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.DeprecationNotice; -import org.apache.nifi.annotation.documentation.SeeAlso; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor; -import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionUtils; -import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; - -import com.microsoft.azure.storage.StorageException; -import com.microsoft.azure.storage.blob.BlobProperties; -import com.microsoft.azure.storage.blob.CloudBlob; -import com.microsoft.azure.storage.blob.CloudBlobClient; -import com.microsoft.azure.storage.blob.CloudBlobContainer; -import com.microsoft.azure.storage.blob.BlobRequestOptions; - -@Tags({ "azure", "microsoft", "cloud", "storage", "blob" }) -@SeeAlso({ ListAzureBlobStorage.class, FetchAzureBlobStorage.class, DeleteAzureBlobStorage.class }) -@CapabilityDescription("Puts content into an Azure Storage Blob") -@InputRequirement(Requirement.INPUT_REQUIRED) -@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the Azure container"), - @WritesAttribute(attribute = "azure.blobname", description = "The name of the Azure blob"), - @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"), - @WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"), - @WritesAttribute(attribute = "azure.length", description = "Length of the blob"), - @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob")}) -@DeprecationNotice(alternatives = PutAzureBlobStorage_v12.class, reason = "Processor depends on legacy Microsoft Azure SDK") -public class PutAzureBlobStorage extends AbstractAzureBlobProcessor { - - public static final PropertyDescriptor BLOB_NAME = new PropertyDescriptor.Builder() - .name("blob") - .displayName("Blob") - .description("The filename of the blob") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .required(true) - .build(); - - public static final PropertyDescriptor CREATE_CONTAINER = new PropertyDescriptor.Builder() - .name("azure-create-container") - .displayName("Create Container") - .expressionLanguageSupported(ExpressionLanguageScope.NONE) - .required(true) - .addValidator(StandardValidators.BOOLEAN_VALIDATOR) - .allowableValues("true", "false") - .defaultValue("false") - .description("Specifies whether to check if the container exists and to automatically create it if it does not. " + - "Permission to list containers is required. If false, this check is not made, but the Put operation " + - "will fail if the container does not exist.") - .build(); - - @Override - protected Collection customValidate(ValidationContext validationContext) { - final List results = new ArrayList<>(super.customValidate(validationContext)); - results.addAll(AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext)); - return results; - } - - @Override - public List getSupportedPropertyDescriptors() { - List properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); - properties.remove(BLOB); - properties.add(BLOB_NAME); - properties.add(CREATE_CONTAINER); - properties.add(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE); - properties.add(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID); - properties.add(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX); - return properties; - } - - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } - - final long startNanos = System.nanoTime(); - - String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue(); - - String blobPath = context.getProperty(BLOB_NAME).evaluateAttributeExpressions(flowFile).getValue(); - - final boolean createContainer = context.getProperty(CREATE_CONTAINER).asBoolean(); - - AtomicReference storedException = new AtomicReference<>(); - try { - CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), flowFile); - CloudBlobContainer container = blobClient.getContainerReference(containerName); - - if (createContainer) - container.createIfNotExists(); - - CloudBlob blob = container.getBlockBlobReference(blobPath); - - final OperationContext operationContext = new OperationContext(); - AzureStorageUtils.setProxy(operationContext, context); - - BlobRequestOptions blobRequestOptions = createBlobRequestOptions(context); - - final Map attributes = new HashMap<>(); - long length = flowFile.getSize(); - session.read(flowFile, rawIn -> { - InputStream in = rawIn; - if (!(in instanceof BufferedInputStream)) { - // do not double-wrap - in = new BufferedInputStream(rawIn); - } - - // If markSupported() is true and a file length is provided, - // Blobs are not uploaded in blocks resulting in OOME for large - // files. The UnmarkableInputStream wrapper class disables - // mark() and reset() to help force uploading files in chunks. - if (in.markSupported()) { - in = new UnmarkableInputStream(in); - } - - try { - uploadBlob(blob, operationContext, blobRequestOptions, in); - BlobProperties properties = blob.getProperties(); - attributes.put("azure.container", containerName); - attributes.put("azure.primaryUri", blob.getSnapshotQualifiedUri().toString()); - attributes.put("azure.etag", properties.getEtag()); - attributes.put("azure.length", String.valueOf(length)); - attributes.put("azure.timestamp", String.valueOf(properties.getLastModified())); - } catch (StorageException | URISyntaxException | IOException e) { - storedException.set(e); - throw e instanceof IOException ? (IOException) e : new IOException(e); - } - }); - - if (!attributes.isEmpty()) { - flowFile = session.putAllAttributes(flowFile, attributes); - } - session.transfer(flowFile, REL_SUCCESS); - - final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); - session.getProvenanceReporter().send(flowFile, blob.getSnapshotQualifiedUri().toString(), transferMillis); - - } catch (IllegalArgumentException | URISyntaxException | StorageException | ProcessException | DecoderException e) { - if (e instanceof ProcessException && storedException.get() == null) { - throw (ProcessException) e; - } else { - Exception failureException = Optional.ofNullable(storedException.get()).orElse(e); - getLogger().error("Failed to put Azure blob {}", new Object[]{blobPath}, failureException); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); - } - } - - } - - void uploadBlob(CloudBlob blob, OperationContext operationContext, BlobRequestOptions blobRequestOptions, InputStream in) throws StorageException, IOException { - blob.upload(in, -1, null, blobRequestOptions, operationContext); - } - - // Used to help force Azure Blob SDK to write in blocks - private static class UnmarkableInputStream extends FilterInputStream { - public UnmarkableInputStream(InputStream in) { - super(in); - } - - @Override - public void mark(int readlimit) { - } - - @Override - public void reset() throws IOException { - } - - @Override - public boolean markSupported() { - return false; - } - } -} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureBlobClientSideEncryptionUtils.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureBlobClientSideEncryptionUtils.java deleted file mode 100644 index 6e7a2ce712..0000000000 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureBlobClientSideEncryptionUtils.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.azure.storage.utils; - -import com.microsoft.azure.keyvault.cryptography.SymmetricKey; -import org.apache.commons.codec.DecoderException; -import org.apache.commons.codec.binary.Hex; -import org.apache.nifi.components.AllowableValue; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.util.StringUtils; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; - -public class AzureBlobClientSideEncryptionUtils { - - private static final String DEFAULT_KEY_ID = "nifi"; - - public static final PropertyDescriptor CSE_KEY_TYPE = new PropertyDescriptor.Builder() - .name("cse-key-type") - .displayName("Client-Side Encryption Key Type") - .required(true) - .allowableValues(buildCseEncryptionMethodAllowableValues()) - .defaultValue(AzureBlobClientSideEncryptionMethod.NONE.name()) - .description("Specifies the key type to use for client-side encryption.") - .build(); - - public static final PropertyDescriptor CSE_KEY_ID = new PropertyDescriptor.Builder() - .name("cse-key-id") - .displayName("Client-Side Encryption Key ID") - .description("Specifies the ID of the key to use for client-side encryption.") - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .required(false) - .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .dependsOn(CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name()) - .build(); - - public static final PropertyDescriptor CSE_SYMMETRIC_KEY_HEX = new PropertyDescriptor.Builder() - .name("cse-symmetric-key-hex") - .displayName("Symmetric Key") - .description("When using symmetric client-side encryption, this is the raw key, encoded in hexadecimal") - .required(false) - .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .dependsOn(CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name()) - .sensitive(true) - .build(); - - private static AllowableValue[] buildCseEncryptionMethodAllowableValues() { - return Arrays.stream(AzureBlobClientSideEncryptionMethod.values()) - .map(v -> new AllowableValue(v.name(), v.name(), v.getDescription())) - .toArray(AllowableValue[]::new); - } - - public static Collection validateClientSideEncryptionProperties(ValidationContext validationContext) { - final List validationResults = new ArrayList<>(); - - final String cseKeyTypeValue = validationContext.getProperty(CSE_KEY_TYPE).getValue(); - final AzureBlobClientSideEncryptionMethod cseKeyType = AzureBlobClientSideEncryptionMethod.valueOf(cseKeyTypeValue); - - final String cseKeyId = validationContext.getProperty(CSE_KEY_ID).getValue(); - - final String cseSymmetricKeyHex = validationContext.getProperty(CSE_SYMMETRIC_KEY_HEX).getValue(); - - if (cseKeyType != AzureBlobClientSideEncryptionMethod.NONE && StringUtils.isBlank(cseKeyId)) { - validationResults.add(new ValidationResult.Builder().subject(CSE_KEY_ID.getDisplayName()) - .explanation("a key ID must be set when client-side encryption is enabled.").build()); - } - - if (cseKeyType == AzureBlobClientSideEncryptionMethod.SYMMETRIC) { - validationResults.addAll(validateSymmetricKey(cseSymmetricKeyHex)); - } - - return validationResults; - } - - private static List validateSymmetricKey(String keyHex) { - final List validationResults = new ArrayList<>(); - if (StringUtils.isBlank(keyHex)) { - validationResults.add(new ValidationResult.Builder().subject(CSE_SYMMETRIC_KEY_HEX.getDisplayName()) - .explanation("a symmetric key must not be set when client-side encryption is enabled with symmetric encryption.").build()); - } else { - byte[] keyBytes; - try { - keyBytes = Hex.decodeHex(keyHex.toCharArray()); - new SymmetricKey(DEFAULT_KEY_ID, keyBytes); - } catch (DecoderException e) { - validationResults.add(new ValidationResult.Builder().subject(CSE_SYMMETRIC_KEY_HEX.getDisplayName()) - .explanation("the symmetric key must be a valid hexadecimal string.").build()); - } catch (IllegalArgumentException e) { - validationResults.add(new ValidationResult.Builder().subject(CSE_SYMMETRIC_KEY_HEX.getDisplayName()) - .explanation(e.getMessage()).build()); - } - } - - return validationResults; - } - -} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java index 5f17fbd43e..18015f560b 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java @@ -32,7 +32,6 @@ import com.microsoft.azure.storage.OperationContext; import com.microsoft.azure.storage.StorageCredentials; import com.microsoft.azure.storage.StorageCredentialsAccountAndKey; import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature; -import com.microsoft.azure.storage.blob.CloudBlobClient; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.components.PropertyDescriptor; @@ -41,7 +40,6 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.context.PropertyContext; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.proxy.ProxyConfiguration; @@ -206,18 +204,6 @@ public final class AzureStorageUtils { // do not instantiate } - /** - * Create CloudBlobClient instance. - * @param flowFile An incoming FlowFile can be used for NiFi Expression Language evaluation to derive - * Account Name, Account Key or SAS Token. This can be null if not available. - */ - public static CloudBlobClient createCloudBlobClient(ProcessContext context, ComponentLog logger, FlowFile flowFile) throws URISyntaxException { - final AzureStorageCredentialsDetails storageCredentialsDetails = getStorageCredentialsDetails(context, flowFile); - final CloudStorageAccount cloudStorageAccount = getCloudStorageAccount(storageCredentialsDetails); - final CloudBlobClient cloudBlobClient = cloudStorageAccount.createCloudBlobClient(); - return cloudBlobClient; - } - public static CloudStorageAccount getCloudStorageAccount(final AzureStorageCredentialsDetails storageCredentialsDetails) throws URISyntaxException { final CloudStorageAccount cloudStorageAccount; if (storageCredentialsDetails instanceof AzureStorageEmulatorCredentialsDetails) { diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index ce7e074a0e..f9e86a4add 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -15,10 +15,6 @@ org.apache.nifi.processors.azure.eventhub.PutAzureEventHub org.apache.nifi.processors.azure.eventhub.GetAzureEventHub org.apache.nifi.processors.azure.eventhub.ConsumeAzureEventHub -org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage -org.apache.nifi.processors.azure.storage.ListAzureBlobStorage -org.apache.nifi.processors.azure.storage.PutAzureBlobStorage -org.apache.nifi.processors.azure.storage.DeleteAzureBlobStorage org.apache.nifi.processors.azure.storage.queue.PutAzureQueueStorage org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorageIT.java deleted file mode 100644 index afd6a97502..0000000000 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorageIT.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.azure.storage; - -import com.microsoft.azure.storage.blob.CloudBlob; -import com.microsoft.azure.storage.blob.CloudBlobClient; -import com.microsoft.azure.storage.blob.CloudBlobContainer; -import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; - -import java.io.ByteArrayInputStream; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.util.UUID; - -import static org.apache.nifi.processors.azure.AzureServiceEndpoints.DEFAULT_BLOB_ENDPOINT_SUFFIX; - -public abstract class AbstractAzureBlobStorageIT extends AbstractAzureStorageIT { - - protected static final String TEST_CONTAINER_NAME_PREFIX = "nifi-test-container"; - protected static final String TEST_BLOB_NAME = "nifi-test-blob"; - protected static final String TEST_FILE_NAME = "nifi-test-file"; - protected static final String TEST_FILE_CONTENT = "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890"; - - protected CloudBlobContainer container; - - @Override - protected String getDefaultEndpointSuffix() { - return DEFAULT_BLOB_ENDPOINT_SUFFIX; - } - - @BeforeEach - public void setUpAzureBlobStorageIT() throws Exception { - String containerName = String.format("%s-%s", TEST_CONTAINER_NAME_PREFIX, UUID.randomUUID()); - CloudBlobClient blobClient = getStorageAccount().createCloudBlobClient(); - container = blobClient.getContainerReference(containerName); - container.createIfNotExists(); - - runner.setProperty(AzureStorageUtils.CONTAINER, containerName); - } - - @AfterEach - public void tearDownAzureBlobStorageIT() throws Exception { - container.deleteIfExists(); - } - - protected void uploadTestBlob() throws Exception { - uploadTestBlob(TEST_BLOB_NAME, TEST_FILE_CONTENT); - } - - protected void uploadTestBlob(final String blobName, final String fileContent) throws Exception { - CloudBlob blob = container.getBlockBlobReference(blobName); - byte[] buf = fileContent.getBytes(StandardCharsets.UTF_8); - InputStream in = new ByteArrayInputStream(buf); - blob.upload(in, buf.length); - } -} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITAzureBlobStorageE2E.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITAzureBlobStorageE2E.java deleted file mode 100644 index 37c30a7ca9..0000000000 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITAzureBlobStorageE2E.java +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.azure.storage; - -import com.microsoft.azure.storage.CloudStorageAccount; -import com.microsoft.azure.storage.StorageCredentials; -import com.microsoft.azure.storage.StorageCredentialsAccountAndKey; -import com.microsoft.azure.storage.blob.CloudBlobClient; -import com.microsoft.azure.storage.blob.CloudBlobContainer; -import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor; -import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionMethod; -import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionUtils; -import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.apache.nifi.util.file.FileUtils; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.util.Properties; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.fail; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; - -public class ITAzureBlobStorageE2E { - - private static final Properties CONFIG; - - private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES"; - - static { - CONFIG = new Properties(); - try { - final FileInputStream fis = new FileInputStream(CREDENTIALS_FILE); - assertDoesNotThrow(() -> CONFIG.load(fis), - "Could not open credentials file " + CREDENTIALS_FILE); - FileUtils.closeQuietly(fis); - } catch (FileNotFoundException e) { - fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage()); - } - } - - protected static String getAccountName() { - return CONFIG.getProperty("accountName"); - } - - protected static String getAccountKey() { - return CONFIG.getProperty("accountKey"); - } - - protected static final String TEST_CONTAINER_NAME_PREFIX = "nifi-test-container"; - protected static final String TEST_BLOB_NAME = "nifi-test-blob"; - protected static final String TEST_FILE_CONTENT = "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890"; - - private static final String KEY_ID_VALUE = "key:id"; - private static final String KEY_64B_VALUE = "1234567890ABCDEF"; - private static final String KEY_128B_VALUE = KEY_64B_VALUE + KEY_64B_VALUE; - private static final String KEY_192B_VALUE = KEY_128B_VALUE + KEY_64B_VALUE; - private static final String KEY_256B_VALUE = KEY_128B_VALUE + KEY_128B_VALUE; - private static final String KEY_384B_VALUE = KEY_256B_VALUE + KEY_128B_VALUE; - private static final String KEY_512B_VALUE = KEY_256B_VALUE + KEY_256B_VALUE; - - protected TestRunner putRunner; - protected TestRunner listRunner; - protected TestRunner fetchRunner; - - protected CloudBlobContainer container; - - @BeforeEach - public void setupRunners() throws Exception { - putRunner = TestRunners.newTestRunner(new PutAzureBlobStorage()); - listRunner = TestRunners.newTestRunner(new ListAzureBlobStorage()); - fetchRunner = TestRunners.newTestRunner(new FetchAzureBlobStorage()); - - String containerName = String.format("%s-%s", TEST_CONTAINER_NAME_PREFIX, UUID.randomUUID()); - - StorageCredentials storageCredentials = new StorageCredentialsAccountAndKey(getAccountName(), getAccountKey()); - CloudStorageAccount storageAccount = new CloudStorageAccount(storageCredentials, true); - - CloudBlobClient blobClient = storageAccount.createCloudBlobClient(); - container = blobClient.getContainerReference(containerName); - container.createIfNotExists(); - - setRunnerProperties(putRunner, containerName); - setRunnerProperties(listRunner, containerName); - setRunnerProperties(fetchRunner, containerName); - } - - private void setRunnerProperties(TestRunner runner, String containerName) { - runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, getAccountName()); - runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, getAccountKey()); - runner.setProperty(AzureStorageUtils.CONTAINER, containerName); - } - - @AfterEach - public void tearDownAzureContainer() throws Exception { - container.deleteIfExists(); - } - - @Test - public void AzureBlobStorageE2ENoCSE() throws Exception { - testE2E(AzureBlobClientSideEncryptionMethod.NONE.name(), - null, - null, - AzureBlobClientSideEncryptionMethod.NONE.name(), - null, - null - ); - } - - @Test - public void AzureBlobStorageE2E128BCSE() throws Exception { - testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(), - KEY_ID_VALUE, - KEY_128B_VALUE, - AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(), - KEY_ID_VALUE, - KEY_128B_VALUE - ); - } - - @Test - public void AzureBlobStorageE2E192BCSE() throws Exception { - testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(), - KEY_ID_VALUE, - KEY_192B_VALUE, - AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(), - KEY_ID_VALUE, - KEY_192B_VALUE - ); - } - - @Test - public void AzureBlobStorageE2E256BCSE() throws Exception { - testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(), - KEY_ID_VALUE, - KEY_256B_VALUE, - AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(), - KEY_ID_VALUE, - KEY_256B_VALUE - ); - } - - @Test - public void AzureBlobStorageE2E384BCSE() throws Exception { - testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(), - KEY_ID_VALUE, - KEY_384B_VALUE, - AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(), - KEY_ID_VALUE, - KEY_384B_VALUE - ); - } - - @Test - public void AzureBlobStorageE2E512BCSE() throws Exception { - testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(), - KEY_ID_VALUE, - KEY_512B_VALUE, - AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(), - KEY_ID_VALUE, - KEY_512B_VALUE - ); - } - - @Test - public void AzureBlobStorageE2E128BCSENoDecryption() { - assertThrows(Exception.class, () -> testE2E(AzureBlobClientSideEncryptionMethod.SYMMETRIC.name(), - KEY_ID_VALUE, - KEY_128B_VALUE, - AzureBlobClientSideEncryptionMethod.NONE.name(), - KEY_ID_VALUE, - KEY_128B_VALUE - )); - } - - private void testE2E(String encryptionKeyType, String encryptionKeyId, String encryptionKeyHex, String decryptionKeyType, String decryptionKeyId, String decryptionKeyHex) throws Exception { - putRunner.setProperty(PutAzureBlobStorage.BLOB, TEST_BLOB_NAME); - putRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, encryptionKeyType); - if (encryptionKeyId == null || encryptionKeyId.isEmpty() || encryptionKeyId.trim().isEmpty()) { - putRunner.removeProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID); - } else { - putRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, encryptionKeyId); - } - if (encryptionKeyHex == null || encryptionKeyHex.isEmpty() || encryptionKeyHex.trim().isEmpty()) { - putRunner.removeProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX); - } else { - putRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, encryptionKeyHex); - } - - putRunner.assertValid(); - putRunner.enqueue(TEST_FILE_CONTENT.getBytes()); - putRunner.run(); - putRunner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1); - - Thread.sleep(ListAzureBlobStorage.LISTING_LAG_MILLIS.get(TimeUnit.SECONDS) * 2); - - listRunner.assertValid(); - listRunner.run(); - listRunner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1); - - MockFlowFile entry = listRunner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS).get(0); - entry.assertAttributeEquals("mime.type", "application/octet-stream"); - - fetchRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, decryptionKeyType); - if (decryptionKeyId == null || decryptionKeyId.isEmpty() || decryptionKeyId.trim().isEmpty()) { - fetchRunner.removeProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID); - } else { - fetchRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, decryptionKeyId); - } - if (decryptionKeyHex == null || decryptionKeyHex.isEmpty() || decryptionKeyHex.trim().isEmpty()) { - fetchRunner.removeProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX); - } else { - fetchRunner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, decryptionKeyHex); - } - fetchRunner.assertValid(); - fetchRunner.enqueue(entry); - fetchRunner.run(); - fetchRunner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor.REL_SUCCESS, 1); - MockFlowFile fetchedEntry = fetchRunner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS).get(0); - fetchedEntry.assertContentEquals(TEST_FILE_CONTENT); - } - -} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureBlobStorage.java deleted file mode 100644 index 26b8fea1bf..0000000000 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureBlobStorage.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.azure.storage; - -import com.microsoft.azure.storage.blob.ListBlobItem; -import org.apache.nifi.processor.Processor; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertFalse; - -public class ITDeleteAzureBlobStorage extends AbstractAzureBlobStorageIT { - - @Override - protected Class getProcessorClass() { - return DeleteAzureBlobStorage.class; - } - - @BeforeEach - public void setUp() throws Exception { - runner.setProperty(DeleteAzureBlobStorage.BLOB, TEST_BLOB_NAME); - - uploadTestBlob(); - } - - @Test - public void testDeleteBlob() { - runner.assertValid(); - runner.enqueue(new byte[0]); - runner.run(1); - - assertResult(); - } - - @Test - public void testDeleteBlobUsingCredentialsService() throws Exception { - configureCredentialsService(); - - runner.assertValid(); - runner.enqueue(new byte[0]); - runner.run(1); - - assertResult(); - } - - private void assertResult() { - runner.assertAllFlowFilesTransferred(DeleteAzureBlobStorage.REL_SUCCESS); - - Iterable blobs = container.listBlobs(TEST_BLOB_NAME); - assertFalse(blobs.iterator().hasNext()); - } -} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java deleted file mode 100644 index cd7e062901..0000000000 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.azure.storage; - -import org.apache.nifi.processor.Processor; -import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor; -import org.apache.nifi.util.MockFlowFile; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.util.List; - -public class ITFetchAzureBlobStorage extends AbstractAzureBlobStorageIT { - - @Override - protected Class getProcessorClass() { - return FetchAzureBlobStorage.class; - } - - @BeforeEach - public void setUp() throws Exception { - runner.setProperty(FetchAzureBlobStorage.BLOB, TEST_BLOB_NAME); - - uploadTestBlob(); - } - - @Test - public void testFetchBlob() throws Exception { - runner.assertValid(); - runner.enqueue(new byte[0]); - runner.run(); - - assertResult(); - } - - @Test - public void testFetchBlobWithRangeZeroOne() throws Exception { - runner.setProperty(FetchAzureBlobStorage.RANGE_START, "0B"); - runner.setProperty(FetchAzureBlobStorage.RANGE_LENGTH, "1B"); - runner.assertValid(); - runner.enqueue(new byte[0]); - runner.run(); - - assertResult(TEST_FILE_CONTENT.substring(0, 1)); - } - - @Test - public void testFetchBlobWithRangeOneOne() throws Exception { - runner.setProperty(FetchAzureBlobStorage.RANGE_START, "1B"); - runner.setProperty(FetchAzureBlobStorage.RANGE_LENGTH, "1B"); - runner.assertValid(); - runner.enqueue(new byte[0]); - runner.run(); - - assertResult(TEST_FILE_CONTENT.substring(1, 2)); - } - - @Test - public void testFetchBlobWithRangeTwentyThreeTwentySix() throws Exception { - runner.setProperty(FetchAzureBlobStorage.RANGE_START, "23B"); - runner.setProperty(FetchAzureBlobStorage.RANGE_LENGTH, "3B"); - runner.assertValid(); - runner.enqueue(new byte[0]); - runner.run(); - - assertResult(TEST_FILE_CONTENT.substring(23, 26)); - } - - @Test - public void testFetchBlobWithRangeLengthGreater() throws Exception { - runner.setProperty(FetchAzureBlobStorage.RANGE_START, "0B"); - runner.setProperty(FetchAzureBlobStorage.RANGE_LENGTH, "1KB"); - runner.assertValid(); - runner.enqueue(new byte[0]); - runner.run(); - - assertResult(TEST_FILE_CONTENT); - } - - @Test - public void testFetchBlobWithRangeLengthUnset() throws Exception { - runner.setProperty(FetchAzureBlobStorage.RANGE_START, "0B"); - runner.assertValid(); - runner.enqueue(new byte[0]); - runner.run(); - - assertResult(TEST_FILE_CONTENT); - } - - @Test - public void testFetchBlobWithRangeStartOutOfRange() throws Exception { - runner.setProperty(FetchAzureBlobStorage.RANGE_START, String.format("%sB", TEST_FILE_CONTENT.length() + 1)); - runner.setProperty(FetchAzureBlobStorage.RANGE_LENGTH, "1B"); - runner.assertValid(); - runner.enqueue(new byte[0]); - runner.run(); - - runner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor.REL_FAILURE, 1); - } - - @Test - public void testFetchBlobUsingCredentialService() throws Exception { - configureCredentialsService(); - - runner.assertValid(); - runner.enqueue(new byte[0]); - runner.run(); - - assertResult(); - } - - private void assertResult() throws Exception { - assertResult(TEST_FILE_CONTENT); - } - - private void assertResult(final String expectedContent) throws Exception { - runner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor.REL_SUCCESS, 1); - List flowFilesForRelationship = runner.getFlowFilesForRelationship(FetchAzureBlobStorage.REL_SUCCESS); - for (MockFlowFile flowFile : flowFilesForRelationship) { - flowFile.assertContentEquals(expectedContent); - flowFile.assertAttributeEquals("azure.length", String.valueOf(TEST_FILE_CONTENT.length())); - } - } -} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java deleted file mode 100644 index ef1670bdd9..0000000000 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.azure.storage; - -import org.apache.nifi.processor.Processor; -import org.apache.nifi.util.MockFlowFile; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.nio.charset.StandardCharsets; -import java.util.concurrent.TimeUnit; -import java.util.stream.StreamSupport; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -public class ITListAzureBlobStorage extends AbstractAzureBlobStorageIT { - - @Override - protected Class getProcessorClass() { - return ListAzureBlobStorage.class; - } - - @BeforeEach - public void setUp() throws Exception { - uploadTestBlob(); - waitForUpload(); - } - - @Test - public void testListBlobs() throws Exception { - runner.assertValid(); - runner.run(1); - - assertResult(); - } - - @Test - public void testListBlobsUsingCredentialService() throws Exception { - configureCredentialsService(); - - runner.assertValid(); - runner.run(1); - - assertResult(); - } - - @Test - public void testListWithMinAge() throws Exception { - runner.setProperty(ListAzureBlobStorage.MIN_AGE, "1 hour"); - - runner.assertValid(); - runner.run(1); - - runner.assertTransferCount(ListAzureBlobStorage.REL_SUCCESS, 0); - } - - @Test - public void testListWithMaxAge() throws Exception { - runner.setProperty(ListAzureBlobStorage.MAX_AGE, "1 hour"); - - runner.assertValid(); - runner.run(1); - - assertResult(TEST_FILE_CONTENT); - } - - @Test - public void testListWithMinSize() throws Exception { - uploadTestBlob("nifi-test-blob2", "Test"); - waitForUpload(); - assertListCount(); - runner.setProperty(ListAzureBlobStorage.MIN_SIZE, "5 B"); - - runner.assertValid(); - runner.run(1); - - assertResult(TEST_FILE_CONTENT); - } - - @Test - public void testListWithMaxSize() throws Exception { - uploadTestBlob("nifi-test-blob2", "Test"); - waitForUpload(); - assertListCount(); - runner.setProperty(ListAzureBlobStorage.MAX_SIZE, "5 B"); - - runner.assertValid(); - runner.run(1); - - assertResult("Test"); - } - - private void waitForUpload() throws InterruptedException { - Thread.sleep(ListAzureBlobStorage.LISTING_LAG_MILLIS.get(TimeUnit.SECONDS) * 2); - } - - private void assertResult() { - assertResult(TEST_FILE_CONTENT); - } - - private void assertResult(final String content) { - runner.assertTransferCount(ListAzureBlobStorage.REL_SUCCESS, 1); - runner.assertAllFlowFilesTransferred(ListAzureBlobStorage.REL_SUCCESS, 1); - - for (MockFlowFile entry : runner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS)) { - entry.assertAttributeEquals("azure.length", String.valueOf(content.getBytes(StandardCharsets.UTF_8).length)); - entry.assertAttributeEquals("mime.type", "application/octet-stream"); - } - } - - private void assertListCount() { - final long listCount = StreamSupport.stream(container.listBlobs().spliterator(), false).count(); - assertEquals(2, listCount, "There should be 2 uploaded files but found only " + listCount); - } -} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java deleted file mode 100644 index 212b48d8b1..0000000000 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.azure.storage; - -import com.microsoft.azure.storage.blob.ListBlobItem; -import org.apache.nifi.processor.Processor; -import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionMethod; -import org.apache.nifi.processors.azure.storage.utils.AzureBlobClientSideEncryptionUtils; -import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; -import org.apache.nifi.util.MockFlowFile; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.nio.charset.StandardCharsets; -import java.util.List; - -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class ITPutAzureBlobStorage extends AbstractAzureBlobStorageIT { - - public static final String TEST_FILE_CONTENT = "0123456789"; - private static final String KEY_ID_VALUE = "key:id"; - private static final String KEY_64B_VALUE = "1234567890ABCDEF"; - private static final String KEY_128B_VALUE = KEY_64B_VALUE + KEY_64B_VALUE; - private static final String KEY_192B_VALUE = KEY_128B_VALUE + KEY_64B_VALUE; - private static final String KEY_256B_VALUE = KEY_128B_VALUE + KEY_128B_VALUE; - private static final String KEY_384B_VALUE = KEY_256B_VALUE + KEY_128B_VALUE; - private static final String KEY_512B_VALUE = KEY_256B_VALUE + KEY_256B_VALUE; - - - @Override - protected Class getProcessorClass() { - return PutAzureBlobStorage.class; - } - - @BeforeEach - public void setUp() { - runner.setProperty(PutAzureBlobStorage.BLOB, TEST_BLOB_NAME); - } - - @Test - public void testPutBlob() throws Exception { - runner.assertValid(); - runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8)); - runner.run(); - - assertResult(); - } - - @Test - public void testPutBlob64BSymmetricCSE() { - runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name()); - runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE); - runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_64B_VALUE); - runner.assertNotValid(); - } - - @Test - public void testPutBlob128BSymmetricCSE() throws Exception { - runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name()); - runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE); - runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_128B_VALUE); - runner.assertValid(); - runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8)); - runner.run(); - - assertResult(); - } - - @Test - public void testPutBlob192BSymmetricCSE() throws Exception { - runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name()); - runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE); - runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_192B_VALUE); - runner.assertValid(); - runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8)); - runner.run(); - - assertResult(); - } - - @Test - public void testPutBlob256BSymmetricCSE() throws Exception { - runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name()); - runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE); - runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_256B_VALUE); - runner.assertValid(); - runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8)); - runner.run(); - - assertResult(); - } - - @Test - public void testPutBlob384BSymmetricCSE() throws Exception { - runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name()); - runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE); - runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_384B_VALUE); - runner.assertValid(); - runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8)); - runner.run(); - - assertResult(); - } - - @Test - public void testPutBlob512BSymmetricCSE() throws Exception { - runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, AzureBlobClientSideEncryptionMethod.SYMMETRIC.name()); - runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, KEY_ID_VALUE); - runner.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, KEY_512B_VALUE); - runner.assertValid(); - runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8)); - runner.run(); - - assertResult(); - } - - @Test - public void testPutBlobUsingCredentialsService() throws Exception { - configureCredentialsService(); - - runner.assertValid(); - runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8)); - runner.run(); - - assertResult(); - } - - @Test - public void testInvalidCredentialsRoutesToFailure() { - runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "invalid"); - runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "aW52YWxpZGludmFsaWQ="); - runner.assertValid(); - runner.enqueue(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8)); - runner.run(); - - runner.assertTransferCount(PutAzureBlobStorage.REL_FAILURE, 1); - } - - private void assertResult() throws Exception { - runner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1); - List flowFilesForRelationship = runner.getFlowFilesForRelationship(PutAzureBlobStorage.REL_SUCCESS); - for (MockFlowFile flowFile : flowFilesForRelationship) { - flowFile.assertContentEquals(TEST_FILE_CONTENT.getBytes(StandardCharsets.UTF_8)); - flowFile.assertAttributeEquals("azure.length", "10"); - } - - Iterable blobs = container.listBlobs(TEST_BLOB_NAME); - assertTrue(blobs.iterator().hasNext()); - } -} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureBlobStorage.java deleted file mode 100644 index c139b04460..0000000000 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureBlobStorage.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.azure.storage; - -import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.jupiter.api.Test; -import org.mockito.Mockito; - -import java.io.IOException; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doThrow; - -public class TestPutAzureBlobStorage { - - @Test - public void testIOExceptionDuringUploadTransfersToFailure() throws Exception { - PutAzureBlobStorage processor = Mockito.spy(new PutAzureBlobStorage()); - doThrow(IOException.class).when(processor).uploadBlob(any(), any(), any(), any()); - - TestRunner runner = TestRunners.newTestRunner(processor); - runner.setProperty(PutAzureBlobStorage.BLOB, "test"); - runner.setProperty(AzureStorageUtils.CONTAINER, "test"); - runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "test"); - runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "test"); - - runner.enqueue("test data"); - runner.run(); - - runner.assertTransferCount(PutAzureBlobStorage.REL_FAILURE, 1); - } -} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestPutAzureQueueStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestPutAzureQueueStorage.java index f54cb08fae..68f2fcd76b 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestPutAzureQueueStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestPutAzureQueueStorage.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.processors.azure.storage.queue; -import com.microsoft.azure.storage.StorageException; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; @@ -25,8 +24,6 @@ import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.Test; -import java.net.URISyntaxException; -import java.security.InvalidKeyException; import java.util.Collection; import java.util.HashSet; import java.util.Iterator; @@ -39,7 +36,7 @@ public class TestPutAzureQueueStorage { private final TestRunner runner = TestRunners.newTestRunner(PutAzureQueueStorage.class); @Test - public void testInvalidTTLAndVisibilityDelay() throws StorageException, URISyntaxException, InvalidKeyException { + public void testInvalidTTLAndVisibilityDelay() { runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "dummy-storage"); runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "dummy-key"); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureBlobClientSideEncryptionUtils.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureBlobClientSideEncryptionUtils.java deleted file mode 100644 index 5c338f226f..0000000000 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureBlobClientSideEncryptionUtils.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.azure.storage.utils; - -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.processor.Processor; -import org.apache.nifi.processors.azure.storage.PutAzureBlobStorage; -import org.apache.nifi.util.MockProcessContext; -import org.apache.nifi.util.MockValidationContext; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.util.Collection; - -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class TestAzureBlobClientSideEncryptionUtils { - private static final String KEY_ID_VALUE = "key:id"; - private static final String KEY_64B_VALUE = "1234567890ABCDEF"; - private static final String KEY_128B_VALUE = KEY_64B_VALUE + KEY_64B_VALUE; - private static final String KEY_192B_VALUE = KEY_128B_VALUE + KEY_64B_VALUE; - private static final String KEY_256B_VALUE = KEY_128B_VALUE + KEY_128B_VALUE; - private static final String KEY_384B_VALUE = KEY_256B_VALUE + KEY_128B_VALUE; - private static final String KEY_512B_VALUE = KEY_256B_VALUE + KEY_256B_VALUE; - - private MockProcessContext processContext; - private MockValidationContext validationContext; - - @BeforeEach - public void setUp() { - Processor processor = new PutAzureBlobStorage(); - processContext = new MockProcessContext(processor); - validationContext = new MockValidationContext(processContext); - } - - @Test - public void testNoCesConfiguredOnProcessor() { - configureProcessorProperties("NONE", null,null); - - Collection result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext); - - assertValid(result); - } - - @Test - public void testSymmetricCesNoKeyIdOnProcessor() { - configureProcessorProperties("SYMMETRIC", null, KEY_128B_VALUE); - - Collection result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext); - - assertNotValid(result); - } - - @Test - public void testSymmetricCesNoKeyOnProcessor() { - configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE,null); - - Collection result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext); - - assertNotValid(result); - } - - @Test - public void testSymmetricCesInvalidHexKeyOnProcessor() { - configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE,"ZZ"); - - Collection result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext); - - assertNotValid(result); - } - - @Test - public void testSymmetricCes64BitKeyOnProcessor() { - configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE, KEY_64B_VALUE); - - Collection result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext); - - assertNotValid(result); - } - - @Test - public void testSymmetricCes128BitKeyOnProcessor() { - configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE, KEY_128B_VALUE); - - Collection result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext); - - assertValid(result); - } - - @Test - public void testSymmetricCes192BitKeyOnProcessor() { - configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE, KEY_192B_VALUE); - - Collection result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext); - - assertValid(result); - } - - @Test - public void testSymmetricCes256BitKeyOnProcessor() { - configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE, KEY_256B_VALUE); - - Collection result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext); - - assertValid(result); - } - - @Test - public void testSymmetricCes384BitKeyOnProcessor() { - configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE, KEY_384B_VALUE); - - Collection result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext); - - assertValid(result); - } - - @Test - public void testSymmetricCes512BitKeyOnProcessor() { - configureProcessorProperties("SYMMETRIC", KEY_ID_VALUE, KEY_512B_VALUE); - - Collection result = AzureBlobClientSideEncryptionUtils.validateClientSideEncryptionProperties(validationContext); - - assertValid(result); - } - - private void configureProcessorProperties(String keyType, String keyId, String symmetricKeyHex) { - if (keyType != null) { - processContext.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_TYPE, keyType); - } - if (keyId != null) { - processContext.setProperty(AzureBlobClientSideEncryptionUtils.CSE_KEY_ID, keyId); - } - if (symmetricKeyHex != null) { - processContext.setProperty(AzureBlobClientSideEncryptionUtils.CSE_SYMMETRIC_KEY_HEX, symmetricKeyHex); - } - } - - private void assertValid(Collection result) { - assertTrue(result.isEmpty(), "There should be no validation error"); - } - - private void assertNotValid(Collection result) { - assertFalse(result.isEmpty(), "There should be validation error"); - } -} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsGetStorageCredentialsDetails.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsGetStorageCredentialsDetails.java deleted file mode 100644 index 56b973a2a3..0000000000 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsGetStorageCredentialsDetails.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.azure.storage.utils; - -import com.microsoft.azure.storage.StorageCredentialsAccountAndKey; -import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature; -import com.microsoft.azure.storage.core.Base64; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.processor.Processor; -import org.apache.nifi.processors.azure.storage.ListAzureBlobStorage; -import org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService; -import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails; -import org.apache.nifi.util.MockConfigurationContext; -import org.apache.nifi.util.MockProcessContext; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.util.HashMap; -import java.util.Map; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class TestAzureStorageUtilsGetStorageCredentialsDetails { - - private static final String CREDENTIALS_SERVICE_VALUE = "CredentialsService"; - private static final String ACCOUNT_NAME_VALUE = "AccountName"; - private static final String ACCOUNT_KEY_VALUE = Base64.encode("AccountKey".getBytes()); - private static final String SAS_TOKEN_VALUE = "SasToken"; - - private MockProcessContext processContext; - - @BeforeEach - public void setUp() { - Processor processor = new ListAzureBlobStorage(); - processContext = new MockProcessContext(processor); - } - - @Test - public void testAccountNameAndAccountKeyConfiguredOnProcessor() { - configureProcessorProperties(ACCOUNT_NAME_VALUE, ACCOUNT_KEY_VALUE, null); - - AzureStorageCredentialsDetails storageCredentialsDetails = AzureStorageUtils.getStorageCredentialsDetails(processContext, null); - - assertStorageCredentialsDetailsAccountNameAndAccountKey(storageCredentialsDetails); - } - - @Test - public void testAccountNameAndSasTokenConfiguredOnProcessor() { - configureProcessorProperties(ACCOUNT_NAME_VALUE, null, SAS_TOKEN_VALUE); - - AzureStorageCredentialsDetails storageCredentialsDetails = AzureStorageUtils.getStorageCredentialsDetails(processContext, null); - - assertStorageCredentialsDetailsAccountNameAndSasToken(storageCredentialsDetails); - } - - @Test - public void testAccountNameAndAccountKeyConfiguredOnControllerService() { - configureControllerService(ACCOUNT_NAME_VALUE, ACCOUNT_KEY_VALUE, null); - - AzureStorageCredentialsDetails storageCredentialsDetails = AzureStorageUtils.getStorageCredentialsDetails(processContext, null); - - assertStorageCredentialsDetailsAccountNameAndAccountKey(storageCredentialsDetails); - } - - @Test - public void testAccountNameAndSasTokenConfiguredOnControllerService() { - configureControllerService(ACCOUNT_NAME_VALUE, null, SAS_TOKEN_VALUE); - - AzureStorageCredentialsDetails storageCredentialsDetails = AzureStorageUtils.getStorageCredentialsDetails(processContext, null); - - assertStorageCredentialsDetailsAccountNameAndSasToken(storageCredentialsDetails); - } - - @Test - public void testAccountNameMissingConfiguredOnProcessor() { - configureProcessorProperties(null, ACCOUNT_KEY_VALUE, null); - - assertThrows(IllegalArgumentException.class, () -> AzureStorageUtils.getStorageCredentialsDetails(processContext, null)); - } - - @Test - public void testAccountKeyAndSasTokenMissingConfiguredOnProcessor() { - configureProcessorProperties(ACCOUNT_NAME_VALUE, null, null); - - assertThrows(IllegalArgumentException.class, () -> AzureStorageUtils.getStorageCredentialsDetails(processContext, null)); - } - - @Test - public void testAccountNameMissingConfiguredOnControllerService() { - configureControllerService(null, ACCOUNT_KEY_VALUE, null); - - assertThrows(IllegalArgumentException.class, () -> AzureStorageUtils.getStorageCredentialsDetails(processContext, null)); - } - - @Test - public void testAccountKeyAndSasTokenMissingConfiguredOnControllerService() { - configureControllerService(ACCOUNT_NAME_VALUE, null, null); - - assertThrows(IllegalArgumentException.class, () -> AzureStorageUtils.getStorageCredentialsDetails(processContext, null)); - } - - private void configureProcessorProperties(String accountName, String accountKey, String sasToken) { - if (accountName != null) { - processContext.setProperty(AzureStorageUtils.ACCOUNT_NAME, accountName); - } - if (accountKey != null) { - processContext.setProperty(AzureStorageUtils.ACCOUNT_KEY, accountKey); - } - if (sasToken != null) { - processContext.setProperty(AzureStorageUtils.PROP_SAS_TOKEN, sasToken); - } - } - - private void configureControllerService(String accountName, String accountKey, String sasToken) { - AzureStorageCredentialsControllerService credentialsService = new AzureStorageCredentialsControllerService(); - - Map properties = new HashMap<>(); - if (accountName != null) { - properties.put(AzureStorageUtils.ACCOUNT_NAME, accountName); - } - if (accountKey != null) { - properties.put(AzureStorageUtils.ACCOUNT_KEY, accountKey); - } - if (sasToken != null) { - properties.put(AzureStorageUtils.PROP_SAS_TOKEN, sasToken); - } - - MockConfigurationContext configurationContext = new MockConfigurationContext(properties, null); - credentialsService.onEnabled(configurationContext); - - processContext.addControllerService(credentialsService, CREDENTIALS_SERVICE_VALUE); - processContext.setProperty(AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE, CREDENTIALS_SERVICE_VALUE); - } - - private void assertStorageCredentialsDetailsAccountNameAndAccountKey(AzureStorageCredentialsDetails storageCredentialsDetails) { - assertEquals(ACCOUNT_NAME_VALUE, storageCredentialsDetails.getStorageAccountName()); - assertTrue(storageCredentialsDetails.getStorageCredentials() instanceof StorageCredentialsAccountAndKey); - StorageCredentialsAccountAndKey storageCredentials = (StorageCredentialsAccountAndKey) storageCredentialsDetails.getStorageCredentials(); - assertEquals(ACCOUNT_NAME_VALUE, storageCredentials.getAccountName()); - assertEquals(ACCOUNT_KEY_VALUE, storageCredentials.exportBase64EncodedKey()); - } - - private void assertStorageCredentialsDetailsAccountNameAndSasToken(AzureStorageCredentialsDetails storageCredentialsDetails) { - assertEquals(ACCOUNT_NAME_VALUE, storageCredentialsDetails.getStorageAccountName()); - assertTrue(storageCredentialsDetails.getStorageCredentials() instanceof StorageCredentialsSharedAccessSignature); - StorageCredentialsSharedAccessSignature storageCredentials = (StorageCredentialsSharedAccessSignature) storageCredentialsDetails.getStorageCredentials(); - assertEquals(SAS_TOKEN_VALUE, storageCredentials.getToken()); - } -} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsValidateCredentialProperties.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsValidateCredentialProperties.java index be956eb2c8..ac88c2fdeb 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsValidateCredentialProperties.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/TestAzureStorageUtilsValidateCredentialProperties.java @@ -18,7 +18,7 @@ package org.apache.nifi.processors.azure.storage.utils; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.processor.Processor; -import org.apache.nifi.processors.azure.storage.ListAzureBlobStorage; +import org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage; import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.MockValidationContext; import org.junit.jupiter.api.BeforeEach; @@ -41,7 +41,7 @@ public class TestAzureStorageUtilsValidateCredentialProperties { @BeforeEach public void setUp() { - Processor processor = new ListAzureBlobStorage(); + Processor processor = new GetAzureQueueStorage(); processContext = new MockProcessContext(processor); validationContext = new MockValidationContext(processContext); }