diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java new file mode 100644 index 0000000000..caab93648d --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage.queue; + +import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.StorageCredentials; +import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature; +import com.microsoft.azure.storage.queue.CloudQueueClient; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; + +import java.net.URI; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +public abstract class AbstractAzureQueueStorage extends AbstractProcessor { + + public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder() + .name("storage-queue-name") + .displayName("Queue Name") + .description("Name of the Azure Storage Queue") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All successfully processed FlowFiles are routed to this relationship") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Unsuccessful operations will be transferred to the failure relationship.") + .build(); + + private static final String FORMAT_QUEUE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s"; + private static final String FORMAT_QUEUE_BASE_URI = "https://%s.queue.core.windows.net"; + + private static final Set relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); + + @Override + public Set getRelationships() { + return relationships; + } + + protected final CloudQueueClient createCloudQueueClient(final ProcessContext context, final FlowFile flowFile) { + final String storageAccountName; + final String storageAccountKey; + final String sasToken; + final String connectionString; + + if (flowFile == null) { + storageAccountName = context.getProperty(AzureStorageUtils.ACCOUNT_NAME).evaluateAttributeExpressions().getValue(); + storageAccountKey = context.getProperty(AzureStorageUtils.ACCOUNT_KEY).evaluateAttributeExpressions().getValue(); + sasToken = context.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).evaluateAttributeExpressions().getValue(); + } else { + storageAccountName = context.getProperty(AzureStorageUtils.ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue(); + storageAccountKey = context.getProperty(AzureStorageUtils.ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue(); + sasToken = context.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).evaluateAttributeExpressions(flowFile).getValue(); + } + + CloudQueueClient cloudQueueClient; + try { + if (StringUtils.isNoneBlank(sasToken)) { + connectionString = String.format(FORMAT_QUEUE_BASE_URI, storageAccountName); + StorageCredentials storageCredentials = new StorageCredentialsSharedAccessSignature(sasToken); + cloudQueueClient = new CloudQueueClient(new URI(connectionString), storageCredentials); + } else { + connectionString = String.format(FORMAT_QUEUE_CONNECTION_STRING, storageAccountName, storageAccountKey); + CloudStorageAccount storageAccount = CloudStorageAccount.parse(connectionString); + cloudQueueClient = storageAccount.createCloudQueueClient(); + } + } catch (IllegalArgumentException | URISyntaxException e) { + getLogger().error("Invalid connection string URI for '{}'", new Object[]{context.getName()}, e); + throw new IllegalArgumentException(e); + } catch (InvalidKeyException e) { + getLogger().error("Invalid connection credentials for '{}'", new Object[]{context.getName()}, e); + throw new IllegalArgumentException(e); + } + return cloudQueueClient; + } + +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java new file mode 100644 index 0000000000..d6e510e312 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage.queue; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.queue.CloudQueue; +import com.microsoft.azure.storage.queue.CloudQueueClient; +import com.microsoft.azure.storage.queue.CloudQueueMessage; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; + +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Collections; +import java.util.Arrays; +import java.util.Set; +import java.util.Map; +import java.util.HashMap; +import java.util.concurrent.TimeUnit; + +@SeeAlso({PutAzureQueueStorage.class}) +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"}) +@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " + + "to consume messages without deleting them, set 'Auto Delete Messages' to 'false'. Note: There might be chances of receiving duplicates in situations like " + + "when a message is received but was unable to be deleted from the queue due to some unexpected situations.") +@WritesAttributes({ + @WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"), + @WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"), + @WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"), + @WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"), + @WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"), +}) +public class GetAzureQueueStorage extends AbstractAzureQueueStorage { + + public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder() + .name("auto-delete-messages") + .displayName("Auto Delete Messages") + .description("Specifies whether the received message is to be automatically deleted from the queue.") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("batch-size") + .displayName("Batch Size") + .description("The number of messages to be retrieved from the queue.") + .required(true) + .addValidator(StandardValidators.createLongValidator(1, 32, true)) + .defaultValue("32") + .build(); + + public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder() + .name("visibility-timeout") + .displayName("Visibility Timeout") + .description("The duration during which the retrieved message should be invisible to other consumers.") + .required(true) + .defaultValue("30 secs") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + + private static final List properties = Collections.unmodifiableList(Arrays.asList( + AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE, + BATCH_SIZE, VISIBILITY_TIMEOUT)); + + @Override + public List getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public Set getRelationships() { + return Collections.singleton(REL_SUCCESS); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + + final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue(); + final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); + final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean(); + final String queue = context.getProperty(QUEUE).evaluateAttributeExpressions().getValue().toLowerCase(); + + final Iterable retrievedMessagesIterable; + + CloudQueueClient cloudQueueClient; + CloudQueue cloudQueue; + + try { + cloudQueueClient = createCloudQueueClient(context, null); + cloudQueue = cloudQueueClient.getQueueReference(queue); + retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, null); + } catch (URISyntaxException | StorageException e) { + getLogger().error("Failed to retrieve messages from the provided Azure Storage Queue due to {}", new Object[] {e}); + context.yield(); + return; + } + + final List cloudQueueMessages = toList(retrievedMessagesIterable); + + for (final CloudQueueMessage message : cloudQueueMessages) { + FlowFile flowFile = session.create(); + + final Map attributes = new HashMap<>(); + + attributes.put("azure.queue.uri", cloudQueue.getUri().toString()); + attributes.put("azure.queue.insertionTime", message.getInsertionTime().toString()); + attributes.put("azure.queue.expirationTime", message.getExpirationTime().toString()); + attributes.put("azure.queue.messageId", message.getMessageId()); + attributes.put("azure.queue.popReceipt", message.getPopReceipt()); + + flowFile = session.putAllAttributes(flowFile, attributes); + flowFile = session.write(flowFile, out -> { + try { + out.write(message.getMessageContentAsByte()); + } catch (StorageException e) { + getLogger().error("Failed to write the retrieved queue message to FlowFile content due to {}", new Object[] {e}); + context.yield(); + } + }); + + session.transfer(flowFile, REL_SUCCESS); + session.getProvenanceReporter().receive(flowFile, cloudQueue.getStorageUri().toString()); + } + + if(autoDelete) { + session.commit(); + + for (final CloudQueueMessage message : cloudQueueMessages) { + try { + cloudQueue.deleteMessage(message); + } catch (StorageException e) { + getLogger().error("Failed to delete the retrieved message with the id {} from the queue due to {}", + new Object[] {message.getMessageId(), e}); + } + } + } + + } + + @Override + public Collection customValidate(final ValidationContext validationContext) { + final List problems = new ArrayList<>(super.customValidate(validationContext)); + final int visibilityTimeout = validationContext.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue(); + + if (visibilityTimeout <= 0) { + problems.add(new ValidationResult.Builder() + .valid(false) + .subject(VISIBILITY_TIMEOUT.getDisplayName()) + .explanation(VISIBILITY_TIMEOUT.getDisplayName() + " should be greater than 0 secs") + .build()); + } + + return problems; + } + + private List toList(Iterable iterable) { + if (iterable instanceof List) { + return (List) iterable; + } + + final ArrayList list = new ArrayList<>(); + if (iterable != null) { + for(CloudQueueMessage message : iterable) { + list.add(message); + } + } + + return list; + } + +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java new file mode 100644 index 0000000000..c289a746f1 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorage.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage.queue; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.queue.CloudQueue; +import com.microsoft.azure.storage.queue.CloudQueueClient; +import com.microsoft.azure.storage.queue.CloudQueueMessage; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; + +import java.io.ByteArrayOutputStream; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +@SeeAlso({GetAzureQueueStorage.class}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({ "azure", "microsoft", "cloud", "storage", "queue", "enqueue" }) +@CapabilityDescription("Writes the content of the incoming FlowFiles to the configured Azure Queue Storage.") +public class PutAzureQueueStorage extends AbstractAzureQueueStorage { + + public static final PropertyDescriptor TTL = new PropertyDescriptor.Builder() + .name("time-to-live") + .displayName("TTL") + .description("Maximum time to allow the message to be in the queue. If left empty, the default value of 7 days will be used.") + .required(false) + .defaultValue("7 days") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + + public static final PropertyDescriptor VISIBILITY_DELAY = new PropertyDescriptor.Builder() + .name("visibility-delay") + .displayName("Visibility Delay") + .description("The length of time during which the message will be invisible, starting when it is added to the queue. " + + "This value must be greater than or equal to 0 and less than the TTL value.") + .required(false) + .defaultValue("0 secs") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + + private static final List properties = Collections.unmodifiableList(Arrays.asList( + AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, TTL, + QUEUE, VISIBILITY_DELAY)); + + @Override + public List getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final long startNanos = System.nanoTime(); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + session.exportTo(flowFile, baos); + final String flowFileContent = baos.toString(); + + CloudQueueMessage message = new CloudQueueMessage(flowFileContent); + CloudQueueClient cloudQueueClient; + CloudQueue cloudQueue; + + final int ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.SECONDS).intValue(); + final int delay = context.getProperty(VISIBILITY_DELAY).asTimePeriod(TimeUnit.SECONDS).intValue(); + final String queue = context.getProperty(QUEUE).evaluateAttributeExpressions(flowFile).getValue().toLowerCase(); + + try { + cloudQueueClient = createCloudQueueClient(context, flowFile); + cloudQueue = cloudQueueClient.getQueueReference(queue); + cloudQueue.addMessage(message, ttl, delay, null, null); + } catch (URISyntaxException | StorageException e) { + getLogger().error("Failed to write the message to Azure Queue Storage due to {}", new Object[]{e}); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + return; + } + + session.transfer(flowFile, REL_SUCCESS); + final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + session.getProvenanceReporter().send(flowFile, cloudQueue.getUri().toString(), transmissionMillis); + } + + @Override + public Collection customValidate(final ValidationContext validationContext) { + final List problems = new ArrayList<>(super.customValidate(validationContext)); + + final boolean ttlSet = validationContext.getProperty(TTL).isSet(); + final boolean delaySet = validationContext.getProperty(VISIBILITY_DELAY).isSet(); + + final int ttl = validationContext.getProperty(TTL).asTimePeriod(TimeUnit.SECONDS).intValue(); + + if (ttlSet) { + final int SEVEN_DAYS_TIMEPERIOD_IN_SECS = 604800; // i.e. 7 * 24 * 60 * 60 + + if (ttl > SEVEN_DAYS_TIMEPERIOD_IN_SECS) { + problems.add(new ValidationResult.Builder() + .subject(TTL.getDisplayName()) + .valid(false) + .explanation(TTL.getDisplayName() + " exceeds the allowed limit of 7 days. Set a value less than 7 days") + .build()); + } + } + + if (delaySet) { + int delay = validationContext.getProperty(VISIBILITY_DELAY).asTimePeriod(TimeUnit.SECONDS).intValue(); + + if (delay > ttl || delay < 0) { + problems.add(new ValidationResult.Builder() + .subject(VISIBILITY_DELAY.getDisplayName()) + .valid(false) + .explanation(VISIBILITY_DELAY.getDisplayName() + " should be greater than or equal to 0 and less than " + TTL.getDisplayName()) + .build()); + } + } + + return problems; + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index fa34294243..61b0df3bb5 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -18,4 +18,6 @@ org.apache.nifi.processors.azure.eventhub.ConsumeAzureEventHub org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage org.apache.nifi.processors.azure.storage.ListAzureBlobStorage org.apache.nifi.processors.azure.storage.PutAzureBlobStorage -org.apache.nifi.processors.azure.storage.DeleteAzureBlobStorage \ No newline at end of file +org.apache.nifi.processors.azure.storage.DeleteAzureBlobStorage +org.apache.nifi.processors.azure.storage.queue.PutAzureQueueStorage +org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java index 4396c709a1..6d3a692508 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java @@ -23,9 +23,12 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.net.URISyntaxException; import java.security.InvalidKeyException; +import java.util.Iterator; import java.util.Properties; -import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; +import com.microsoft.azure.storage.queue.CloudQueue; +import com.microsoft.azure.storage.queue.CloudQueueClient; +import com.microsoft.azure.storage.queue.CloudQueueMessage; import org.apache.nifi.util.file.FileUtils; import com.microsoft.azure.storage.CloudStorageAccount; @@ -34,11 +37,17 @@ import com.microsoft.azure.storage.blob.CloudBlobClient; import com.microsoft.azure.storage.blob.CloudBlobContainer; public class AzureTestUtil { - private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES"; - static final String TEST_CONTAINER_NAME_PREFIX = "nifitest"; private static final Properties CONFIG; - static final String TEST_BLOB_NAME = "testing"; + + private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES"; + private static final String FORMAT_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s"; + + public static final String TEST_BLOB_NAME = "testing"; + public static final String TEST_STORAGE_QUEUE = "testqueue"; + public static final String TEST_CONTAINER_NAME_PREFIX = "nifitest"; + + public static CloudQueue cloudQueue; static { final FileInputStream fis; @@ -67,10 +76,30 @@ public class AzureTestUtil { } public static CloudBlobContainer getContainer(String containerName) throws InvalidKeyException, URISyntaxException, StorageException { - String storageConnectionString = String.format(AzureStorageUtils.FORMAT_BLOB_CONNECTION_STRING, getAccountName(), getAccountKey()); - CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString); - CloudBlobClient blobClient = storageAccount.createCloudBlobClient(); + CloudBlobClient blobClient = getStorageAccount().createCloudBlobClient(); return blobClient.getContainerReference(containerName); } + public static CloudQueue getQueue(String queueName) throws URISyntaxException, InvalidKeyException, StorageException { + CloudQueueClient cloudQueueClient = getStorageAccount().createCloudQueueClient(); + cloudQueue = cloudQueueClient.getQueueReference(queueName); + return cloudQueue; + } + + private static CloudStorageAccount getStorageAccount() throws URISyntaxException, InvalidKeyException { + String storageConnectionString = String.format(FORMAT_CONNECTION_STRING, getAccountName(), getAccountKey()); + return CloudStorageAccount.parse(storageConnectionString); + } + + public static int getQueueCount() throws StorageException { + Iterator retrievedMessages = cloudQueue.retrieveMessages(10, 1, null, null).iterator(); + int count = 0; + + while (retrievedMessages.hasNext()) { + retrievedMessages.next(); + count++; + } + + return count; + } } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorageIT.java new file mode 100644 index 0000000000..1711bbd596 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorageIT.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage.queue; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.queue.CloudQueue; +import com.microsoft.azure.storage.queue.CloudQueueMessage; +import org.apache.nifi.processors.azure.storage.AzureTestUtil; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.net.URISyntaxException; +import java.security.InvalidKeyException; +import java.util.List; + +public class GetAzureQueueStorageIT { + + private final TestRunner runner = TestRunners.newTestRunner(GetAzureQueueStorage.class); + private static CloudQueue cloudQueue; + + @BeforeClass + public static void setup() throws InvalidKeyException, StorageException, URISyntaxException { + cloudQueue = AzureTestUtil.getQueue(AzureTestUtil.TEST_STORAGE_QUEUE); + cloudQueue.createIfNotExists(); + } + + @Test + public void testGetWithAutoDeleteFalse() throws StorageException, InterruptedException { + cloudQueue.clear(); + insertDummyMessages(); + + runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName()); + runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey()); + runner.setProperty(GetAzureQueueStorage.QUEUE, AzureTestUtil.TEST_STORAGE_QUEUE); + runner.setProperty(GetAzureQueueStorage.BATCH_SIZE, "10"); + runner.setProperty(GetAzureQueueStorage.AUTO_DELETE, "false"); + runner.setProperty(GetAzureQueueStorage.VISIBILITY_TIMEOUT, "1 secs"); + + runner.run(1); + + final List mockFlowFiles = runner.getFlowFilesForRelationship(GetAzureQueueStorage.REL_SUCCESS); + Assert.assertFalse(mockFlowFiles.isEmpty()); + + Thread.sleep(1500); + cloudQueue.downloadAttributes(); + Assert.assertEquals(3, cloudQueue.getApproximateMessageCount()); + } + + @Test + public void testGetWithELAndAutoDeleteTrue() throws StorageException, InterruptedException { + cloudQueue.clear(); + insertDummyMessages(); + + runner.setValidateExpressionUsage(true); + + runner.setVariable("account.name", AzureTestUtil.getAccountName()); + runner.setVariable("account.key", AzureTestUtil.getAccountKey()); + runner.setVariable("queue.name", AzureTestUtil.TEST_STORAGE_QUEUE); + + runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "${account.name}"); + runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "${account.key}"); + runner.setProperty(GetAzureQueueStorage.QUEUE, "${queue.name}"); + runner.setProperty(GetAzureQueueStorage.BATCH_SIZE, "10"); + runner.setProperty(GetAzureQueueStorage.AUTO_DELETE, "true"); + runner.setProperty(GetAzureQueueStorage.VISIBILITY_TIMEOUT, "1 secs"); + + runner.run(1); + + final List mockFlowFiles = runner.getFlowFilesForRelationship(GetAzureQueueStorage.REL_SUCCESS); + Assert.assertFalse(mockFlowFiles.isEmpty()); + + Thread.sleep(1500); + cloudQueue.downloadAttributes(); + Assert.assertEquals(0, cloudQueue.getApproximateMessageCount()); + } + + @Test + public void testGetWithVisibilityTimeout() throws StorageException, InterruptedException { + cloudQueue.clear(); + insertDummyMessages(); + + runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName()); + runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey()); + runner.setProperty(GetAzureQueueStorage.QUEUE, AzureTestUtil.TEST_STORAGE_QUEUE); + runner.setProperty(GetAzureQueueStorage.BATCH_SIZE, "10"); + runner.setProperty(GetAzureQueueStorage.AUTO_DELETE, "false"); + runner.setProperty(GetAzureQueueStorage.VISIBILITY_TIMEOUT, "1 secs"); + + runner.run(1); + + final List mockFlowFiles = runner.getFlowFilesForRelationship(GetAzureQueueStorage.REL_SUCCESS); + Assert.assertFalse(mockFlowFiles.isEmpty()); + Assert.assertEquals(0, AzureTestUtil.getQueueCount()); + + Thread.sleep(1500); + Assert.assertEquals(3, AzureTestUtil.getQueueCount()); + } + + @Test + public void testGetWithBatchSize() throws StorageException { + cloudQueue.clear(); + insertDummyMessages(); + + runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName()); + runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey()); + runner.setProperty(GetAzureQueueStorage.QUEUE, AzureTestUtil.TEST_STORAGE_QUEUE); + runner.setProperty(GetAzureQueueStorage.BATCH_SIZE, "2"); + runner.setProperty(GetAzureQueueStorage.AUTO_DELETE, "true"); + runner.setProperty(GetAzureQueueStorage.VISIBILITY_TIMEOUT, "1 secs"); + + runner.run(1); + runner.assertAllFlowFilesTransferred(GetAzureQueueStorage.REL_SUCCESS, 2); + + runner.run(1); + runner.assertAllFlowFilesTransferred(GetAzureQueueStorage.REL_SUCCESS, 3); + + } + + private static void insertDummyMessages() throws StorageException { + cloudQueue.addMessage(new CloudQueueMessage("Dummy Message 1"), 604800, 0, null, null); + cloudQueue.addMessage(new CloudQueueMessage("Dummy Message 2"), 604800, 0, null, null); + cloudQueue.addMessage(new CloudQueueMessage("Dummy Message 3"), 604800, 0, null, null); + } + + @AfterClass + public static void cleanup() throws StorageException { + cloudQueue.deleteIfExists(); + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorageIT.java new file mode 100644 index 0000000000..e02f16dab7 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/PutAzureQueueStorageIT.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage.queue; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.queue.CloudQueue; +import org.apache.nifi.processors.azure.storage.AzureTestUtil; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.net.URISyntaxException; +import java.security.InvalidKeyException; + +public class PutAzureQueueStorageIT { + + private final TestRunner runner = TestRunners.newTestRunner(PutAzureQueueStorage.class); + private static CloudQueue cloudQueue; + + @BeforeClass + public static void setup() throws InvalidKeyException, StorageException, URISyntaxException { + cloudQueue = AzureTestUtil.getQueue(AzureTestUtil.TEST_STORAGE_QUEUE); + cloudQueue.createIfNotExists(); + } + + @Test + public void testSimplePut() throws InvalidKeyException, StorageException, URISyntaxException { + runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName()); + runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey()); + runner.setProperty(PutAzureQueueStorage.QUEUE, AzureTestUtil.TEST_STORAGE_QUEUE); + + runner.enqueue("Dummy message"); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutAzureQueueStorage.REL_SUCCESS, 1); + } + + @Test + public void testSimplePutWithEL() throws StorageException, URISyntaxException, InvalidKeyException { + runner.setValidateExpressionUsage(true); + + runner.setVariable("account.name", AzureTestUtil.getAccountName()); + runner.setVariable("account.key", AzureTestUtil.getAccountKey()); + runner.setVariable("queue.name", AzureTestUtil.TEST_STORAGE_QUEUE); + + runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "${account.name}"); + runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "${account.key}"); + runner.setProperty(PutAzureQueueStorage.QUEUE, "${queue.name}"); + + runner.enqueue("Dummy message"); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutAzureQueueStorage.REL_SUCCESS, 1); + } + + @Test + public void testPutWithTTL() throws StorageException, InterruptedException { + cloudQueue.clear(); + + runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName()); + runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey()); + runner.setProperty(PutAzureQueueStorage.QUEUE, AzureTestUtil.TEST_STORAGE_QUEUE); + runner.setProperty(PutAzureQueueStorage.TTL, "2 secs"); + + runner.enqueue("Dummy message"); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutAzureQueueStorage.REL_SUCCESS, 1); + Assert.assertEquals(1, AzureTestUtil.getQueueCount()); + + Thread.sleep(2400); + Assert.assertEquals(0, AzureTestUtil.getQueueCount()); + } + + @Test + public void testPutWithVisibilityDelay() throws StorageException, InterruptedException { + cloudQueue.clear(); + + cloudQueue.clear(); + + runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName()); + runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey()); + runner.setProperty(PutAzureQueueStorage.QUEUE, AzureTestUtil.TEST_STORAGE_QUEUE); + runner.setProperty(PutAzureQueueStorage.VISIBILITY_DELAY, "2 secs"); + + runner.enqueue("Dummy message"); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutAzureQueueStorage.REL_SUCCESS, 1); + Assert.assertEquals(0, AzureTestUtil.getQueueCount()); + + Thread.sleep(2400); + Assert.assertEquals(1, AzureTestUtil.getQueueCount()); + } + + @AfterClass + public static void cleanup() throws StorageException { + cloudQueue.deleteIfExists(); + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestGetAzureQueueStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestGetAzureQueueStorage.java new file mode 100644 index 0000000000..aba779cce3 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestGetAzureQueueStorage.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage.queue; + +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; +import org.apache.nifi.util.MockProcessContext; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; + +public class TestGetAzureQueueStorage { + + private final TestRunner runner = TestRunners.newTestRunner(GetAzureQueueStorage.class); + + @Test + public void testValidVisibilityTimeout() { + runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "dummy-storage"); + runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "dummy-key"); + runner.setProperty(GetAzureQueueStorage.QUEUE, "dummyqueue"); + runner.setProperty(GetAzureQueueStorage.BATCH_SIZE, "10"); + runner.setProperty(GetAzureQueueStorage.VISIBILITY_TIMEOUT, "10 secs"); + + ProcessContext processContext = runner.getProcessContext(); + Collection results = new HashSet<>(); + if (processContext instanceof MockProcessContext) { + results = ((MockProcessContext) processContext).validate(); + } + + Assert.assertEquals(0, results.size()); + } + + @Test + public void testInvalidVisibilityTimeout() { + runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "dummy-storage"); + runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "dummy-key"); + runner.setProperty(GetAzureQueueStorage.QUEUE, "dummyqueue"); + runner.setProperty(GetAzureQueueStorage.BATCH_SIZE, "10"); + runner.setProperty(GetAzureQueueStorage.VISIBILITY_TIMEOUT, "0 secs"); + + ProcessContext processContext = runner.getProcessContext(); + Collection results = new HashSet<>(); + if (processContext instanceof MockProcessContext) { + results = ((MockProcessContext) processContext).validate(); + } + + Assert.assertEquals(1, results.size()); + Iterator iterator = results.iterator(); + Assert.assertTrue(iterator.next().getExplanation().contains("should be greater than 0 secs")); + } + +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestPutAzureQueueStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestPutAzureQueueStorage.java new file mode 100644 index 0000000000..d1e552e805 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/queue/TestPutAzureQueueStorage.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage.queue; + +import com.microsoft.azure.storage.StorageException; + +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; +import org.apache.nifi.util.MockProcessContext; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Test; + +import java.net.URISyntaxException; +import java.security.InvalidKeyException; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; + +public class TestPutAzureQueueStorage { + + private final TestRunner runner = TestRunners.newTestRunner(PutAzureQueueStorage.class); + + @Test + public void testInvalidTTLAndVisibilityDelay() throws StorageException, URISyntaxException, InvalidKeyException { + + runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "dummy-storage"); + runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "dummy-key"); + runner.setProperty(GetAzureQueueStorage.QUEUE, "dummyqueue"); + runner.setProperty(PutAzureQueueStorage.TTL, "8 days"); + runner.setProperty(PutAzureQueueStorage.VISIBILITY_DELAY, "9 days"); + + ProcessContext processContext = runner.getProcessContext(); + Collection results = new HashSet<>(); + if (processContext instanceof MockProcessContext) { + results = ((MockProcessContext) processContext).validate(); + } + + Assert.assertEquals(2, results.size()); + + Iterator iterator = results.iterator(); + Assert.assertTrue(iterator.next().toString().contains(PutAzureQueueStorage.TTL.getDisplayName() + " exceeds the allowed limit of 7 days. Set a value less than 7 days")); + Assert.assertTrue(iterator.next().toString().contains(PutAzureQueueStorage.VISIBILITY_DELAY.getDisplayName() + " should be greater than or equal to 0 and less than")); + } + + @Test + public void testAllValidProperties() { + runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "dummy-storage"); + runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "dummy-key"); + runner.setProperty(GetAzureQueueStorage.QUEUE, "dummyqueue"); + runner.setProperty(PutAzureQueueStorage.TTL, "6 days"); + runner.setProperty(PutAzureQueueStorage.VISIBILITY_DELAY, "5 days"); + + ProcessContext processContext = runner.getProcessContext(); + Collection results = new HashSet<>(); + if (processContext instanceof MockProcessContext) { + results = ((MockProcessContext) processContext).validate(); + } + + Assert.assertEquals(0, results.size()); + } + +}