NIFI-5015: Implemented Azure Queue Storage processors

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2611.
This commit is contained in:
zenfenan 2018-04-02 07:48:37 +05:30 committed by Pierre Villard
parent c118e96238
commit 72f8999b15
9 changed files with 924 additions and 8 deletions

View File

@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.queue;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.StorageCredentials;
import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
import com.microsoft.azure.storage.queue.CloudQueueClient;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
public abstract class AbstractAzureQueueStorage extends AbstractProcessor {
public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
.name("storage-queue-name")
.displayName("Queue Name")
.description("Name of the Azure Storage Queue")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All successfully processed FlowFiles are routed to this relationship")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("Unsuccessful operations will be transferred to the failure relationship.")
.build();
private static final String FORMAT_QUEUE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
private static final String FORMAT_QUEUE_BASE_URI = "https://%s.queue.core.windows.net";
private static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
protected final CloudQueueClient createCloudQueueClient(final ProcessContext context, final FlowFile flowFile) {
final String storageAccountName;
final String storageAccountKey;
final String sasToken;
final String connectionString;
if (flowFile == null) {
storageAccountName = context.getProperty(AzureStorageUtils.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
storageAccountKey = context.getProperty(AzureStorageUtils.ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
sasToken = context.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).evaluateAttributeExpressions().getValue();
} else {
storageAccountName = context.getProperty(AzureStorageUtils.ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue();
storageAccountKey = context.getProperty(AzureStorageUtils.ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue();
sasToken = context.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).evaluateAttributeExpressions(flowFile).getValue();
}
CloudQueueClient cloudQueueClient;
try {
if (StringUtils.isNoneBlank(sasToken)) {
connectionString = String.format(FORMAT_QUEUE_BASE_URI, storageAccountName);
StorageCredentials storageCredentials = new StorageCredentialsSharedAccessSignature(sasToken);
cloudQueueClient = new CloudQueueClient(new URI(connectionString), storageCredentials);
} else {
connectionString = String.format(FORMAT_QUEUE_CONNECTION_STRING, storageAccountName, storageAccountKey);
CloudStorageAccount storageAccount = CloudStorageAccount.parse(connectionString);
cloudQueueClient = storageAccount.createCloudQueueClient();
}
} catch (IllegalArgumentException | URISyntaxException e) {
getLogger().error("Invalid connection string URI for '{}'", new Object[]{context.getName()}, e);
throw new IllegalArgumentException(e);
} catch (InvalidKeyException e) {
getLogger().error("Invalid connection credentials for '{}'", new Object[]{context.getName()}, e);
throw new IllegalArgumentException(e);
}
return cloudQueueClient;
}
}

View File

@ -0,0 +1,205 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.queue;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.queue.CloudQueue;
import com.microsoft.azure.storage.queue.CloudQueueClient;
import com.microsoft.azure.storage.queue.CloudQueueMessage;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Collections;
import java.util.Arrays;
import java.util.Set;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
@SeeAlso({PutAzureQueueStorage.class})
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " +
"to consume messages without deleting them, set 'Auto Delete Messages' to 'false'. Note: There might be chances of receiving duplicates in situations like " +
"when a message is received but was unable to be deleted from the queue due to some unexpected situations.")
@WritesAttributes({
@WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"),
@WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"),
@WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"),
@WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"),
@WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"),
})
public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
.name("auto-delete-messages")
.displayName("Auto Delete Messages")
.description("Specifies whether the received message is to be automatically deleted from the queue.")
.required(true)
.allowableValues("true", "false")
.defaultValue("true")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("batch-size")
.displayName("Batch Size")
.description("The number of messages to be retrieved from the queue.")
.required(true)
.addValidator(StandardValidators.createLongValidator(1, 32, true))
.defaultValue("32")
.build();
public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
.name("visibility-timeout")
.displayName("Visibility Timeout")
.description("The duration during which the retrieved message should be invisible to other consumers.")
.required(true)
.defaultValue("30 secs")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
BATCH_SIZE, VISIBILITY_TIMEOUT));
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public Set<Relationship> getRelationships() {
return Collections.singleton(REL_SUCCESS);
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
final String queue = context.getProperty(QUEUE).evaluateAttributeExpressions().getValue().toLowerCase();
final Iterable<CloudQueueMessage> retrievedMessagesIterable;
CloudQueueClient cloudQueueClient;
CloudQueue cloudQueue;
try {
cloudQueueClient = createCloudQueueClient(context, null);
cloudQueue = cloudQueueClient.getQueueReference(queue);
retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, null);
} catch (URISyntaxException | StorageException e) {
getLogger().error("Failed to retrieve messages from the provided Azure Storage Queue due to {}", new Object[] {e});
context.yield();
return;
}
final List<CloudQueueMessage> cloudQueueMessages = toList(retrievedMessagesIterable);
for (final CloudQueueMessage message : cloudQueueMessages) {
FlowFile flowFile = session.create();
final Map<String, String> attributes = new HashMap<>();
attributes.put("azure.queue.uri", cloudQueue.getUri().toString());
attributes.put("azure.queue.insertionTime", message.getInsertionTime().toString());
attributes.put("azure.queue.expirationTime", message.getExpirationTime().toString());
attributes.put("azure.queue.messageId", message.getMessageId());
attributes.put("azure.queue.popReceipt", message.getPopReceipt());
flowFile = session.putAllAttributes(flowFile, attributes);
flowFile = session.write(flowFile, out -> {
try {
out.write(message.getMessageContentAsByte());
} catch (StorageException e) {
getLogger().error("Failed to write the retrieved queue message to FlowFile content due to {}", new Object[] {e});
context.yield();
}
});
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().receive(flowFile, cloudQueue.getStorageUri().toString());
}
if(autoDelete) {
session.commit();
for (final CloudQueueMessage message : cloudQueueMessages) {
try {
cloudQueue.deleteMessage(message);
} catch (StorageException e) {
getLogger().error("Failed to delete the retrieved message with the id {} from the queue due to {}",
new Object[] {message.getMessageId(), e});
}
}
}
}
@Override
public Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
final int visibilityTimeout = validationContext.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
if (visibilityTimeout <= 0) {
problems.add(new ValidationResult.Builder()
.valid(false)
.subject(VISIBILITY_TIMEOUT.getDisplayName())
.explanation(VISIBILITY_TIMEOUT.getDisplayName() + " should be greater than 0 secs")
.build());
}
return problems;
}
private List<CloudQueueMessage> toList(Iterable<CloudQueueMessage> iterable) {
if (iterable instanceof List) {
return (List<CloudQueueMessage>) iterable;
}
final ArrayList<CloudQueueMessage> list = new ArrayList<>();
if (iterable != null) {
for(CloudQueueMessage message : iterable) {
list.add(message);
}
}
return list;
}
}

View File

@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.queue;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.queue.CloudQueue;
import com.microsoft.azure.storage.queue.CloudQueueClient;
import com.microsoft.azure.storage.queue.CloudQueueMessage;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import java.io.ByteArrayOutputStream;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@SeeAlso({GetAzureQueueStorage.class})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({ "azure", "microsoft", "cloud", "storage", "queue", "enqueue" })
@CapabilityDescription("Writes the content of the incoming FlowFiles to the configured Azure Queue Storage.")
public class PutAzureQueueStorage extends AbstractAzureQueueStorage {
public static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
.name("time-to-live")
.displayName("TTL")
.description("Maximum time to allow the message to be in the queue. If left empty, the default value of 7 days will be used.")
.required(false)
.defaultValue("7 days")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
public static final PropertyDescriptor VISIBILITY_DELAY = new PropertyDescriptor.Builder()
.name("visibility-delay")
.displayName("Visibility Delay")
.description("The length of time during which the message will be invisible, starting when it is added to the queue. " +
"This value must be greater than or equal to 0 and less than the TTL value.")
.required(false)
.defaultValue("0 secs")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, TTL,
QUEUE, VISIBILITY_DELAY));
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final long startNanos = System.nanoTime();
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
session.exportTo(flowFile, baos);
final String flowFileContent = baos.toString();
CloudQueueMessage message = new CloudQueueMessage(flowFileContent);
CloudQueueClient cloudQueueClient;
CloudQueue cloudQueue;
final int ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.SECONDS).intValue();
final int delay = context.getProperty(VISIBILITY_DELAY).asTimePeriod(TimeUnit.SECONDS).intValue();
final String queue = context.getProperty(QUEUE).evaluateAttributeExpressions(flowFile).getValue().toLowerCase();
try {
cloudQueueClient = createCloudQueueClient(context, flowFile);
cloudQueue = cloudQueueClient.getQueueReference(queue);
cloudQueue.addMessage(message, ttl, delay, null, null);
} catch (URISyntaxException | StorageException e) {
getLogger().error("Failed to write the message to Azure Queue Storage due to {}", new Object[]{e});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
session.transfer(flowFile, REL_SUCCESS);
final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, cloudQueue.getUri().toString(), transmissionMillis);
}
@Override
public Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
final boolean ttlSet = validationContext.getProperty(TTL).isSet();
final boolean delaySet = validationContext.getProperty(VISIBILITY_DELAY).isSet();
final int ttl = validationContext.getProperty(TTL).asTimePeriod(TimeUnit.SECONDS).intValue();
if (ttlSet) {
final int SEVEN_DAYS_TIMEPERIOD_IN_SECS = 604800; // i.e. 7 * 24 * 60 * 60
if (ttl > SEVEN_DAYS_TIMEPERIOD_IN_SECS) {
problems.add(new ValidationResult.Builder()
.subject(TTL.getDisplayName())
.valid(false)
.explanation(TTL.getDisplayName() + " exceeds the allowed limit of 7 days. Set a value less than 7 days")
.build());
}
}
if (delaySet) {
int delay = validationContext.getProperty(VISIBILITY_DELAY).asTimePeriod(TimeUnit.SECONDS).intValue();
if (delay > ttl || delay < 0) {
problems.add(new ValidationResult.Builder()
.subject(VISIBILITY_DELAY.getDisplayName())
.valid(false)
.explanation(VISIBILITY_DELAY.getDisplayName() + " should be greater than or equal to 0 and less than " + TTL.getDisplayName())
.build());
}
}
return problems;
}
}

