mirror of https://github.com/apache/nifi.git
NIFI-11549 Added AzureQueueStorage_v12 Processors
- Deprecated GetAzureQueueStorage and PutAzureQueueStorage This closes #7269 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
7a7f6f6229
commit
b11373af7b
|
@ -92,6 +92,10 @@
|
|||
<groupId>com.azure</groupId>
|
||||
<artifactId>azure-security-keyvault-keys</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.azure</groupId>
|
||||
<artifactId>azure-storage-queue</artifactId>
|
||||
</dependency>
|
||||
<!-- Legacy Microsoft Azure Libraries -->
|
||||
<dependency>
|
||||
<groupId>com.microsoft.azure</groupId>
|
||||
|
|
|
@ -0,0 +1,190 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.azure.storage.queue;
|
||||
|
||||
import com.azure.core.credential.AzureSasCredential;
|
||||
import com.azure.core.credential.TokenCredential;
|
||||
import com.azure.core.http.HttpClient;
|
||||
import com.azure.core.http.ProxyOptions;
|
||||
import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
|
||||
import com.azure.identity.ClientSecretCredentialBuilder;
|
||||
import com.azure.identity.ManagedIdentityCredentialBuilder;
|
||||
import com.azure.storage.common.StorageSharedKeyCredential;
|
||||
import com.azure.storage.queue.QueueClient;
|
||||
import com.azure.storage.queue.QueueClientBuilder;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.context.PropertyContext;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processors.azure.AzureServiceEndpoints;
|
||||
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
|
||||
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails_v12;
|
||||
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsService_v12;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public abstract class AbstractAzureQueueStorage_v12 extends AbstractProcessor {
|
||||
public static final PropertyDescriptor QUEUE_NAME = new PropertyDescriptor.Builder()
|
||||
.name("Queue Name")
|
||||
.displayName("Queue Name")
|
||||
.description("Name of the Azure Storage Queue")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor ENDPOINT_SUFFIX = new PropertyDescriptor.Builder()
|
||||
.fromPropertyDescriptor(AzureStorageUtils.ENDPOINT_SUFFIX)
|
||||
.displayName("Endpoint Suffix")
|
||||
.description("Storage accounts in public Azure always use a common FQDN suffix. " +
|
||||
"Override this endpoint suffix with a different suffix in certain circumstances (like Azure Stack or non-public Azure regions).")
|
||||
.required(true)
|
||||
.defaultValue(AzureServiceEndpoints.DEFAULT_QUEUE_ENDPOINT_SUFFIX)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor STORAGE_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("Credentials Service")
|
||||
.displayName("Credentials Service")
|
||||
.description("Controller Service used to obtain Azure Storage Credentials.")
|
||||
.identifiesControllerService(AzureStorageCredentialsService_v12.class)
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor REQUEST_TIMEOUT = new PropertyDescriptor.Builder()
|
||||
.name("Request Timeout")
|
||||
.displayName("Request Timeout")
|
||||
.description("The timeout for read or write requests to Azure Queue Storage. " +
|
||||
"Defaults to 1 second.")
|
||||
.required(true)
|
||||
.defaultValue("10 secs")
|
||||
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||
.name("success")
|
||||
.description("All successfully processed FlowFiles are routed to this relationship")
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_FAILURE = new Relationship.Builder()
|
||||
.name("failure")
|
||||
.description("Unsuccessful operations will be transferred to the failure relationship.")
|
||||
.build();
|
||||
|
||||
private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
|
||||
|
||||
static final String URI_ATTRIBUTE = "azure.queue.uri";
|
||||
static final String INSERTION_TIME_ATTRIBUTE = "azure.queue.insertionTime";
|
||||
static final String EXPIRATION_TIME_ATTRIBUTE = "azure.queue.expirationTime";
|
||||
static final String MESSAGE_ID_ATTRIBUTE = "azure.queue.messageId";
|
||||
static final String POP_RECEIPT_ATTRIBUTE = "azure.queue.popReceipt";
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
return RELATIONSHIPS;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
|
||||
final List<ValidationResult> results = new ArrayList<>();
|
||||
final int requestTimeout = validationContext.getProperty(REQUEST_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
|
||||
|
||||
if (requestTimeout <= 0 || requestTimeout > 30) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.valid(false)
|
||||
.subject(REQUEST_TIMEOUT.getDisplayName())
|
||||
.explanation(REQUEST_TIMEOUT.getDisplayName() + " should be greater than 0 secs " +
|
||||
"and less than or equal to 30 secs")
|
||||
.build());
|
||||
}
|
||||
|
||||
AzureStorageUtils.validateProxySpec(validationContext, results);
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
protected final QueueClient createQueueClient(final ProcessContext context, final FlowFile flowFile) {
|
||||
final QueueClientBuilder clientBuilder = new QueueClientBuilder();
|
||||
|
||||
final AzureStorageCredentialsService_v12 storageCredentialsService = context.getProperty(STORAGE_CREDENTIALS_SERVICE).asControllerService(AzureStorageCredentialsService_v12.class);
|
||||
final Map<String, String> attributes = flowFile == null ? Collections.emptyMap() : flowFile.getAttributes();
|
||||
final AzureStorageCredentialsDetails_v12 storageCredentialsDetails = storageCredentialsService.getCredentialsDetails(attributes);
|
||||
processCredentials(clientBuilder, storageCredentialsDetails);
|
||||
processProxyOptions(clientBuilder, context);
|
||||
|
||||
final String endpointSuffix = context.getProperty(ENDPOINT_SUFFIX).getValue();
|
||||
clientBuilder.endpoint(String.format("https://%s.%s", storageCredentialsDetails.getAccountName(), endpointSuffix));
|
||||
|
||||
final String queueName = context.getProperty(QUEUE_NAME).evaluateAttributeExpressions(flowFile).getValue();
|
||||
clientBuilder.queueName(queueName);
|
||||
return clientBuilder.buildClient();
|
||||
}
|
||||
|
||||
private void processCredentials(final QueueClientBuilder clientBuilder, final AzureStorageCredentialsDetails_v12 storageCredentialsDetails) {
|
||||
switch (storageCredentialsDetails.getCredentialsType()) {
|
||||
case ACCOUNT_KEY:
|
||||
clientBuilder.credential(new StorageSharedKeyCredential(storageCredentialsDetails.getAccountName(), storageCredentialsDetails.getAccountKey()));
|
||||
break;
|
||||
case SAS_TOKEN:
|
||||
clientBuilder.credential(new AzureSasCredential(storageCredentialsDetails.getSasToken()));
|
||||
break;
|
||||
case MANAGED_IDENTITY:
|
||||
clientBuilder.credential(new ManagedIdentityCredentialBuilder()
|
||||
.clientId(storageCredentialsDetails.getManagedIdentityClientId())
|
||||
.build());
|
||||
break;
|
||||
case SERVICE_PRINCIPAL:
|
||||
clientBuilder.credential(new ClientSecretCredentialBuilder()
|
||||
.tenantId(storageCredentialsDetails.getServicePrincipalTenantId())
|
||||
.clientId(storageCredentialsDetails.getServicePrincipalClientId())
|
||||
.clientSecret(storageCredentialsDetails.getServicePrincipalClientSecret())
|
||||
.build());
|
||||
break;
|
||||
case ACCESS_TOKEN:
|
||||
TokenCredential credential = tokenRequestContext -> Mono.just(storageCredentialsDetails.getAccessToken());
|
||||
clientBuilder.credential(credential);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Unhandled credentials type: " + storageCredentialsDetails.getCredentialsType());
|
||||
}
|
||||
}
|
||||
|
||||
private void processProxyOptions(final QueueClientBuilder clientBuilder,
|
||||
final PropertyContext propertyContext) {
|
||||
final ProxyOptions proxyOptions = AzureStorageUtils.getProxyOptions(propertyContext);
|
||||
final NettyAsyncHttpClientBuilder nettyClientBuilder = new NettyAsyncHttpClientBuilder();
|
||||
nettyClientBuilder.proxy(proxyOptions);
|
||||
|
||||
final HttpClient nettyClient = nettyClientBuilder.build();
|
||||
clientBuilder.httpClient(nettyClient);
|
||||
}
|
||||
}
|
|
@ -26,6 +26,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
|||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.DeprecationNotice;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
|
@ -63,6 +64,7 @@ import java.util.concurrent.TimeUnit;
|
|||
@WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"),
|
||||
@WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"),
|
||||
})
|
||||
@DeprecationNotice(alternatives = GetAzureQueueStorage_v12.class)
|
||||
public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
|
||||
|
||||
public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
|
||||
|
|
|
@ -0,0 +1,210 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.azure.storage.queue;
|
||||
|
||||
import com.azure.core.util.Context;
|
||||
import com.azure.storage.queue.QueueClient;
|
||||
import com.azure.storage.queue.models.QueueMessageItem;
|
||||
import com.azure.storage.queue.models.QueueStorageException;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.proxy.ProxyConfiguration;
|
||||
import org.apache.nifi.proxy.ProxySpec;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.apache.nifi.processors.azure.storage.queue.AbstractAzureQueueStorage_v12.EXPIRATION_TIME_ATTRIBUTE;
|
||||
import static org.apache.nifi.processors.azure.storage.queue.AbstractAzureQueueStorage_v12.INSERTION_TIME_ATTRIBUTE;
|
||||
import static org.apache.nifi.processors.azure.storage.queue.AbstractAzureQueueStorage_v12.MESSAGE_ID_ATTRIBUTE;
|
||||
import static org.apache.nifi.processors.azure.storage.queue.AbstractAzureQueueStorage_v12.POP_RECEIPT_ATTRIBUTE;
|
||||
import static org.apache.nifi.processors.azure.storage.queue.AbstractAzureQueueStorage_v12.URI_ATTRIBUTE;
|
||||
|
||||
@SeeAlso({PutAzureQueueStorage.class})
|
||||
@InputRequirement(Requirement.INPUT_FORBIDDEN)
|
||||
@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
|
||||
@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " +
|
||||
"to consume messages without deleting them, set 'Auto Delete Messages' to 'false'. Note: There might be chances of receiving duplicates in situations like " +
|
||||
"when a message is received but was unable to be deleted from the queue due to some unexpected situations.")
|
||||
@WritesAttributes({
|
||||
@WritesAttribute(attribute = URI_ATTRIBUTE, description = "The absolute URI of the configured Azure Queue Storage"),
|
||||
@WritesAttribute(attribute = INSERTION_TIME_ATTRIBUTE, description = "The time when the message was inserted into the queue storage"),
|
||||
@WritesAttribute(attribute = EXPIRATION_TIME_ATTRIBUTE, description = "The time when the message will expire from the queue storage"),
|
||||
@WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description = "The ID of the retrieved message"),
|
||||
@WritesAttribute(attribute = POP_RECEIPT_ATTRIBUTE, description = "The pop receipt of the retrieved message"),
|
||||
})
|
||||
public class GetAzureQueueStorage_v12 extends AbstractAzureQueueStorage_v12 {
|
||||
public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
|
||||
.name("Auto Delete Messages")
|
||||
.displayName("Auto Delete Messages")
|
||||
.description("Specifies whether the received message is to be automatically deleted from the queue.")
|
||||
.required(true)
|
||||
.allowableValues("true", "false")
|
||||
.defaultValue("true")
|
||||
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor MESSAGE_BATCH_SIZE = new PropertyDescriptor.Builder()
|
||||
.name("Message Batch Size")
|
||||
.displayName("Message Batch Size")
|
||||
.description("The number of messages to be retrieved from the queue.")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.createLongValidator(1, 32, true))
|
||||
.defaultValue("32")
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
|
||||
.name("Visibility Timeout")
|
||||
.displayName("Visibility Timeout")
|
||||
.description("The duration during which the retrieved message should be invisible to other consumers.")
|
||||
.required(true)
|
||||
.defaultValue("30 secs")
|
||||
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||
.build();
|
||||
|
||||
private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP, ProxySpec.SOCKS};
|
||||
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(
|
||||
Arrays.asList(
|
||||
QUEUE_NAME,
|
||||
ENDPOINT_SUFFIX,
|
||||
STORAGE_CREDENTIALS_SERVICE,
|
||||
AUTO_DELETE,
|
||||
MESSAGE_BATCH_SIZE,
|
||||
VISIBILITY_TIMEOUT,
|
||||
REQUEST_TIMEOUT,
|
||||
ProxyConfiguration.createProxyConfigPropertyDescriptor(false, PROXY_SPECS)
|
||||
)
|
||||
);
|
||||
|
||||
private static final Set<Relationship> RELATIONSHIPS = Collections.singleton(REL_SUCCESS);
|
||||
|
||||
// 7 days is the maximum timeout as per https://learn.microsoft.com/en-us/rest/api/storageservices/get-messages
|
||||
private static final Duration MAX_VISIBILITY_TIMEOUT = Duration.ofDays(7);
|
||||
|
||||
@Override
|
||||
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return PROPERTIES;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
|
||||
final List<ValidationResult> results = (List<ValidationResult>) super.customValidate(validationContext);
|
||||
|
||||
final Duration visibilityTimeout = Duration.ofSeconds(
|
||||
validationContext.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS)
|
||||
);
|
||||
|
||||
if (visibilityTimeout.getSeconds() <= 0) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.valid(false)
|
||||
.subject(VISIBILITY_TIMEOUT.getDisplayName())
|
||||
.explanation(VISIBILITY_TIMEOUT.getDisplayName() + " should be greater than 0 secs")
|
||||
.build());
|
||||
}
|
||||
|
||||
if (MAX_VISIBILITY_TIMEOUT.compareTo(visibilityTimeout) < 0) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.valid(false)
|
||||
.subject(VISIBILITY_TIMEOUT.getDisplayName())
|
||||
.explanation(VISIBILITY_TIMEOUT.getDisplayName() + " should not be greater than 7 days")
|
||||
.build());
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
return RELATIONSHIPS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||
final int batchSize = context.getProperty(MESSAGE_BATCH_SIZE).asInteger();
|
||||
final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
|
||||
final int requestTimeoutInSecs = context.getProperty(REQUEST_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
|
||||
final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
|
||||
|
||||
final QueueClient queueClient = createQueueClient(context, null);
|
||||
final Iterable<QueueMessageItem> retrievedMessagesIterable;
|
||||
try {
|
||||
retrievedMessagesIterable = queueClient.receiveMessages(
|
||||
batchSize,
|
||||
Duration.ofSeconds(visibilityTimeoutInSecs),
|
||||
Duration.ofSeconds(requestTimeoutInSecs),
|
||||
Context.NONE);
|
||||
} catch (final QueueStorageException e) {
|
||||
getLogger().error("Failed to retrieve messages from Azure Storage Queue", e);
|
||||
context.yield();
|
||||
return;
|
||||
}
|
||||
|
||||
final Map<String, String> messagesToDelete = new LinkedHashMap<>();
|
||||
|
||||
for (final QueueMessageItem message : retrievedMessagesIterable) {
|
||||
FlowFile flowFile = session.create();
|
||||
|
||||
final Map<String, String> attributes = new LinkedHashMap<>();
|
||||
attributes.put("azure.queue.uri", queueClient.getQueueUrl());
|
||||
attributes.put("azure.queue.insertionTime", message.getInsertionTime().toString());
|
||||
attributes.put("azure.queue.expirationTime", message.getExpirationTime().toString());
|
||||
attributes.put("azure.queue.messageId", message.getMessageId());
|
||||
attributes.put("azure.queue.popReceipt", message.getPopReceipt());
|
||||
|
||||
if (autoDelete) {
|
||||
messagesToDelete.put(message.getMessageId(), message.getPopReceipt());
|
||||
}
|
||||
|
||||
flowFile = session.putAllAttributes(flowFile, attributes);
|
||||
flowFile = session.write(flowFile, out -> out.write(message.getBody().toString().getBytes()));
|
||||
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
session.getProvenanceReporter().receive(flowFile, queueClient.getQueueUrl().toString());
|
||||
}
|
||||
|
||||
if (autoDelete) {
|
||||
session.commitAsync(() -> {
|
||||
for (final Map.Entry<String, String> entry : messagesToDelete.entrySet()) {
|
||||
final String messageId = entry.getKey();
|
||||
final String popReceipt = entry.getValue();
|
||||
queueClient.deleteMessage(messageId, popReceipt);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
|
@ -33,6 +33,7 @@ import com.microsoft.azure.storage.queue.CloudQueueMessage;
|
|||
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.DeprecationNotice;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
|
@ -49,6 +50,7 @@ import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
|
|||
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
|
||||
@Tags({ "azure", "microsoft", "cloud", "storage", "queue", "enqueue" })
|
||||
@CapabilityDescription("Writes the content of the incoming FlowFiles to the configured Azure Queue Storage.")
|
||||
@DeprecationNotice(alternatives = PutAzureQueueStorage_v12.class)
|
||||
public class PutAzureQueueStorage extends AbstractAzureQueueStorage {
|
||||
|
||||
public static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
|
||||
|
|
|
@ -0,0 +1,162 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.azure.storage.queue;
|
||||
|
||||
import com.azure.core.util.Context;
|
||||
import com.azure.storage.queue.QueueClient;
|
||||
import com.azure.storage.queue.models.QueueStorageException;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.proxy.ProxyConfiguration;
|
||||
import org.apache.nifi.proxy.ProxySpec;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@SeeAlso({GetAzureQueueStorage_v12.class})
|
||||
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
|
||||
@Tags({ "azure", "microsoft", "cloud", "storage", "queue", "enqueue" })
|
||||
@CapabilityDescription("Writes the content of the incoming FlowFiles to the configured Azure Queue Storage.")
|
||||
public class PutAzureQueueStorage_v12 extends AbstractAzureQueueStorage_v12 {
|
||||
public static final PropertyDescriptor MESSAGE_TIME_TO_LIVE = new PropertyDescriptor.Builder()
|
||||
.name("Message Time To Live")
|
||||
.displayName("Message Time To Live")
|
||||
.description("Maximum time to allow the message to be in the queue")
|
||||
.required(true)
|
||||
.defaultValue("7 days")
|
||||
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
|
||||
.name("Visibility Timeout")
|
||||
.displayName("Visibility Timeout")
|
||||
.description("The length of time during which the message will be invisible after it is read. " +
|
||||
"If the processing unit fails to delete the message after it is read, then the message will reappear in the queue.")
|
||||
.required(true)
|
||||
.defaultValue("30 secs")
|
||||
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||
.build();
|
||||
|
||||
private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP, ProxySpec.SOCKS};
|
||||
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(
|
||||
Arrays.asList(
|
||||
QUEUE_NAME,
|
||||
ENDPOINT_SUFFIX,
|
||||
STORAGE_CREDENTIALS_SERVICE,
|
||||
MESSAGE_TIME_TO_LIVE,
|
||||
VISIBILITY_TIMEOUT,
|
||||
REQUEST_TIMEOUT,
|
||||
ProxyConfiguration.createProxyConfigPropertyDescriptor(false, PROXY_SPECS)
|
||||
)
|
||||
);
|
||||
|
||||
// 7 days is the maximum timeout as per https://learn.microsoft.com/en-us/rest/api/storageservices/get-messages
|
||||
private static final Duration MAX_VISIBILITY_TIMEOUT = Duration.ofDays(7);
|
||||
|
||||
@Override
|
||||
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return PROPERTIES;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
|
||||
final List<ValidationResult> results = (List<ValidationResult>) super.customValidate(validationContext);
|
||||
final Duration visibilityTimeout = Duration.ofSeconds(
|
||||
validationContext.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS)
|
||||
);
|
||||
|
||||
if (visibilityTimeout.getSeconds() <= 0) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.valid(false)
|
||||
.subject(VISIBILITY_TIMEOUT.getDisplayName())
|
||||
.explanation(VISIBILITY_TIMEOUT.getDisplayName() + " should be greater than 0 secs")
|
||||
.build());
|
||||
}
|
||||
|
||||
if (MAX_VISIBILITY_TIMEOUT.compareTo(visibilityTimeout) < 0) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.valid(false)
|
||||
.subject(VISIBILITY_TIMEOUT.getDisplayName())
|
||||
.explanation(VISIBILITY_TIMEOUT.getDisplayName() + " should not be greater than 7 days")
|
||||
.build());
|
||||
}
|
||||
|
||||
final int ttl = validationContext.getProperty(MESSAGE_TIME_TO_LIVE).asTimePeriod(TimeUnit.SECONDS).intValue();
|
||||
if (ttl <= 0) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject(MESSAGE_TIME_TO_LIVE.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation(MESSAGE_TIME_TO_LIVE.getDisplayName() + " should be any positive number")
|
||||
.build());
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||
FlowFile flowFile = session.get();
|
||||
if (flowFile == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
final long startNanos = System.nanoTime();
|
||||
|
||||
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
session.exportTo(flowFile, baos);
|
||||
final String flowFileContent = baos.toString();
|
||||
|
||||
final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
|
||||
final int ttl = context.getProperty(MESSAGE_TIME_TO_LIVE).asTimePeriod(TimeUnit.SECONDS).intValue();
|
||||
final int requestTimeoutInSecs = context.getProperty(REQUEST_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
|
||||
|
||||
final QueueClient queueClient = createQueueClient(context, flowFile);
|
||||
try {
|
||||
queueClient.sendMessageWithResponse(
|
||||
flowFileContent,
|
||||
Duration.ofSeconds(visibilityTimeoutInSecs),
|
||||
Duration.ofSeconds(ttl),
|
||||
Duration.ofSeconds(requestTimeoutInSecs),
|
||||
Context.NONE
|
||||
);
|
||||
} catch (final QueueStorageException e) {
|
||||
getLogger().error("Failed to write message to Azure Queue Storage", e);
|
||||
flowFile = session.penalize(flowFile);
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
|
||||
session.getProvenanceReporter().send(flowFile, queueClient.getQueueUrl().toString(), transmissionMillis);
|
||||
}
|
||||
}
|
|
@ -32,12 +32,12 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Provides credentials details for Azure Blob processors
|
||||
* Provides credentials details for Azure Storage processors
|
||||
*
|
||||
* @see AbstractControllerService
|
||||
*/
|
||||
@Tags({"azure", "microsoft", "cloud", "storage", "blob", "credentials"})
|
||||
@CapabilityDescription("Provides credentials for Azure Blob processors using Azure Blob Storage client library v12.")
|
||||
@Tags({"azure", "microsoft", "cloud", "storage", "blob", "credentials", "queue"})
|
||||
@CapabilityDescription("Provides credentials for Azure Storage processors using Azure Storage client library v12.")
|
||||
public class AzureStorageCredentialsControllerService_v12 extends AbstractControllerService implements AzureStorageCredentialsService_v12 {
|
||||
|
||||
public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder()
|
||||
|
|
|
@ -27,3 +27,5 @@ org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage_v12
|
|||
org.apache.nifi.processors.azure.storage.PutAzureBlobStorage_v12
|
||||
org.apache.nifi.processors.azure.storage.DeleteAzureBlobStorage_v12
|
||||
org.apache.nifi.processors.azure.storage.MoveAzureDataLakeStorage
|
||||
org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage_v12
|
||||
org.apache.nifi.processors.azure.storage.queue.PutAzureQueueStorage_v12
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.azure.storage.queue;
|
||||
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService_v12;
|
||||
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsService_v12;
|
||||
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsType;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
|
||||
public abstract class AbstractTestAzureQueueStorage_v12 {
|
||||
public static final String CREDENTIALS_SERVICE_IDENTIFIER = "credentials-service";
|
||||
protected TestRunner runner;
|
||||
protected AzureStorageCredentialsService_v12 credentialsService = new AzureStorageCredentialsControllerService_v12();
|
||||
|
||||
protected void setupStorageCredentialsService() throws InitializationException {
|
||||
runner.addControllerService(CREDENTIALS_SERVICE_IDENTIFIER, credentialsService);
|
||||
runner.setProperty(credentialsService, AzureStorageCredentialsControllerService_v12.ACCOUNT_NAME, "account-name");
|
||||
runner.setProperty(credentialsService, AzureStorageCredentialsControllerService_v12.CREDENTIALS_TYPE, AzureStorageCredentialsType.ACCOUNT_KEY.getAllowableValue());
|
||||
runner.setProperty(credentialsService, AzureStorageCredentialsControllerService_v12.ACCOUNT_KEY, "account-key");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,79 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.azure.storage.queue;
|
||||
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
public class TestGetAzureQueueStorage_v12 extends AbstractTestAzureQueueStorage_v12 {
|
||||
@BeforeEach
|
||||
public void setup() throws InitializationException {
|
||||
runner = TestRunners.newTestRunner(GetAzureQueueStorage_v12.class);
|
||||
setupStorageCredentialsService();
|
||||
runner.enableControllerService(credentialsService);
|
||||
runner.setProperty(GetAzureQueueStorage_v12.STORAGE_CREDENTIALS_SERVICE, CREDENTIALS_SERVICE_IDENTIFIER);
|
||||
runner.setProperty(GetAzureQueueStorage_v12.QUEUE_NAME, "queue");
|
||||
runner.setProperty(GetAzureQueueStorage_v12.MESSAGE_BATCH_SIZE, "10");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidVisibilityTimeout() {
|
||||
runner.setProperty(GetAzureQueueStorage_v12.VISIBILITY_TIMEOUT, "10 secs");
|
||||
|
||||
runner.assertValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidVisibilityTimeoutZeroSecs() {
|
||||
runner.setProperty(GetAzureQueueStorage_v12.VISIBILITY_TIMEOUT, "0 secs");
|
||||
|
||||
runner.assertNotValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidVisibilityTimeoutMoreThanSevenDays() {
|
||||
runner.setProperty(GetAzureQueueStorage_v12.VISIBILITY_TIMEOUT, "8 days");
|
||||
|
||||
runner.assertNotValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidRequestTimeout() {
|
||||
runner.setProperty(GetAzureQueueStorage_v12.VISIBILITY_TIMEOUT, "10 secs");
|
||||
runner.setProperty(GetAzureQueueStorage_v12.REQUEST_TIMEOUT, "15 secs");
|
||||
|
||||
runner.assertValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidRequestTimeoutZeroSecs() {
|
||||
runner.setProperty(GetAzureQueueStorage_v12.VISIBILITY_TIMEOUT, "10 secs");
|
||||
runner.setProperty(GetAzureQueueStorage_v12.REQUEST_TIMEOUT, "0 secs");
|
||||
|
||||
runner.assertNotValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidRequestTimeoutMoreThanThirtySecs() {
|
||||
runner.setProperty(GetAzureQueueStorage_v12.VISIBILITY_TIMEOUT, "10 secs");
|
||||
runner.setProperty(GetAzureQueueStorage_v12.REQUEST_TIMEOUT, "31 secs");
|
||||
|
||||
runner.assertNotValid();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,100 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.azure.storage.queue;
|
||||
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
public class TestPutAzureQueueStorage_v12 extends AbstractTestAzureQueueStorage_v12 {
|
||||
@BeforeEach
|
||||
public void setup() throws InitializationException {
|
||||
runner = TestRunners.newTestRunner(PutAzureQueueStorage_v12.class);
|
||||
setupStorageCredentialsService();
|
||||
runner.enableControllerService(credentialsService);
|
||||
runner.setProperty(PutAzureQueueStorage_v12.STORAGE_CREDENTIALS_SERVICE, CREDENTIALS_SERVICE_IDENTIFIER);
|
||||
runner.setProperty(PutAzureQueueStorage_v12.QUEUE_NAME, "queue");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidVisibilityTimeout() {
|
||||
runner.setProperty(PutAzureQueueStorage_v12.MESSAGE_TIME_TO_LIVE, "7 days");
|
||||
runner.setProperty(PutAzureQueueStorage_v12.VISIBILITY_TIMEOUT, "10 secs");
|
||||
|
||||
runner.assertValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidVisibilityTimeoutZeroSecs() {
|
||||
runner.setProperty(PutAzureQueueStorage_v12.MESSAGE_TIME_TO_LIVE, "7 days");
|
||||
runner.setProperty(PutAzureQueueStorage_v12.VISIBILITY_TIMEOUT, "0 secs");
|
||||
|
||||
runner.assertNotValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidVisibilityTimeoutMoreThanSevenDays() {
|
||||
runner.setProperty(PutAzureQueueStorage_v12.MESSAGE_TIME_TO_LIVE, "7 days");
|
||||
runner.setProperty(PutAzureQueueStorage_v12.VISIBILITY_TIMEOUT, "8 days");
|
||||
|
||||
runner.assertNotValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidTTL() {
|
||||
runner.setProperty(PutAzureQueueStorage_v12.MESSAGE_TIME_TO_LIVE, "7 days");
|
||||
runner.setProperty(PutAzureQueueStorage_v12.VISIBILITY_TIMEOUT, "30 secs");
|
||||
|
||||
runner.assertValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidTTLZeroSecs() {
|
||||
runner.setProperty(PutAzureQueueStorage_v12.MESSAGE_TIME_TO_LIVE, "0 secs");
|
||||
runner.setProperty(PutAzureQueueStorage_v12.VISIBILITY_TIMEOUT, "30 secs");
|
||||
|
||||
runner.assertNotValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidRequestTimeout() {
|
||||
runner.setProperty(PutAzureQueueStorage_v12.MESSAGE_TIME_TO_LIVE, "7 days");
|
||||
runner.setProperty(PutAzureQueueStorage_v12.VISIBILITY_TIMEOUT, "30 secs");
|
||||
runner.setProperty(PutAzureQueueStorage_v12.REQUEST_TIMEOUT, "15 secs");
|
||||
|
||||
runner.assertValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidRequestTimeoutZeroSecs() {
|
||||
runner.setProperty(PutAzureQueueStorage_v12.MESSAGE_TIME_TO_LIVE, "7 days");
|
||||
runner.setProperty(PutAzureQueueStorage_v12.VISIBILITY_TIMEOUT, "30 secs");
|
||||
runner.setProperty(PutAzureQueueStorage_v12.REQUEST_TIMEOUT, "0 secs");
|
||||
|
||||
runner.assertNotValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidRequestTimeoutMoreThanThirtySecs() {
|
||||
runner.setProperty(PutAzureQueueStorage_v12.MESSAGE_TIME_TO_LIVE, "7 days");
|
||||
runner.setProperty(PutAzureQueueStorage_v12.VISIBILITY_TIMEOUT, "30 secs");
|
||||
runner.setProperty(PutAzureQueueStorage_v12.REQUEST_TIMEOUT, "31 secs");
|
||||
|
||||
runner.assertNotValid();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue