diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml index 87888cac8c..c42c0c0e23 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml @@ -92,6 +92,10 @@ com.azure azure-security-keyvault-keys + + com.azure + azure-storage-queue + com.microsoft.azure diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage_v12.java new file mode 100644 index 0000000000..ce1ad8f537 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage_v12.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage.queue; + +import com.azure.core.credential.AzureSasCredential; +import com.azure.core.credential.TokenCredential; +import com.azure.core.http.HttpClient; +import com.azure.core.http.ProxyOptions; +import com.azure.core.http.netty.NettyAsyncHttpClientBuilder; +import com.azure.identity.ClientSecretCredentialBuilder; +import com.azure.identity.ManagedIdentityCredentialBuilder; +import com.azure.storage.common.StorageSharedKeyCredential; +import com.azure.storage.queue.QueueClient; +import com.azure.storage.queue.QueueClientBuilder; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.azure.AzureServiceEndpoints; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; +import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails_v12; +import org.apache.nifi.services.azure.storage.AzureStorageCredentialsService_v12; +import reactor.core.publisher.Mono; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +public abstract class AbstractAzureQueueStorage_v12 extends AbstractProcessor { + public static final PropertyDescriptor QUEUE_NAME = new PropertyDescriptor.Builder() + .name("Queue Name") + .displayName("Queue Name") + .description("Name of the Azure Storage Queue") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor ENDPOINT_SUFFIX = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(AzureStorageUtils.ENDPOINT_SUFFIX) + .displayName("Endpoint Suffix") + .description("Storage accounts in public Azure always use a common FQDN suffix. " + + "Override this endpoint suffix with a different suffix in certain circumstances (like Azure Stack or non-public Azure regions).") + .required(true) + .defaultValue(AzureServiceEndpoints.DEFAULT_QUEUE_ENDPOINT_SUFFIX) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); + + public static final PropertyDescriptor STORAGE_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder() + .name("Credentials Service") + .displayName("Credentials Service") + .description("Controller Service used to obtain Azure Storage Credentials.") + .identifiesControllerService(AzureStorageCredentialsService_v12.class) + .required(true) + .build(); + + public static final PropertyDescriptor REQUEST_TIMEOUT = new PropertyDescriptor.Builder() + .name("Request Timeout") + .displayName("Request Timeout") + .description("The timeout for read or write requests to Azure Queue Storage. " + + "Defaults to 1 second.") + .required(true) + .defaultValue("10 secs") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All successfully processed FlowFiles are routed to this relationship") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Unsuccessful operations will be transferred to the failure relationship.") + .build(); + + private static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); + + static final String URI_ATTRIBUTE = "azure.queue.uri"; + static final String INSERTION_TIME_ATTRIBUTE = "azure.queue.insertionTime"; + static final String EXPIRATION_TIME_ATTRIBUTE = "azure.queue.expirationTime"; + static final String MESSAGE_ID_ATTRIBUTE = "azure.queue.messageId"; + static final String POP_RECEIPT_ATTRIBUTE = "azure.queue.popReceipt"; + + @Override + public Set getRelationships() { + return RELATIONSHIPS; + } + + @Override + protected Collection customValidate(ValidationContext validationContext) { + final List results = new ArrayList<>(); + final int requestTimeout = validationContext.getProperty(REQUEST_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue(); + + if (requestTimeout <= 0 || requestTimeout > 30) { + results.add(new ValidationResult.Builder() + .valid(false) + .subject(REQUEST_TIMEOUT.getDisplayName()) + .explanation(REQUEST_TIMEOUT.getDisplayName() + " should be greater than 0 secs " + + "and less than or equal to 30 secs") + .build()); + } + + AzureStorageUtils.validateProxySpec(validationContext, results); + + return results; + } + + protected final QueueClient createQueueClient(final ProcessContext context, final FlowFile flowFile) { + final QueueClientBuilder clientBuilder = new QueueClientBuilder(); + + final AzureStorageCredentialsService_v12 storageCredentialsService = context.getProperty(STORAGE_CREDENTIALS_SERVICE).asControllerService(AzureStorageCredentialsService_v12.class); + final Map attributes = flowFile == null ? Collections.emptyMap() : flowFile.getAttributes(); + final AzureStorageCredentialsDetails_v12 storageCredentialsDetails = storageCredentialsService.getCredentialsDetails(attributes); + processCredentials(clientBuilder, storageCredentialsDetails); + processProxyOptions(clientBuilder, context); + + final String endpointSuffix = context.getProperty(ENDPOINT_SUFFIX).getValue(); + clientBuilder.endpoint(String.format("https://%s.%s", storageCredentialsDetails.getAccountName(), endpointSuffix)); + + final String queueName = context.getProperty(QUEUE_NAME).evaluateAttributeExpressions(flowFile).getValue(); + clientBuilder.queueName(queueName); + return clientBuilder.buildClient(); + } + + private void processCredentials(final QueueClientBuilder clientBuilder, final AzureStorageCredentialsDetails_v12 storageCredentialsDetails) { + switch (storageCredentialsDetails.getCredentialsType()) { + case ACCOUNT_KEY: + clientBuilder.credential(new StorageSharedKeyCredential(storageCredentialsDetails.getAccountName(), storageCredentialsDetails.getAccountKey())); + break; + case SAS_TOKEN: + clientBuilder.credential(new AzureSasCredential(storageCredentialsDetails.getSasToken())); + break; + case MANAGED_IDENTITY: + clientBuilder.credential(new ManagedIdentityCredentialBuilder() + .clientId(storageCredentialsDetails.getManagedIdentityClientId()) + .build()); + break; + case SERVICE_PRINCIPAL: + clientBuilder.credential(new ClientSecretCredentialBuilder() + .tenantId(storageCredentialsDetails.getServicePrincipalTenantId()) + .clientId(storageCredentialsDetails.getServicePrincipalClientId()) + .clientSecret(storageCredentialsDetails.getServicePrincipalClientSecret()) + .build()); + break; + case ACCESS_TOKEN: + TokenCredential credential = tokenRequestContext -> Mono.just(storageCredentialsDetails.getAccessToken()); + clientBuilder.credential(credential); + break; + default: + throw new IllegalArgumentException("Unhandled credentials type: " + storageCredentialsDetails.getCredentialsType()); + } + } + + private void processProxyOptions(final QueueClientBuilder clientBuilder, + final PropertyContext propertyContext) { + final ProxyOptions proxyOptions = AzureStorageUtils.getProxyOptions(propertyContext); + final NettyAsyncHttpClientBuilder nettyClientBuilder = new NettyAsyncHttpClientBuilder(); + nettyClientBuilder.proxy(proxyOptions); + + final HttpClient nettyClient = nettyClientBuilder.build(); + clientBuilder.httpClient(nettyClient); + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java index dc5a138574..6e29905b5d 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java @@ -26,6 +26,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.DeprecationNotice; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; @@ -63,6 +64,7 @@ import java.util.concurrent.TimeUnit; @WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"), @WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"), }) +@DeprecationNotice(alternatives = GetAzureQueueStorage_v12.class) public class GetAzureQueueStorage extends AbstractAzureQueueStorage { public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder() diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage_v12.java new file mode 100644 index 0000000000..28ba1e1b4e --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage_v12.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage.queue; + +import com.azure.core.util.Context; +import com.azure.storage.queue.QueueClient; +import com.azure.storage.queue.models.QueueMessageItem; +import com.azure.storage.queue.models.QueueStorageException; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.proxy.ProxyConfiguration; +import org.apache.nifi.proxy.ProxySpec; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.apache.nifi.processors.azure.storage.queue.AbstractAzureQueueStorage_v12.EXPIRATION_TIME_ATTRIBUTE; +import static org.apache.nifi.processors.azure.storage.queue.AbstractAzureQueueStorage_v12.INSERTION_TIME_ATTRIBUTE; +import static org.apache.nifi.processors.azure.storage.queue.AbstractAzureQueueStorage_v12.MESSAGE_ID_ATTRIBUTE; +import static org.apache.nifi.processors.azure.storage.queue.AbstractAzureQueueStorage_v12.POP_RECEIPT_ATTRIBUTE; +import static org.apache.nifi.processors.azure.storage.queue.AbstractAzureQueueStorage_v12.URI_ATTRIBUTE; + +@SeeAlso({PutAzureQueueStorage.class}) +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"}) +@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " + + "to consume messages without deleting them, set 'Auto Delete Messages' to 'false'. Note: There might be chances of receiving duplicates in situations like " + + "when a message is received but was unable to be deleted from the queue due to some unexpected situations.") +@WritesAttributes({ + @WritesAttribute(attribute = URI_ATTRIBUTE, description = "The absolute URI of the configured Azure Queue Storage"), + @WritesAttribute(attribute = INSERTION_TIME_ATTRIBUTE, description = "The time when the message was inserted into the queue storage"), + @WritesAttribute(attribute = EXPIRATION_TIME_ATTRIBUTE, description = "The time when the message will expire from the queue storage"), + @WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description = "The ID of the retrieved message"), + @WritesAttribute(attribute = POP_RECEIPT_ATTRIBUTE, description = "The pop receipt of the retrieved message"), +}) +public class GetAzureQueueStorage_v12 extends AbstractAzureQueueStorage_v12 { + public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder() + .name("Auto Delete Messages") + .displayName("Auto Delete Messages") + .description("Specifies whether the received message is to be automatically deleted from the queue.") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + + public static final PropertyDescriptor MESSAGE_BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Message Batch Size") + .displayName("Message Batch Size") + .description("The number of messages to be retrieved from the queue.") + .required(true) + .addValidator(StandardValidators.createLongValidator(1, 32, true)) + .defaultValue("32") + .build(); + + public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder() + .name("Visibility Timeout") + .displayName("Visibility Timeout") + .description("The duration during which the retrieved message should be invisible to other consumers.") + .required(true) + .defaultValue("30 secs") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + + private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP, ProxySpec.SOCKS}; + private static final List PROPERTIES = Collections.unmodifiableList( + Arrays.asList( + QUEUE_NAME, + ENDPOINT_SUFFIX, + STORAGE_CREDENTIALS_SERVICE, + AUTO_DELETE, + MESSAGE_BATCH_SIZE, + VISIBILITY_TIMEOUT, + REQUEST_TIMEOUT, + ProxyConfiguration.createProxyConfigPropertyDescriptor(false, PROXY_SPECS) + ) + ); + + private static final Set RELATIONSHIPS = Collections.singleton(REL_SUCCESS); + + // 7 days is the maximum timeout as per https://learn.microsoft.com/en-us/rest/api/storageservices/get-messages + private static final Duration MAX_VISIBILITY_TIMEOUT = Duration.ofDays(7); + + @Override + public List getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + protected Collection customValidate(final ValidationContext validationContext) { + final List results = (List) super.customValidate(validationContext); + + final Duration visibilityTimeout = Duration.ofSeconds( + validationContext.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS) + ); + + if (visibilityTimeout.getSeconds() <= 0) { + results.add(new ValidationResult.Builder() + .valid(false) + .subject(VISIBILITY_TIMEOUT.getDisplayName()) + .explanation(VISIBILITY_TIMEOUT.getDisplayName() + " should be greater than 0 secs") + .build()); + } + + if (MAX_VISIBILITY_TIMEOUT.compareTo(visibilityTimeout) < 0) { + results.add(new ValidationResult.Builder() + .valid(false) + .subject(VISIBILITY_TIMEOUT.getDisplayName()) + .explanation(VISIBILITY_TIMEOUT.getDisplayName() + " should not be greater than 7 days") + .build()); + } + + return results; + } + + @Override + public Set getRelationships() { + return RELATIONSHIPS; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final int batchSize = context.getProperty(MESSAGE_BATCH_SIZE).asInteger(); + final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue(); + final int requestTimeoutInSecs = context.getProperty(REQUEST_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue(); + final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean(); + + final QueueClient queueClient = createQueueClient(context, null); + final Iterable retrievedMessagesIterable; + try { + retrievedMessagesIterable = queueClient.receiveMessages( + batchSize, + Duration.ofSeconds(visibilityTimeoutInSecs), + Duration.ofSeconds(requestTimeoutInSecs), + Context.NONE); + } catch (final QueueStorageException e) { + getLogger().error("Failed to retrieve messages from Azure Storage Queue", e); + context.yield(); + return; + } + + final Map messagesToDelete = new LinkedHashMap<>(); + + for (final QueueMessageItem message : retrievedMessagesIterable) { + FlowFile flowFile = session.create(); + + final Map attributes = new LinkedHashMap<>(); + attributes.put("azure.queue.uri", queueClient.getQueueUrl()); + attributes.put("azure.queue.insertionTime", message.getInsertionTime().toString()); + attributes.put("azure.queue.expirationTime", message.getExpirationTime().toString()); + attributes.put("azure.queue.messageId", message.getMessageId()); + attributes.put("azure.queue.popReceipt", message.getPopReceipt()); + + if (autoDelete) { + messagesToDelete.put(message.getMessageId(), message.getPopReceipt()); + } + + flowFile = session.putAllAttributes(flowFile, attributes); + flowFile = session.write(flowFile, out -> out.write(message.getBody().toString().getBytes())); + + session.transfer(flowFile, REL_SUCCESS); + session.getProvenanceReporter().receive(flowFile, queueClient.getQueueUrl().toString()); + } + + if (autoDelete) { + session.commitAsync(() -> { + for (final Map.Entry entry : messagesToDelete.entrySet()) { + final String messageId = entry.getKey(); + final String popReceipt = entry.getValue(); + queueClient.deleteMessage(messageId, popReceipt); + } + }); + } + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java index d1e0c35f8c..10da98016f 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java @@ -33,6 +33,7 @@ import com.microsoft.azure.storage.queue.CloudQueueMessage; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.DeprecationNotice; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; @@ -49,6 +50,7 @@ import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @Tags({ "azure", "microsoft", "cloud", "storage", "queue", "enqueue" }) @CapabilityDescription("Writes the content of the incoming FlowFiles to the configured Azure Queue Storage.") +@DeprecationNotice(alternatives = PutAzureQueueStorage_v12.class) public class PutAzureQueueStorage extends AbstractAzureQueueStorage { public static final PropertyDescriptor TTL = new PropertyDescriptor.Builder() diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage_v12.java new file mode 100644 index 0000000000..e6389504c5 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage_v12.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage.queue; + +import com.azure.core.util.Context; +import com.azure.storage.queue.QueueClient; +import com.azure.storage.queue.models.QueueStorageException; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.proxy.ProxyConfiguration; +import org.apache.nifi.proxy.ProxySpec; + +import java.io.ByteArrayOutputStream; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +@SeeAlso({GetAzureQueueStorage_v12.class}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({ "azure", "microsoft", "cloud", "storage", "queue", "enqueue" }) +@CapabilityDescription("Writes the content of the incoming FlowFiles to the configured Azure Queue Storage.") +public class PutAzureQueueStorage_v12 extends AbstractAzureQueueStorage_v12 { + public static final PropertyDescriptor MESSAGE_TIME_TO_LIVE = new PropertyDescriptor.Builder() + .name("Message Time To Live") + .displayName("Message Time To Live") + .description("Maximum time to allow the message to be in the queue") + .required(true) + .defaultValue("7 days") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + + public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder() + .name("Visibility Timeout") + .displayName("Visibility Timeout") + .description("The length of time during which the message will be invisible after it is read. " + + "If the processing unit fails to delete the message after it is read, then the message will reappear in the queue.") + .required(true) + .defaultValue("30 secs") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + + private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP, ProxySpec.SOCKS}; + private static final List PROPERTIES = Collections.unmodifiableList( + Arrays.asList( + QUEUE_NAME, + ENDPOINT_SUFFIX, + STORAGE_CREDENTIALS_SERVICE, + MESSAGE_TIME_TO_LIVE, + VISIBILITY_TIMEOUT, + REQUEST_TIMEOUT, + ProxyConfiguration.createProxyConfigPropertyDescriptor(false, PROXY_SPECS) + ) + ); + + // 7 days is the maximum timeout as per https://learn.microsoft.com/en-us/rest/api/storageservices/get-messages + private static final Duration MAX_VISIBILITY_TIMEOUT = Duration.ofDays(7); + + @Override + public List getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + protected Collection customValidate(final ValidationContext validationContext) { + final List results = (List) super.customValidate(validationContext); + final Duration visibilityTimeout = Duration.ofSeconds( + validationContext.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS) + ); + + if (visibilityTimeout.getSeconds() <= 0) { + results.add(new ValidationResult.Builder() + .valid(false) + .subject(VISIBILITY_TIMEOUT.getDisplayName()) + .explanation(VISIBILITY_TIMEOUT.getDisplayName() + " should be greater than 0 secs") + .build()); + } + + if (MAX_VISIBILITY_TIMEOUT.compareTo(visibilityTimeout) < 0) { + results.add(new ValidationResult.Builder() + .valid(false) + .subject(VISIBILITY_TIMEOUT.getDisplayName()) + .explanation(VISIBILITY_TIMEOUT.getDisplayName() + " should not be greater than 7 days") + .build()); + } + + final int ttl = validationContext.getProperty(MESSAGE_TIME_TO_LIVE).asTimePeriod(TimeUnit.SECONDS).intValue(); + if (ttl <= 0) { + results.add(new ValidationResult.Builder() + .subject(MESSAGE_TIME_TO_LIVE.getDisplayName()) + .valid(false) + .explanation(MESSAGE_TIME_TO_LIVE.getDisplayName() + " should be any positive number") + .build()); + } + + return results; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final long startNanos = System.nanoTime(); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + session.exportTo(flowFile, baos); + final String flowFileContent = baos.toString(); + + final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue(); + final int ttl = context.getProperty(MESSAGE_TIME_TO_LIVE).asTimePeriod(TimeUnit.SECONDS).intValue(); + final int requestTimeoutInSecs = context.getProperty(REQUEST_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue(); + + final QueueClient queueClient = createQueueClient(context, flowFile); + try { + queueClient.sendMessageWithResponse( + flowFileContent, + Duration.ofSeconds(visibilityTimeoutInSecs), + Duration.ofSeconds(ttl), + Duration.ofSeconds(requestTimeoutInSecs), + Context.NONE + ); + } catch (final QueueStorageException e) { + getLogger().error("Failed to write message to Azure Queue Storage", e); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + return; + } + + session.transfer(flowFile, REL_SUCCESS); + final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + session.getProvenanceReporter().send(flowFile, queueClient.getQueueUrl().toString(), transmissionMillis); + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService_v12.java index 696f86d27f..ff63b1538f 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService_v12.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService_v12.java @@ -32,12 +32,12 @@ import java.util.List; import java.util.Map; /** - * Provides credentials details for Azure Blob processors + * Provides credentials details for Azure Storage processors * * @see AbstractControllerService */ -@Tags({"azure", "microsoft", "cloud", "storage", "blob", "credentials"}) -@CapabilityDescription("Provides credentials for Azure Blob processors using Azure Blob Storage client library v12.") +@Tags({"azure", "microsoft", "cloud", "storage", "blob", "credentials", "queue"}) +@CapabilityDescription("Provides credentials for Azure Storage processors using Azure Storage client library v12.") public class AzureStorageCredentialsControllerService_v12 extends AbstractControllerService implements AzureStorageCredentialsService_v12 { public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder() diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index f9e86a4add..62c618c7c0 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -27,3 +27,5 @@ org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage_v12 org.apache.nifi.processors.azure.storage.PutAzureBlobStorage_v12 org.apache.nifi.processors.azure.storage.DeleteAzureBlobStorage_v12 org.apache.nifi.processors.azure.storage.MoveAzureDataLakeStorage +org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage_v12 +org.apache.nifi.processors.azure.storage.queue.PutAzureQueueStorage_v12 diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/AbstractTestAzureQueueStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/AbstractTestAzureQueueStorage_v12.java new file mode 100644 index 0000000000..6d5d663d9c --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/AbstractTestAzureQueueStorage_v12.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage.queue; + +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService_v12; +import org.apache.nifi.services.azure.storage.AzureStorageCredentialsService_v12; +import org.apache.nifi.services.azure.storage.AzureStorageCredentialsType; +import org.apache.nifi.util.TestRunner; + +public abstract class AbstractTestAzureQueueStorage_v12 { + public static final String CREDENTIALS_SERVICE_IDENTIFIER = "credentials-service"; + protected TestRunner runner; + protected AzureStorageCredentialsService_v12 credentialsService = new AzureStorageCredentialsControllerService_v12(); + + protected void setupStorageCredentialsService() throws InitializationException { + runner.addControllerService(CREDENTIALS_SERVICE_IDENTIFIER, credentialsService); + runner.setProperty(credentialsService, AzureStorageCredentialsControllerService_v12.ACCOUNT_NAME, "account-name"); + runner.setProperty(credentialsService, AzureStorageCredentialsControllerService_v12.CREDENTIALS_TYPE, AzureStorageCredentialsType.ACCOUNT_KEY.getAllowableValue()); + runner.setProperty(credentialsService, AzureStorageCredentialsControllerService_v12.ACCOUNT_KEY, "account-key"); + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestGetAzureQueueStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestGetAzureQueueStorage_v12.java new file mode 100644 index 0000000000..3f91929426 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestGetAzureQueueStorage_v12.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage.queue; + +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class TestGetAzureQueueStorage_v12 extends AbstractTestAzureQueueStorage_v12 { + @BeforeEach + public void setup() throws InitializationException { + runner = TestRunners.newTestRunner(GetAzureQueueStorage_v12.class); + setupStorageCredentialsService(); + runner.enableControllerService(credentialsService); + runner.setProperty(GetAzureQueueStorage_v12.STORAGE_CREDENTIALS_SERVICE, CREDENTIALS_SERVICE_IDENTIFIER); + runner.setProperty(GetAzureQueueStorage_v12.QUEUE_NAME, "queue"); + runner.setProperty(GetAzureQueueStorage_v12.MESSAGE_BATCH_SIZE, "10"); + } + + @Test + public void testValidVisibilityTimeout() { + runner.setProperty(GetAzureQueueStorage_v12.VISIBILITY_TIMEOUT, "10 secs"); + + runner.assertValid(); + } + + @Test + public void testInvalidVisibilityTimeoutZeroSecs() { + runner.setProperty(GetAzureQueueStorage_v12.VISIBILITY_TIMEOUT, "0 secs"); + + runner.assertNotValid(); + } + + @Test + public void testInvalidVisibilityTimeoutMoreThanSevenDays() { + runner.setProperty(GetAzureQueueStorage_v12.VISIBILITY_TIMEOUT, "8 days"); + + runner.assertNotValid(); + } + + @Test + public void testValidRequestTimeout() { + runner.setProperty(GetAzureQueueStorage_v12.VISIBILITY_TIMEOUT, "10 secs"); + runner.setProperty(GetAzureQueueStorage_v12.REQUEST_TIMEOUT, "15 secs"); + + runner.assertValid(); + } + + @Test + public void testInvalidRequestTimeoutZeroSecs() { + runner.setProperty(GetAzureQueueStorage_v12.VISIBILITY_TIMEOUT, "10 secs"); + runner.setProperty(GetAzureQueueStorage_v12.REQUEST_TIMEOUT, "0 secs"); + + runner.assertNotValid(); + } + + @Test + public void testInvalidRequestTimeoutMoreThanThirtySecs() { + runner.setProperty(GetAzureQueueStorage_v12.VISIBILITY_TIMEOUT, "10 secs"); + runner.setProperty(GetAzureQueueStorage_v12.REQUEST_TIMEOUT, "31 secs"); + + runner.assertNotValid(); + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestPutAzureQueueStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestPutAzureQueueStorage_v12.java new file mode 100644 index 0000000000..ce0afbeddf --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestPutAzureQueueStorage_v12.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage.queue; + +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class TestPutAzureQueueStorage_v12 extends AbstractTestAzureQueueStorage_v12 { + @BeforeEach + public void setup() throws InitializationException { + runner = TestRunners.newTestRunner(PutAzureQueueStorage_v12.class); + setupStorageCredentialsService(); + runner.enableControllerService(credentialsService); + runner.setProperty(PutAzureQueueStorage_v12.STORAGE_CREDENTIALS_SERVICE, CREDENTIALS_SERVICE_IDENTIFIER); + runner.setProperty(PutAzureQueueStorage_v12.QUEUE_NAME, "queue"); + } + + @Test + public void testValidVisibilityTimeout() { + runner.setProperty(PutAzureQueueStorage_v12.MESSAGE_TIME_TO_LIVE, "7 days"); + runner.setProperty(PutAzureQueueStorage_v12.VISIBILITY_TIMEOUT, "10 secs"); + + runner.assertValid(); + } + + @Test + public void testInvalidVisibilityTimeoutZeroSecs() { + runner.setProperty(PutAzureQueueStorage_v12.MESSAGE_TIME_TO_LIVE, "7 days"); + runner.setProperty(PutAzureQueueStorage_v12.VISIBILITY_TIMEOUT, "0 secs"); + + runner.assertNotValid(); + } + + @Test + public void testInvalidVisibilityTimeoutMoreThanSevenDays() { + runner.setProperty(PutAzureQueueStorage_v12.MESSAGE_TIME_TO_LIVE, "7 days"); + runner.setProperty(PutAzureQueueStorage_v12.VISIBILITY_TIMEOUT, "8 days"); + + runner.assertNotValid(); + } + + @Test + public void testValidTTL() { + runner.setProperty(PutAzureQueueStorage_v12.MESSAGE_TIME_TO_LIVE, "7 days"); + runner.setProperty(PutAzureQueueStorage_v12.VISIBILITY_TIMEOUT, "30 secs"); + + runner.assertValid(); + } + + @Test + public void testInvalidTTLZeroSecs() { + runner.setProperty(PutAzureQueueStorage_v12.MESSAGE_TIME_TO_LIVE, "0 secs"); + runner.setProperty(PutAzureQueueStorage_v12.VISIBILITY_TIMEOUT, "30 secs"); + + runner.assertNotValid(); + } + + @Test + public void testValidRequestTimeout() { + runner.setProperty(PutAzureQueueStorage_v12.MESSAGE_TIME_TO_LIVE, "7 days"); + runner.setProperty(PutAzureQueueStorage_v12.VISIBILITY_TIMEOUT, "30 secs"); + runner.setProperty(PutAzureQueueStorage_v12.REQUEST_TIMEOUT, "15 secs"); + + runner.assertValid(); + } + + @Test + public void testInvalidRequestTimeoutZeroSecs() { + runner.setProperty(PutAzureQueueStorage_v12.MESSAGE_TIME_TO_LIVE, "7 days"); + runner.setProperty(PutAzureQueueStorage_v12.VISIBILITY_TIMEOUT, "30 secs"); + runner.setProperty(PutAzureQueueStorage_v12.REQUEST_TIMEOUT, "0 secs"); + + runner.assertNotValid(); + } + + @Test + public void testInvalidRequestTimeoutMoreThanThirtySecs() { + runner.setProperty(PutAzureQueueStorage_v12.MESSAGE_TIME_TO_LIVE, "7 days"); + runner.setProperty(PutAzureQueueStorage_v12.VISIBILITY_TIMEOUT, "30 secs"); + runner.setProperty(PutAzureQueueStorage_v12.REQUEST_TIMEOUT, "31 secs"); + + runner.assertNotValid(); + } +}