View File

@ -19,3 +19,5 @@ org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage
org.apache.nifi.processors.azure.storage.ListAzureBlobStorage org.apache.nifi.processors.azure.storage.ListAzureBlobStorage
org.apache.nifi.processors.azure.storage.PutAzureBlobStorage org.apache.nifi.processors.azure.storage.PutAzureBlobStorage
org.apache.nifi.processors.azure.storage.DeleteAzureBlobStorage org.apache.nifi.processors.azure.storage.DeleteAzureBlobStorage
org.apache.nifi.processors.azure.storage.queue.PutAzureQueueStorage
org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage

View File

@ -23,9 +23,12 @@ import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.security.InvalidKeyException; import java.security.InvalidKeyException;
import java.util.Iterator;
import java.util.Properties; import java.util.Properties;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import com.microsoft.azure.storage.queue.CloudQueue;
import com.microsoft.azure.storage.queue.CloudQueueClient;
import com.microsoft.azure.storage.queue.CloudQueueMessage;
import org.apache.nifi.util.file.FileUtils; import org.apache.nifi.util.file.FileUtils;
import com.microsoft.azure.storage.CloudStorageAccount; import com.microsoft.azure.storage.CloudStorageAccount;
@ -34,11 +37,17 @@ import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer; import com.microsoft.azure.storage.blob.CloudBlobContainer;
public class AzureTestUtil { public class AzureTestUtil {
private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES";
static final String TEST_CONTAINER_NAME_PREFIX = "nifitest";
private static final Properties CONFIG; private static final Properties CONFIG;
static final String TEST_BLOB_NAME = "testing";
private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES";
private static final String FORMAT_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
public static final String TEST_BLOB_NAME = "testing";
public static final String TEST_STORAGE_QUEUE = "testqueue";
public static final String TEST_CONTAINER_NAME_PREFIX = "nifitest";
public static CloudQueue cloudQueue;
static { static {
final FileInputStream fis; final FileInputStream fis;
@ -67,10 +76,30 @@ public class AzureTestUtil {
} }
public static CloudBlobContainer getContainer(String containerName) throws InvalidKeyException, URISyntaxException, StorageException { public static CloudBlobContainer getContainer(String containerName) throws InvalidKeyException, URISyntaxException, StorageException {
String storageConnectionString = String.format(AzureStorageUtils.FORMAT_BLOB_CONNECTION_STRING, getAccountName(), getAccountKey()); CloudBlobClient blobClient = getStorageAccount().createCloudBlobClient();
CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);
CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
return blobClient.getContainerReference(containerName); return blobClient.getContainerReference(containerName);
} }
public static CloudQueue getQueue(String queueName) throws URISyntaxException, InvalidKeyException, StorageException {
CloudQueueClient cloudQueueClient = getStorageAccount().createCloudQueueClient();
cloudQueue = cloudQueueClient.getQueueReference(queueName);
return cloudQueue;
}
private static CloudStorageAccount getStorageAccount() throws URISyntaxException, InvalidKeyException {
String storageConnectionString = String.format(FORMAT_CONNECTION_STRING, getAccountName(), getAccountKey());
return CloudStorageAccount.parse(storageConnectionString);
}
public static int getQueueCount() throws StorageException {
Iterator<CloudQueueMessage> retrievedMessages = cloudQueue.retrieveMessages(10, 1, null, null).iterator();
int count = 0;
while (retrievedMessages.hasNext()) {
retrievedMessages.next();
count++;
}
return count;
}
} }

View File

@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.queue;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.queue.CloudQueue;
import com.microsoft.azure.storage.queue.CloudQueueMessage;
import org.apache.nifi.processors.azure.storage.AzureTestUtil;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.List;
public class GetAzureQueueStorageIT {
private final TestRunner runner = TestRunners.newTestRunner(GetAzureQueueStorage.class);
private static CloudQueue cloudQueue;
@BeforeClass
public static void setup() throws InvalidKeyException, StorageException, URISyntaxException {
cloudQueue = AzureTestUtil.getQueue(AzureTestUtil.TEST_STORAGE_QUEUE);
cloudQueue.createIfNotExists();
}
@Test
public void testGetWithAutoDeleteFalse() throws StorageException, InterruptedException {
cloudQueue.clear();
insertDummyMessages();
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
runner.setProperty(GetAzureQueueStorage.QUEUE, AzureTestUtil.TEST_STORAGE_QUEUE);
runner.setProperty(GetAzureQueueStorage.BATCH_SIZE, "10");
runner.setProperty(GetAzureQueueStorage.AUTO_DELETE, "false");
runner.setProperty(GetAzureQueueStorage.VISIBILITY_TIMEOUT, "1 secs");
runner.run(1);
final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(GetAzureQueueStorage.REL_SUCCESS);
Assert.assertFalse(mockFlowFiles.isEmpty());
Thread.sleep(1500);
cloudQueue.downloadAttributes();
Assert.assertEquals(3, cloudQueue.getApproximateMessageCount());
}
@Test
public void testGetWithELAndAutoDeleteTrue() throws StorageException, InterruptedException {
cloudQueue.clear();
insertDummyMessages();
runner.setValidateExpressionUsage(true);
runner.setVariable("account.name", AzureTestUtil.getAccountName());
runner.setVariable("account.key", AzureTestUtil.getAccountKey());
runner.setVariable("queue.name", AzureTestUtil.TEST_STORAGE_QUEUE);
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "${account.name}");
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "${account.key}");
runner.setProperty(GetAzureQueueStorage.QUEUE, "${queue.name}");
runner.setProperty(GetAzureQueueStorage.BATCH_SIZE, "10");
runner.setProperty(GetAzureQueueStorage.AUTO_DELETE, "true");
runner.setProperty(GetAzureQueueStorage.VISIBILITY_TIMEOUT, "1 secs");
runner.run(1);
final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(GetAzureQueueStorage.REL_SUCCESS);
Assert.assertFalse(mockFlowFiles.isEmpty());
Thread.sleep(1500);
cloudQueue.downloadAttributes();
Assert.assertEquals(0, cloudQueue.getApproximateMessageCount());
}
@Test
public void testGetWithVisibilityTimeout() throws StorageException, InterruptedException {
cloudQueue.clear();
insertDummyMessages();
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
runner.setProperty(GetAzureQueueStorage.QUEUE, AzureTestUtil.TEST_STORAGE_QUEUE);
runner.setProperty(GetAzureQueueStorage.BATCH_SIZE, "10");
runner.setProperty(GetAzureQueueStorage.AUTO_DELETE, "false");
runner.setProperty(GetAzureQueueStorage.VISIBILITY_TIMEOUT, "1 secs");
runner.run(1);
final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(GetAzureQueueStorage.REL_SUCCESS);
Assert.assertFalse(mockFlowFiles.isEmpty());
Assert.assertEquals(0, AzureTestUtil.getQueueCount());
Thread.sleep(1500);
Assert.assertEquals(3, AzureTestUtil.getQueueCount());
}
@Test
public void testGetWithBatchSize() throws StorageException {
cloudQueue.clear();
insertDummyMessages();
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
runner.setProperty(GetAzureQueueStorage.QUEUE, AzureTestUtil.TEST_STORAGE_QUEUE);
runner.setProperty(GetAzureQueueStorage.BATCH_SIZE, "2");
runner.setProperty(GetAzureQueueStorage.AUTO_DELETE, "true");
runner.setProperty(GetAzureQueueStorage.VISIBILITY_TIMEOUT, "1 secs");
runner.run(1);
runner.assertAllFlowFilesTransferred(GetAzureQueueStorage.REL_SUCCESS, 2);
runner.run(1);
runner.assertAllFlowFilesTransferred(GetAzureQueueStorage.REL_SUCCESS, 3);
}
private static void insertDummyMessages() throws StorageException {
cloudQueue.addMessage(new CloudQueueMessage("Dummy Message 1"), 604800, 0, null, null);
cloudQueue.addMessage(new CloudQueueMessage("Dummy Message 2"), 604800, 0, null, null);
cloudQueue.addMessage(new CloudQueueMessage("Dummy Message 3"), 604800, 0, null, null);
}
@AfterClass
public static void cleanup() throws StorageException {
cloudQueue.deleteIfExists();
}
}

View File

@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.queue;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.queue.CloudQueue;
import org.apache.nifi.processors.azure.storage.AzureTestUtil;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
public class PutAzureQueueStorageIT {
private final TestRunner runner = TestRunners.newTestRunner(PutAzureQueueStorage.class);
private static CloudQueue cloudQueue;
@BeforeClass
public static void setup() throws InvalidKeyException, StorageException, URISyntaxException {
cloudQueue = AzureTestUtil.getQueue(AzureTestUtil.TEST_STORAGE_QUEUE);
cloudQueue.createIfNotExists();
}
@Test
public void testSimplePut() throws InvalidKeyException, StorageException, URISyntaxException {
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
runner.setProperty(PutAzureQueueStorage.QUEUE, AzureTestUtil.TEST_STORAGE_QUEUE);
runner.enqueue("Dummy message");
runner.run(1);
runner.assertAllFlowFilesTransferred(PutAzureQueueStorage.REL_SUCCESS, 1);
}
@Test
public void testSimplePutWithEL() throws StorageException, URISyntaxException, InvalidKeyException {
runner.setValidateExpressionUsage(true);
runner.setVariable("account.name", AzureTestUtil.getAccountName());
runner.setVariable("account.key", AzureTestUtil.getAccountKey());
runner.setVariable("queue.name", AzureTestUtil.TEST_STORAGE_QUEUE);
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "${account.name}");
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "${account.key}");
runner.setProperty(PutAzureQueueStorage.QUEUE, "${queue.name}");
runner.enqueue("Dummy message");
runner.run(1);
runner.assertAllFlowFilesTransferred(PutAzureQueueStorage.REL_SUCCESS, 1);
}
@Test
public void testPutWithTTL() throws StorageException, InterruptedException {
cloudQueue.clear();
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
runner.setProperty(PutAzureQueueStorage.QUEUE, AzureTestUtil.TEST_STORAGE_QUEUE);
runner.setProperty(PutAzureQueueStorage.TTL, "2 secs");
runner.enqueue("Dummy message");
runner.run();
runner.assertAllFlowFilesTransferred(PutAzureQueueStorage.REL_SUCCESS, 1);
Assert.assertEquals(1, AzureTestUtil.getQueueCount());
Thread.sleep(2400);
Assert.assertEquals(0, AzureTestUtil.getQueueCount());
}
@Test
public void testPutWithVisibilityDelay() throws StorageException, InterruptedException {
cloudQueue.clear();
cloudQueue.clear();
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
runner.setProperty(PutAzureQueueStorage.QUEUE, AzureTestUtil.TEST_STORAGE_QUEUE);
runner.setProperty(PutAzureQueueStorage.VISIBILITY_DELAY, "2 secs");
runner.enqueue("Dummy message");
runner.run(1);
runner.assertAllFlowFilesTransferred(PutAzureQueueStorage.REL_SUCCESS, 1);
Assert.assertEquals(0, AzureTestUtil.getQueueCount());
Thread.sleep(2400);
Assert.assertEquals(1, AzureTestUtil.getQueueCount());
}
@AfterClass
public static void cleanup() throws StorageException {
cloudQueue.deleteIfExists();
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.queue;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
public class TestGetAzureQueueStorage {
private final TestRunner runner = TestRunners.newTestRunner(GetAzureQueueStorage.class);
@Test
public void testValidVisibilityTimeout() {
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "dummy-storage");
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "dummy-key");
runner.setProperty(GetAzureQueueStorage.QUEUE, "dummyqueue");
runner.setProperty(GetAzureQueueStorage.BATCH_SIZE, "10");
runner.setProperty(GetAzureQueueStorage.VISIBILITY_TIMEOUT, "10 secs");
ProcessContext processContext = runner.getProcessContext();
Collection<ValidationResult> results = new HashSet<>();
if (processContext instanceof MockProcessContext) {
results = ((MockProcessContext) processContext).validate();
}
Assert.assertEquals(0, results.size());
}
@Test
public void testInvalidVisibilityTimeout() {
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "dummy-storage");
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "dummy-key");
runner.setProperty(GetAzureQueueStorage.QUEUE, "dummyqueue");
runner.setProperty(GetAzureQueueStorage.BATCH_SIZE, "10");
runner.setProperty(GetAzureQueueStorage.VISIBILITY_TIMEOUT, "0 secs");
ProcessContext processContext = runner.getProcessContext();
Collection<ValidationResult> results = new HashSet<>();
if (processContext instanceof MockProcessContext) {
results = ((MockProcessContext) processContext).validate();
}
Assert.assertEquals(1, results.size());
Iterator<ValidationResult> iterator = results.iterator();
Assert.assertTrue(iterator.next().getExplanation().contains("should be greater than 0 secs"));
}
}

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.queue;
import com.microsoft.azure.storage.StorageException;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
public class TestPutAzureQueueStorage {
private final TestRunner runner = TestRunners.newTestRunner(PutAzureQueueStorage.class);
@Test
public void testInvalidTTLAndVisibilityDelay() throws StorageException, URISyntaxException, InvalidKeyException {
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "dummy-storage");
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "dummy-key");
runner.setProperty(GetAzureQueueStorage.QUEUE, "dummyqueue");
runner.setProperty(PutAzureQueueStorage.TTL, "8 days");
runner.setProperty(PutAzureQueueStorage.VISIBILITY_DELAY, "9 days");
ProcessContext processContext = runner.getProcessContext();
Collection<ValidationResult> results = new HashSet<>();
if (processContext instanceof MockProcessContext) {
results = ((MockProcessContext) processContext).validate();
}
Assert.assertEquals(2, results.size());
Iterator<ValidationResult> iterator = results.iterator();
Assert.assertTrue(iterator.next().toString().contains(PutAzureQueueStorage.TTL.getDisplayName() + " exceeds the allowed limit of 7 days. Set a value less than 7 days"));
Assert.assertTrue(iterator.next().toString().contains(PutAzureQueueStorage.VISIBILITY_DELAY.getDisplayName() + " should be greater than or equal to 0 and less than"));
}
@Test
public void testAllValidProperties() {
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "dummy-storage");
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "dummy-key");
runner.setProperty(GetAzureQueueStorage.QUEUE, "dummyqueue");
runner.setProperty(PutAzureQueueStorage.TTL, "6 days");
runner.setProperty(PutAzureQueueStorage.VISIBILITY_DELAY, "5 days");
ProcessContext processContext = runner.getProcessContext();
Collection<ValidationResult> results = new HashSet<>();
if (processContext instanceof MockProcessContext) {
results = ((MockProcessContext) processContext).validate();
}
Assert.assertEquals(0, results.size());
}
}