NIFI-11656 Removed deprecated Azure Queue Storage Processors

- Removed AzureStorageCredentialsControllerService and related implementations
- Removed com.micrsoft.azure:azure-storage dependencies

This closes #7350

Signed-off-by: Nandor Soma Abonyi <nsabonyi@apache.org>
This commit is contained in:
exceptionfactory 2023-06-06 14:23:37 -05:00 committed by Nandor Soma Abonyi
parent dd2486d3e0
commit 9195b586e5
No known key found for this signature in database
GPG Key ID: AFFFD8C3A1A88ED7
27 changed files with 7 additions and 1927 deletions

View File

@ -96,11 +96,6 @@
<groupId>com.azure</groupId>
<artifactId>azure-storage-queue</artifactId>
</dependency>
<!-- Legacy Microsoft Azure Libraries -->
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-storage</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
@ -113,6 +108,10 @@
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>

View File

@ -1,80 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.queue;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.queue.CloudQueueClient;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails;
public abstract class AbstractAzureQueueStorage extends AbstractProcessor {
public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
.name("storage-queue-name")
.displayName("Queue Name")
.description("Name of the Azure Storage Queue")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All successfully processed FlowFiles are routed to this relationship")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("Unsuccessful operations will be transferred to the failure relationship.")
.build();
private static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
protected final CloudQueueClient createCloudQueueClient(final ProcessContext context, final FlowFile flowFile) throws URISyntaxException {
final AzureStorageCredentialsDetails storageCredentialsDetails = AzureStorageUtils.getStorageCredentialsDetails(context, flowFile);
final CloudStorageAccount cloudStorageAccount = AzureStorageUtils.getCloudStorageAccount(storageCredentialsDetails);
final CloudQueueClient cloudQueueClient = cloudStorageAccount.createCloudQueueClient();
return cloudQueueClient;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
return AzureStorageUtils.validateCredentialProperties(validationContext);
}
}

View File

@ -1,215 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.queue;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.queue.CloudQueue;
import com.microsoft.azure.storage.queue.CloudQueueClient;
import com.microsoft.azure.storage.queue.CloudQueueMessage;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@SeeAlso({PutAzureQueueStorage.class})
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " +
"to consume messages without deleting them, set 'Auto Delete Messages' to 'false'. Note: There might be chances of receiving duplicates in situations like " +
"when a message is received but was unable to be deleted from the queue due to some unexpected situations.")
@WritesAttributes({
@WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"),
@WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"),
@WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"),
@WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"),
@WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"),
})
@DeprecationNotice(alternatives = GetAzureQueueStorage_v12.class)
public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
.name("auto-delete-messages")
.displayName("Auto Delete Messages")
.description("Specifies whether the received message is to be automatically deleted from the queue.")
.required(true)
.allowableValues("true", "false")
.defaultValue("true")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("batch-size")
.displayName("Batch Size")
.description("The number of messages to be retrieved from the queue.")
.required(true)
.addValidator(StandardValidators.createLongValidator(1, 32, true))
.defaultValue("32")
.build();
public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
.name("visibility-timeout")
.displayName("Visibility Timeout")
.description("The duration during which the retrieved message should be invisible to other consumers.")
.required(true)
.defaultValue("30 secs")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE, AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, AzureStorageUtils.ENDPOINT_SUFFIX,
QUEUE, AUTO_DELETE, BATCH_SIZE, VISIBILITY_TIMEOUT, AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public Set<Relationship> getRelationships() {
return Collections.singleton(REL_SUCCESS);
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final int visibilityTimeoutInSecs = context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
final String queue = context.getProperty(QUEUE).evaluateAttributeExpressions().getValue().toLowerCase();
final Iterable<CloudQueueMessage> retrievedMessagesIterable;
CloudQueueClient cloudQueueClient;
CloudQueue cloudQueue;
try {
cloudQueueClient = createCloudQueueClient(context, null);
cloudQueue = cloudQueueClient.getQueueReference(queue);
final OperationContext operationContext = new OperationContext();
AzureStorageUtils.setProxy(operationContext, context);
retrievedMessagesIterable = cloudQueue.retrieveMessages(batchSize, visibilityTimeoutInSecs, null, operationContext);
} catch (URISyntaxException | StorageException e) {
getLogger().error("Failed to retrieve messages from the provided Azure Storage Queue due to {}", new Object[] {e});
context.yield();
return;
}
final List<CloudQueueMessage> cloudQueueMessages = toList(retrievedMessagesIterable);
for (final CloudQueueMessage message : cloudQueueMessages) {
FlowFile flowFile = session.create();
final Map<String, String> attributes = new HashMap<>();
attributes.put("azure.queue.uri", cloudQueue.getUri().toString());
attributes.put("azure.queue.insertionTime", message.getInsertionTime().toString());
attributes.put("azure.queue.expirationTime", message.getExpirationTime().toString());
attributes.put("azure.queue.messageId", message.getMessageId());
attributes.put("azure.queue.popReceipt", message.getPopReceipt());
flowFile = session.putAllAttributes(flowFile, attributes);
flowFile = session.write(flowFile, out -> {
try {
out.write(message.getMessageContentAsByte());
} catch (StorageException e) {
getLogger().error("Failed to write the retrieved queue message to FlowFile content due to {}", new Object[] {e});
context.yield();
}
});
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().receive(flowFile, cloudQueue.getStorageUri().toString());
}
if(autoDelete) {
session.commitAsync(() -> {
for (final CloudQueueMessage message : cloudQueueMessages) {
try {
cloudQueue.deleteMessage(message);
} catch (StorageException e) {
getLogger().error("Failed to delete the retrieved message with the id {} from the queue due to {}",
new Object[] {message.getMessageId(), e});
}
}
});
}
}
@Override
public Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
final int visibilityTimeout = validationContext.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
if (visibilityTimeout <= 0) {
problems.add(new ValidationResult.Builder()
.valid(false)
.subject(VISIBILITY_TIMEOUT.getDisplayName())
.explanation(VISIBILITY_TIMEOUT.getDisplayName() + " should be greater than 0 secs")
.build());
}
AzureStorageUtils.validateProxySpec(validationContext, problems);
return problems;
}
private List<CloudQueueMessage> toList(Iterable<CloudQueueMessage> iterable) {
if (iterable instanceof List) {
return (List<CloudQueueMessage>) iterable;
}
final ArrayList<CloudQueueMessage> list = new ArrayList<>();
if (iterable != null) {
for(CloudQueueMessage message : iterable) {
list.add(message);
}
}
return list;
}
}

View File

@ -55,7 +55,7 @@ import static org.apache.nifi.processors.azure.storage.queue.AbstractAzureQueueS
import static org.apache.nifi.processors.azure.storage.queue.AbstractAzureQueueStorage_v12.POP_RECEIPT_ATTRIBUTE;
import static org.apache.nifi.processors.azure.storage.queue.AbstractAzureQueueStorage_v12.URI_ATTRIBUTE;
@SeeAlso({PutAzureQueueStorage.class})
@SeeAlso({PutAzureQueueStorage_v12.class})
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. If the requirement is " +

View File

@ -1,162 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.queue;
import java.io.ByteArrayOutputStream;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.queue.CloudQueue;
import com.microsoft.azure.storage.queue.CloudQueueClient;
import com.microsoft.azure.storage.queue.CloudQueueMessage;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
@SeeAlso({GetAzureQueueStorage.class})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({ "azure", "microsoft", "cloud", "storage", "queue", "enqueue" })
@CapabilityDescription("Writes the content of the incoming FlowFiles to the configured Azure Queue Storage.")
@DeprecationNotice(alternatives = PutAzureQueueStorage_v12.class)
public class PutAzureQueueStorage extends AbstractAzureQueueStorage {
public static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
.name("time-to-live")
.displayName("TTL")
.description("Maximum time to allow the message to be in the queue. If left empty, the default value of 7 days will be used.")
.required(false)
.defaultValue("7 days")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
public static final PropertyDescriptor VISIBILITY_DELAY = new PropertyDescriptor.Builder()
.name("visibility-delay")
.displayName("Visibility Delay")
.description("The length of time during which the message will be invisible, starting when it is added to the queue. " +
"This value must be greater than or equal to 0 and less than the TTL value.")
.required(false)
.defaultValue("0 secs")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE, AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, AzureStorageUtils.ENDPOINT_SUFFIX,
TTL, QUEUE, VISIBILITY_DELAY, AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final long startNanos = System.nanoTime();
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
session.exportTo(flowFile, baos);
final String flowFileContent = baos.toString();
CloudQueueMessage message = new CloudQueueMessage(flowFileContent);
CloudQueueClient cloudQueueClient;
CloudQueue cloudQueue;
final int ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.SECONDS).intValue();
final int delay = context.getProperty(VISIBILITY_DELAY).asTimePeriod(TimeUnit.SECONDS).intValue();
final String queue = context.getProperty(QUEUE).evaluateAttributeExpressions(flowFile).getValue().toLowerCase();
try {
cloudQueueClient = createCloudQueueClient(context, flowFile);
cloudQueue = cloudQueueClient.getQueueReference(queue);
final OperationContext operationContext = new OperationContext();
AzureStorageUtils.setProxy(operationContext, context);
cloudQueue.addMessage(message, ttl, delay, null, operationContext);
} catch (URISyntaxException | StorageException e) {
getLogger().error("Failed to write the message to Azure Queue Storage due to {}", new Object[]{e});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
session.transfer(flowFile, REL_SUCCESS);
final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, cloudQueue.getUri().toString(), transmissionMillis);
}
@Override
public Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
final boolean ttlSet = validationContext.getProperty(TTL).isSet();
final boolean delaySet = validationContext.getProperty(VISIBILITY_DELAY).isSet();
final int ttl = validationContext.getProperty(TTL).asTimePeriod(TimeUnit.SECONDS).intValue();
if (ttlSet) {
final int SEVEN_DAYS_TIMEPERIOD_IN_SECS = 604800; // i.e. 7 * 24 * 60 * 60
if (ttl > SEVEN_DAYS_TIMEPERIOD_IN_SECS) {
problems.add(new ValidationResult.Builder()
.subject(TTL.getDisplayName())
.valid(false)
.explanation(TTL.getDisplayName() + " exceeds the allowed limit of 7 days. Set a value less than 7 days")
.build());
}
}
if (delaySet) {
int delay = validationContext.getProperty(VISIBILITY_DELAY).asTimePeriod(TimeUnit.SECONDS).intValue();
if (delay > ttl || delay < 0) {
problems.add(new ValidationResult.Builder()
.subject(VISIBILITY_DELAY.getDisplayName())
.valid(false)
.explanation(VISIBILITY_DELAY.getDisplayName() + " should be greater than or equal to 0 and less than " + TTL.getDisplayName())
.build());
}
}
AzureStorageUtils.validateProxySpec(validationContext, problems);
return problems;
}
}

View File

@ -19,41 +19,20 @@ package org.apache.nifi.processors.azure.storage.utils;
import com.azure.core.http.ProxyOptions;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageCredentials;
import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxySpec;
import org.apache.nifi.proxy.SocksVersion;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsService;
import org.apache.nifi.services.azure.storage.AzureStorageEmulatorCredentialsDetails;
import reactor.netty.http.client.HttpClient;
public final class AzureStorageUtils {
public static final String BLOCK = "Block";
public static final String PAGE = "Page";
public static final String STORAGE_ACCOUNT_NAME_PROPERTY_DESCRIPTOR_NAME = "storage-account-name";
public static final String STORAGE_ACCOUNT_KEY_PROPERTY_DESCRIPTOR_NAME = "storage-account-key";
public static final String STORAGE_SAS_TOKEN_PROPERTY_DESCRIPTOR_NAME = "storage-sas-token";
@ -148,17 +127,6 @@ public final class AzureStorageUtils {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor STORAGE_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
.name("storage-credentials-service")
.displayName("Storage Credentials")
.description("The Controller Service used to obtain Azure Storage Credentials. Instead of the processor level properties, " +
"the credentials can be configured here through a common/shared controller service, which is the preferred way. " +
"The 'Lookup' version of the service can also be used to select the credentials dynamically at runtime " +
"based on a FlowFile attribute (if the processor has FlowFile input).")
.identifiesControllerService(AzureStorageCredentialsService.class)
.required(false)
.build();
public static final PropertyDescriptor MANAGED_IDENTITY_CLIENT_ID = new PropertyDescriptor.Builder()
.name("managed-identity-client-id")
.displayName("Managed Identity Client ID")
@ -204,92 +172,6 @@ public final class AzureStorageUtils {
// do not instantiate
}
public static CloudStorageAccount getCloudStorageAccount(final AzureStorageCredentialsDetails storageCredentialsDetails) throws URISyntaxException {
final CloudStorageAccount cloudStorageAccount;
if (storageCredentialsDetails instanceof AzureStorageEmulatorCredentialsDetails) {
AzureStorageEmulatorCredentialsDetails emulatorCredentials = (AzureStorageEmulatorCredentialsDetails) storageCredentialsDetails;
final String proxyUri = emulatorCredentials.getDevelopmentStorageProxyUri();
if (proxyUri != null) {
cloudStorageAccount = CloudStorageAccount.getDevelopmentStorageAccount(new URI(proxyUri));
} else {
cloudStorageAccount = CloudStorageAccount.getDevelopmentStorageAccount();
}
} else {
cloudStorageAccount = new CloudStorageAccount(
storageCredentialsDetails.getStorageCredentials(),
true,
storageCredentialsDetails.getStorageSuffix(),
storageCredentialsDetails.getStorageAccountName());
}
return cloudStorageAccount;
}
public static AzureStorageCredentialsDetails getStorageCredentialsDetails(PropertyContext context, FlowFile flowFile) {
final Map<String, String> attributes = flowFile != null ? flowFile.getAttributes() : Collections.emptyMap();
final AzureStorageCredentialsService storageCredentialsService = context.getProperty(STORAGE_CREDENTIALS_SERVICE).asControllerService(AzureStorageCredentialsService.class);
if (storageCredentialsService != null) {
return storageCredentialsService.getStorageCredentialsDetails(attributes);
} else {
return createStorageCredentialsDetails(context, attributes);
}
}
public static AzureStorageCredentialsDetails createStorageCredentialsDetails(PropertyContext context, Map<String, String> attributes) {
final String accountName = context.getProperty(ACCOUNT_NAME).evaluateAttributeExpressions(attributes).getValue();
final String storageSuffix = context.getProperty(ENDPOINT_SUFFIX).evaluateAttributeExpressions(attributes).getValue();
final String accountKey = context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(attributes).getValue();
final String sasToken = context.getProperty(PROP_SAS_TOKEN).evaluateAttributeExpressions(attributes).getValue();
if (StringUtils.isBlank(accountName)) {
throw new IllegalArgumentException(String.format("'%s' must not be empty.", ACCOUNT_NAME.getDisplayName()));
}
StorageCredentials storageCredentials;
if (StringUtils.isNotBlank(accountKey)) {
storageCredentials = new StorageCredentialsAccountAndKey(accountName, accountKey);
} else if (StringUtils.isNotBlank(sasToken)) {
storageCredentials = new StorageCredentialsSharedAccessSignature(sasToken);
} else {
throw new IllegalArgumentException(String.format("Either '%s' or '%s' must be defined.", ACCOUNT_KEY.getDisplayName(), PROP_SAS_TOKEN.getDisplayName()));
}
return new AzureStorageCredentialsDetails(accountName, storageSuffix, storageCredentials);
}
public static Collection<ValidationResult> validateCredentialProperties(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
final String storageCredentials = validationContext.getProperty(STORAGE_CREDENTIALS_SERVICE).getValue();
final String accountName = validationContext.getProperty(ACCOUNT_NAME).getValue();
final String accountKey = validationContext.getProperty(ACCOUNT_KEY).getValue();
final String sasToken = validationContext.getProperty(PROP_SAS_TOKEN).getValue();
final String endpointSuffix = validationContext.getProperty(ENDPOINT_SUFFIX).getValue();
if (!((StringUtils.isNotBlank(storageCredentials) && StringUtils.isBlank(accountName) && StringUtils.isBlank(accountKey) && StringUtils.isBlank(sasToken))
|| (StringUtils.isBlank(storageCredentials) && StringUtils.isNotBlank(accountName) && StringUtils.isNotBlank(accountKey) && StringUtils.isBlank(sasToken))
|| (StringUtils.isBlank(storageCredentials) && StringUtils.isNotBlank(accountName) && StringUtils.isBlank(accountKey) && StringUtils.isNotBlank(sasToken)))) {
results.add(new ValidationResult.Builder().subject("AzureStorageUtils Credentials")
.valid(false)
.explanation("either " + STORAGE_CREDENTIALS_SERVICE.getDisplayName()
+ ", or " + ACCOUNT_NAME.getDisplayName() + " with " + ACCOUNT_KEY.getDisplayName()
+ " or " + ACCOUNT_NAME.getDisplayName() + " with " + PROP_SAS_TOKEN.getDisplayName() + " must be specified")
.build());
}
if(StringUtils.isNotBlank(storageCredentials) && StringUtils.isNotBlank(endpointSuffix)) {
String errMsg = "Either " + STORAGE_CREDENTIALS_SERVICE.getDisplayName() + " or " + ENDPOINT_SUFFIX.getDisplayName()
+ " should be specified, not both.";
results.add(new ValidationResult.Builder().subject("AzureStorageUtils Credentials")
.explanation(errMsg)
.build());
}
return results;
}
private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP, ProxySpec.SOCKS};
public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE
= ProxyConfiguration.createProxyConfigPropertyDescriptor(false, PROXY_SPECS);
@ -298,11 +180,6 @@ public final class AzureStorageUtils {
ProxyConfiguration.validateProxySpec(context, results, PROXY_SPECS);
}
public static void setProxy(final OperationContext operationContext, final ProcessContext processContext) {
final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(processContext);
operationContext.setProxy(proxyConfig.createProxy());
}
/**
*
* Creates the {@link ProxyOptions proxy options} that {@link HttpClient} will use.

View File

@ -1,104 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.azure.storage;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
/**
* Implementation of AbstractControllerService interface
*
* @see AbstractControllerService
*/
@Tags({ "azure", "microsoft", "cloud", "storage", "blob", "queue", "credentials" })
@CapabilityDescription("Defines credentials for Azure Storage processors. " +
"Uses Account Name with Account Key or Account Name with SAS Token.")
public class AzureStorageCredentialsControllerService extends AbstractControllerService implements AzureStorageCredentialsService {
public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder()
.name(AzureStorageUtils.ACCOUNT_NAME.getName())
.displayName(AzureStorageUtils.ACCOUNT_NAME.getDisplayName())
.description(AzureStorageUtils.ACCOUNT_NAME_BASE_DESCRIPTION + AzureStorageUtils.ACCOUNT_NAME_SECURITY_DESCRIPTION)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.sensitive(true)
.build();
private static final List<PropertyDescriptor> PROPERTIES = Collections
.unmodifiableList(Arrays.asList(
ACCOUNT_NAME,
AzureStorageUtils.ACCOUNT_KEY,
AzureStorageUtils.PROP_SAS_TOKEN,
AzureStorageUtils.ENDPOINT_SUFFIX));
private ConfigurationContext context;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
final String accountKey = validationContext.getProperty(AzureStorageUtils.ACCOUNT_KEY).getValue();
final String sasToken = validationContext.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).getValue();
if (StringUtils.isBlank(accountKey) && StringUtils.isBlank(sasToken)) {
results.add(new ValidationResult.Builder().subject("AzureStorageCredentialsControllerService")
.valid(false)
.explanation("either " + AzureStorageUtils.ACCOUNT_KEY.getDisplayName() + " or " + AzureStorageUtils.PROP_SAS_TOKEN.getDisplayName() + " is required")
.build());
} else if (StringUtils.isNotBlank(accountKey) && StringUtils.isNotBlank(sasToken)) {
results.add(new ValidationResult.Builder().subject("AzureStorageCredentialsControllerService")
.valid(false)
.explanation("cannot set both " + AzureStorageUtils.ACCOUNT_KEY.getDisplayName() + " and " + AzureStorageUtils.PROP_SAS_TOKEN.getDisplayName())
.build());
}
return results;
}
@OnEnabled
public void onEnabled(ConfigurationContext context) {
this.context = context;
}
@Override
public AzureStorageCredentialsDetails getStorageCredentialsDetails(Map<String, String> attributes) {
return AzureStorageUtils.createStorageCredentialsDetails(context, attributes);
}
}

View File

@ -1,56 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.azure.storage;
import java.util.Map;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.service.lookup.AbstractSingleAttributeBasedControllerServiceLookup;
@Tags({ "azure", "microsoft", "cloud", "storage", "blob", "queue", "credentials" })
@CapabilityDescription("Provides an AzureStorageCredentialsService that can be used to dynamically select another AzureStorageCredentialsService. " +
"This service requires an attribute named 'azure.storage.credentials.name' to be passed in, and will throw an exception if the attribute is missing. " +
"The value of 'azure.storage.credentials.name' will be used to select the AzureStorageCredentialsService that has been registered with that name. " +
"This will allow multiple AzureStorageCredentialsServices to be defined and registered, and then selected dynamically at runtime by tagging flow files " +
"with the appropriate 'azure.storage.credentials.name' attribute.")
@DynamicProperty(name = "The name to register AzureStorageCredentialsService", value = "The AzureStorageCredentialsService",
description = "If '" + AzureStorageCredentialsControllerServiceLookup.AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE + "' attribute contains " +
"the name of the dynamic property, then the AzureStorageCredentialsService (registered in the value) will be selected.",
expressionLanguageScope = ExpressionLanguageScope.NONE)
public class AzureStorageCredentialsControllerServiceLookup
extends AbstractSingleAttributeBasedControllerServiceLookup<AzureStorageCredentialsService> implements AzureStorageCredentialsService {
public static final String AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE = "azure.storage.credentials.name";
@Override
protected String getLookupAttribute() {
return AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE;
}
@Override
public Class<AzureStorageCredentialsService> getServiceType() {
return AzureStorageCredentialsService.class;
}
@Override
public AzureStorageCredentialsDetails getStorageCredentialsDetails(Map<String, String> attributes) {
return lookupService(attributes).getStorageCredentialsDetails(attributes);
}
}

View File

@ -1,79 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.azure.storage;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.processor.util.StandardValidators;
@Tags({ "azure", "microsoft", "emulator", "storage", "blob", "queue", "credentials" })
@CapabilityDescription("Defines credentials for Azure Storage processors that connects to Azurite emulator.")
public class AzureStorageEmulatorCredentialsControllerService extends AbstractControllerService implements AzureStorageCredentialsService {
public static final PropertyDescriptor DEVELOPMENT_STORAGE_PROXY_URI = new PropertyDescriptor.Builder()
.name("azurite-uri")
.displayName("Storage Emulator URI")
.description("URI to connect to Azure Storage Emulator (Azurite)")
.required(false)
.sensitive(false)
.addValidator(StandardValidators.URI_VALIDATOR)
.build();
private static final List<PropertyDescriptor> PROPERTIES =
Collections.unmodifiableList(Arrays.asList(DEVELOPMENT_STORAGE_PROXY_URI));
private String azuriteProxyUri;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
return results;
}
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
this.azuriteProxyUri = context.getProperty(DEVELOPMENT_STORAGE_PROXY_URI).getValue();
}
public String getProxyUri() {
return azuriteProxyUri;
}
@Override
public AzureStorageCredentialsDetails getStorageCredentialsDetails(final Map<String, String> attributes) {
return new AzureStorageEmulatorCredentialsDetails(azuriteProxyUri);
}
}

View File

@ -1,28 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.azure.storage;
public class AzureStorageEmulatorCredentialsDetails extends AzureStorageCredentialsDetails {
private String developmentStorageProxyUri;
public AzureStorageEmulatorCredentialsDetails(String developmentStorageProxyUri) {
this.developmentStorageProxyUri = developmentStorageProxyUri;
}
public String getDevelopmentStorageProxyUri() {
return developmentStorageProxyUri;
}
}

View File

@ -13,12 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.services.azure.eventhub.AzureEventHubRecordSink
org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService
org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerServiceLookup
org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService
org.apache.nifi.services.azure.storage.ADLSCredentialsControllerServiceLookup
org.apache.nifi.services.azure.cosmos.document.AzureCosmosDBClientService
org.apache.nifi.services.azure.storage.AzureStorageEmulatorCredentialsControllerService
org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService_v12
org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerServiceLookup_v12
org.apache.nifi.services.azure.StandardAzureCredentialsControllerService

View File

@ -15,8 +15,6 @@
org.apache.nifi.processors.azure.eventhub.PutAzureEventHub
org.apache.nifi.processors.azure.eventhub.GetAzureEventHub
org.apache.nifi.processors.azure.eventhub.ConsumeAzureEventHub
org.apache.nifi.processors.azure.storage.queue.PutAzureQueueStorage
org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage
org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage
org.apache.nifi.processors.azure.storage.DeleteAzureDataLakeStorage
org.apache.nifi.processors.azure.storage.FetchAzureDataLakeStorage

View File

@ -16,15 +16,10 @@
*/
package org.apache.nifi.processors.azure.storage;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.StorageCredentials;
import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.proxy.StandardProxyConfigurationService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.file.FileUtils;
@ -115,29 +110,6 @@ public abstract class AbstractAzureStorageIT {
protected abstract Class<? extends Processor> getProcessorClass();
protected CloudStorageAccount getStorageAccount() throws Exception {
StorageCredentials storageCredentials = new StorageCredentialsAccountAndKey(getAccountName(), getAccountKey());
return new CloudStorageAccount(storageCredentials, true);
}
protected void configureCredentialsService() throws Exception {
runner.removeProperty(AzureStorageUtils.ACCOUNT_NAME);
runner.removeProperty(AzureStorageUtils.ACCOUNT_KEY);
AzureStorageCredentialsService credentialsService = new AzureStorageCredentialsControllerService();
runner.addControllerService("credentials-service", credentialsService);
runner.setProperty(credentialsService, AzureStorageUtils.ACCOUNT_NAME, getAccountName());
runner.setProperty(credentialsService, AzureStorageUtils.ACCOUNT_KEY, getAccountKey());
runner.assertValid(credentialsService);
runner.enableControllerService(credentialsService);
runner.setProperty(AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE, credentialsService.getIdentifier());
}
protected void configureProxyService() throws InitializationException {
final StandardProxyConfigurationService proxyConfigurationService = new StandardProxyConfigurationService();
runner.addControllerService("proxy-configuration-service", proxyConfigurationService);

View File

@ -18,8 +18,6 @@ package org.apache.nifi.processors.azure.storage;
import com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.google.common.collect.Sets;
import com.google.common.net.UrlEscapers;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.provenance.ProvenanceEventRecord;
@ -331,14 +329,7 @@ public class ITMoveAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
flowFile.assertAttributeEquals(ATTR_NAME_DIRECTORY, destinationDirectory);
flowFile.assertAttributeEquals(ATTR_NAME_FILENAME, fileName);
String urlEscapedDirectory = UrlEscapers.urlPathSegmentEscaper().escape(destinationDirectory);
String urlEscapedFileName = UrlEscapers.urlPathSegmentEscaper().escape(fileName);
String urlEscapedPathSeparator = UrlEscapers.urlPathSegmentEscaper().escape("/");
String primaryUri = StringUtils.isNotEmpty(destinationDirectory)
? String.format("https://%s.dfs.core.windows.net/%s/%s%s%s", getAccountName(), fileSystemName, urlEscapedDirectory, urlEscapedPathSeparator, urlEscapedFileName)
: String.format("https://%s.dfs.core.windows.net/%s/%s", getAccountName(), fileSystemName, urlEscapedFileName);
flowFile.assertAttributeEquals(ATTR_NAME_PRIMARY_URI, primaryUri);
flowFile.assertAttributeExists(ATTR_NAME_PRIMARY_URI);
flowFile.assertAttributeEquals(ATTR_NAME_LENGTH, Integer.toString(fileData.length));
}
@ -368,7 +359,7 @@ public class ITMoveAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
}
private void assertProvenanceEvents() {
Set<ProvenanceEventType> expectedEventTypes = Sets.newHashSet(ProvenanceEventType.SEND);
Set<ProvenanceEventType> expectedEventTypes = Collections.singleton(ProvenanceEventType.SEND);
Set<ProvenanceEventType> actualEventTypes = runner.getProvenanceEvents().stream()
.map(ProvenanceEventRecord::getEventType)

View File

@ -1,68 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.queue;
import com.microsoft.azure.storage.queue.CloudQueue;
import com.microsoft.azure.storage.queue.CloudQueueClient;
import com.microsoft.azure.storage.queue.CloudQueueMessage;
import org.apache.nifi.processors.azure.storage.AbstractAzureStorageIT;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import java.util.Iterator;
import java.util.UUID;
import static org.apache.nifi.processors.azure.AzureServiceEndpoints.DEFAULT_QUEUE_ENDPOINT_SUFFIX;
public abstract class AbstractAzureQueueStorageIT extends AbstractAzureStorageIT {
protected static final String TEST_QUEUE_NAME_PREFIX = "nifi-test-queue";
protected CloudQueue cloudQueue;
@Override
protected String getDefaultEndpointSuffix() {
return DEFAULT_QUEUE_ENDPOINT_SUFFIX;
}
@BeforeEach
public void setUpAzureQueueStorageIT() throws Exception {
String queueName = String.format("%s-%s", TEST_QUEUE_NAME_PREFIX, UUID.randomUUID());
CloudQueueClient cloudQueueClient = getStorageAccount().createCloudQueueClient();
cloudQueue = cloudQueueClient.getQueueReference(queueName);
cloudQueue.createIfNotExists();
runner.setProperty(AbstractAzureQueueStorage.QUEUE, queueName);
}
@AfterEach
public void tearDownAzureQueueStorageIT() throws Exception {
cloudQueue.deleteIfExists();
}
protected int getMessageCount() throws Exception {
Iterator<CloudQueueMessage> retrievedMessages = cloudQueue.retrieveMessages(10, 1, null, null).iterator();
int count = 0;
while (retrievedMessages.hasNext()) {
retrievedMessages.next();
count++;
}
return count;
}
}

View File

@ -1,144 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.queue;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.queue.CloudQueueMessage;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.util.MockFlowFile;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class GetAzureQueueStorageIT extends AbstractAzureQueueStorageIT {
@Override
protected Class<? extends Processor> getProcessorClass() {
return GetAzureQueueStorage.class;
}
@BeforeEach
public void setUp() throws StorageException {
cloudQueue.addMessage(new CloudQueueMessage("Dummy Message 1"), 604800, 0, null, null);
cloudQueue.addMessage(new CloudQueueMessage("Dummy Message 2"), 604800, 0, null, null);
cloudQueue.addMessage(new CloudQueueMessage("Dummy Message 3"), 604800, 0, null, null);
}
@Test
public void testSimpleGet() throws Exception {
runner.assertValid();
runner.run(1);
assertResult(0);
}
@Test
public void testSimpleGetWithCredentialsService() throws Exception {
configureCredentialsService();
runner.assertValid();
runner.run(1);
assertResult(0);
}
@Test
public void testNotValidWithCredentialsServiceAndEndpointSuffix() throws Exception {
configureCredentialsService();
runner.setProperty(AzureStorageUtils.ENDPOINT_SUFFIX, "core.windows.net");
runner.assertNotValid();
}
@Test
public void testSimpleGetWithEL() throws Exception {
runner.setValidateExpressionUsage(true);
runner.setVariable("account.name", getAccountName());
runner.setVariable("account.key", getAccountKey());
runner.setVariable("queue.name", cloudQueue.getName());
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "${account.name}");
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "${account.key}");
runner.setProperty(GetAzureQueueStorage.QUEUE, "${queue.name}");
runner.assertValid();
runner.run(1);
assertResult(0);
}
@Test
public void testGetWithAutoDeleteFalse() throws Exception {
runner.setProperty(GetAzureQueueStorage.AUTO_DELETE, "false");
runner.assertValid();
runner.run(1);
assertResult(3);
}
@Test
public void testGetWithVisibilityTimeout() throws Exception {
runner.setProperty(GetAzureQueueStorage.AUTO_DELETE, "false");
runner.setProperty(GetAzureQueueStorage.VISIBILITY_TIMEOUT, "1 secs");
runner.assertValid();
runner.run(1);
runner.assertAllFlowFilesTransferred(GetAzureQueueStorage.REL_SUCCESS, 3);
assertEquals(0, getMessageCount());
Thread.sleep(1500);
assertEquals(3, getMessageCount());
}
@Test
public void testGetWithBatchSize() throws Exception {
runner.setProperty(GetAzureQueueStorage.BATCH_SIZE, "2");
runner.assertValid();
runner.run(1);
runner.assertAllFlowFilesTransferred(GetAzureQueueStorage.REL_SUCCESS, 2);
cloudQueue.downloadAttributes();
assertEquals(1, cloudQueue.getApproximateMessageCount());
runner.run(1);
runner.assertAllFlowFilesTransferred(GetAzureQueueStorage.REL_SUCCESS, 3);
cloudQueue.downloadAttributes();
assertEquals(0, cloudQueue.getApproximateMessageCount());
}
private void assertResult(int expectedMessageCountInQueue) throws Exception {
runner.assertAllFlowFilesTransferred(GetAzureQueueStorage.REL_SUCCESS, 3);
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(GetAzureQueueStorage.REL_SUCCESS);
int i = 1;
for (MockFlowFile mockFlowFile : mockFlowFiles) {
mockFlowFile.assertContentEquals("Dummy Message " + i++);
}
cloudQueue.downloadAttributes();
assertEquals(expectedMessageCountInQueue, cloudQueue.getApproximateMessageCount());
}
}

View File

@ -1,100 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.queue;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class PutAzureQueueStorageIT extends AbstractAzureQueueStorageIT {
@Override
protected Class<? extends Processor> getProcessorClass() {
return PutAzureQueueStorage.class;
}
@Test
public void testSimplePut() {
runner.assertValid();
runner.enqueue("Dummy message");
runner.run(1);
runner.assertAllFlowFilesTransferred(PutAzureQueueStorage.REL_SUCCESS, 1);
}
@Test
public void testSimplePutWithCredentialsService() throws Exception {
configureCredentialsService();
runner.assertValid();
runner.enqueue("Dummy message");
runner.run(1);
runner.assertAllFlowFilesTransferred(PutAzureQueueStorage.REL_SUCCESS, 1);
}
@Test
public void testSimplePutWithEL() {
runner.setValidateExpressionUsage(true);
runner.setVariable("account.name", getAccountName());
runner.setVariable("account.key", getAccountKey());
runner.setVariable("queue.name", cloudQueue.getName());
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "${account.name}");
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "${account.key}");
runner.setProperty(PutAzureQueueStorage.QUEUE, "${queue.name}");
runner.assertValid();
runner.enqueue("Dummy message");
runner.run(1);
runner.assertAllFlowFilesTransferred(PutAzureQueueStorage.REL_SUCCESS, 1);
}
@Test
public void testPutWithTTL() throws Exception {
runner.setProperty(PutAzureQueueStorage.TTL, "2 secs");
runner.assertValid();
runner.enqueue("Dummy message");
runner.run(1);
runner.assertAllFlowFilesTransferred(PutAzureQueueStorage.REL_SUCCESS, 1);
assertEquals(1, getMessageCount());
Thread.sleep(2400);
assertEquals(0, getMessageCount());
}
@Test
public void testPutWithVisibilityDelay() throws Exception {
runner.setProperty(PutAzureQueueStorage.VISIBILITY_DELAY, "2 secs");
runner.assertValid();
runner.enqueue("Dummy message");
runner.run(1);
runner.assertAllFlowFilesTransferred(PutAzureQueueStorage.REL_SUCCESS, 1);
assertEquals(0, getMessageCount());
Thread.sleep(2400);
assertEquals(1, getMessageCount());
}
}

View File

@ -1,73 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.queue;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestGetAzureQueueStorage {
private final TestRunner runner = TestRunners.newTestRunner(GetAzureQueueStorage.class);
@Test
public void testValidVisibilityTimeout() {
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "dummy-storage");
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "dummy-key");
runner.setProperty(GetAzureQueueStorage.QUEUE, "dummyqueue");
runner.setProperty(GetAzureQueueStorage.BATCH_SIZE, "10");
runner.setProperty(GetAzureQueueStorage.VISIBILITY_TIMEOUT, "10 secs");
ProcessContext processContext = runner.getProcessContext();
Collection<ValidationResult> results = new HashSet<>();
if (processContext instanceof MockProcessContext) {
results = ((MockProcessContext) processContext).validate();
}
assertEquals(0, results.size());
}
@Test
public void testInvalidVisibilityTimeout() {
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "dummy-storage");
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "dummy-key");
runner.setProperty(GetAzureQueueStorage.QUEUE, "dummyqueue");
runner.setProperty(GetAzureQueueStorage.BATCH_SIZE, "10");
runner.setProperty(GetAzureQueueStorage.VISIBILITY_TIMEOUT, "0 secs");
ProcessContext processContext = runner.getProcessContext();
Collection<ValidationResult> results = new HashSet<>();
if (processContext instanceof MockProcessContext) {
results = ((MockProcessContext) processContext).validate();
}
assertEquals(1, results.size());
Iterator<ValidationResult> iterator = results.iterator();
assertTrue(iterator.next().getExplanation().contains("should be greater than 0 secs"));
}
}

View File

@ -1,76 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.queue;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestPutAzureQueueStorage {
private final TestRunner runner = TestRunners.newTestRunner(PutAzureQueueStorage.class);
@Test
public void testInvalidTTLAndVisibilityDelay() {
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "dummy-storage");
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "dummy-key");
runner.setProperty(GetAzureQueueStorage.QUEUE, "dummyqueue");
runner.setProperty(PutAzureQueueStorage.TTL, "8 days");
runner.setProperty(PutAzureQueueStorage.VISIBILITY_DELAY, "9 days");
ProcessContext processContext = runner.getProcessContext();
Collection<ValidationResult> results = new HashSet<>();
if (processContext instanceof MockProcessContext) {
results = ((MockProcessContext) processContext).validate();
}
assertEquals(2, results.size());
Iterator<ValidationResult> iterator = results.iterator();
assertTrue(iterator.next().toString().contains(PutAzureQueueStorage.TTL.getDisplayName() + " exceeds the allowed limit of 7 days. Set a value less than 7 days"));
assertTrue(iterator.next().toString().contains(PutAzureQueueStorage.VISIBILITY_DELAY.getDisplayName() + " should be greater than or equal to 0 and less than"));
}
@Test
public void testAllValidProperties() {
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "dummy-storage");
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "dummy-key");
runner.setProperty(GetAzureQueueStorage.QUEUE, "dummyqueue");
runner.setProperty(PutAzureQueueStorage.TTL, "6 days");
runner.setProperty(PutAzureQueueStorage.VISIBILITY_DELAY, "5 days");
ProcessContext processContext = runner.getProcessContext();
Collection<ValidationResult> results = new HashSet<>();
if (processContext instanceof MockProcessContext) {
results = ((MockProcessContext) processContext).validate();
}
assertEquals(0, results.size());
}
}

View File

@ -1,158 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.utils;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.MockValidationContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Collection;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestAzureStorageUtilsValidateCredentialProperties {
private static final String CREDENTIALS_SERVICE_VALUE = "CredentialsService";
private static final String ACCOUNT_NAME_VALUE = "AccountName";
private static final String ACCOUNT_KEY_VALUE = "AccountKey";
private static final String SAS_TOKEN_VALUE = "SasToken";
private MockProcessContext processContext;
private MockValidationContext validationContext;
@BeforeEach
public void setUp() {
Processor processor = new GetAzureQueueStorage();
processContext = new MockProcessContext(processor);
validationContext = new MockValidationContext(processContext);
}
@Test
public void testValidWithCredentialsService() {
configureCredentialsService();
Collection<ValidationResult> result = AzureStorageUtils.validateCredentialProperties(validationContext);
assertValid(result);
}
@Test
public void testValidWithAccountNameAndAccountKey() {
configureAccountName();
configureAccountKey();
Collection<ValidationResult> result = AzureStorageUtils.validateCredentialProperties(validationContext);
assertValid(result);
}
@Test
public void testValidWithAccountNameAndSasToken() {
configureAccountName();
configureSasToken();
Collection<ValidationResult> result = AzureStorageUtils.validateCredentialProperties(validationContext);
assertValid(result);
}
@Test
public void testNotValidBecauseNothingSpecified() {
Collection<ValidationResult> result = AzureStorageUtils.validateCredentialProperties(validationContext);
assertNotValid(result);
}
@Test
public void testNotValidBecauseBothCredentialsServiceAndAccountNameSpecified() {
configureCredentialsService();
configureAccountName();
Collection<ValidationResult> result = AzureStorageUtils.validateCredentialProperties(validationContext);
assertNotValid(result);
}
@Test
public void testNotValidBecauseBothCredentialsServiceAndAccountKeySpecified() {
configureCredentialsService();
configureAccountKey();
Collection<ValidationResult> result = AzureStorageUtils.validateCredentialProperties(validationContext);
assertNotValid(result);
}
@Test
public void testNotValidBecauseBothCredentialsServiceAndSasTokenSpecified() {
configureCredentialsService();
configureSasToken();
Collection<ValidationResult> result = AzureStorageUtils.validateCredentialProperties(validationContext);
assertNotValid(result);
}
@Test
public void testNotValidBecauseAccountNameSpecifiedWithoutAccountKeyOrSasToken() {
configureAccountName();
Collection<ValidationResult> result = AzureStorageUtils.validateCredentialProperties(validationContext);
assertNotValid(result);
}
@Test
public void testNotValidBecauseAccountNameSpecifiedWithBothAccountKeyAndSasToken() {
configureAccountName();
configureAccountKey();
configureSasToken();
Collection<ValidationResult> result = AzureStorageUtils.validateCredentialProperties(validationContext);
assertNotValid(result);
}
private void configureCredentialsService() {
processContext.setProperty(AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE, CREDENTIALS_SERVICE_VALUE);
}
private void configureAccountName() {
processContext.setProperty(AzureStorageUtils.ACCOUNT_NAME, ACCOUNT_NAME_VALUE);
}
private void configureAccountKey() {
processContext.setProperty(AzureStorageUtils.ACCOUNT_KEY, ACCOUNT_KEY_VALUE);
}
private void configureSasToken() {
processContext.setProperty(AzureStorageUtils.PROP_SAS_TOKEN, SAS_TOKEN_VALUE);
}
private void assertValid(Collection<ValidationResult> result) {
assertTrue(result.isEmpty(), "There should be no validation error");
}
private void assertNotValid(Collection<ValidationResult> result) {
assertFalse(result.isEmpty(), "There should be validation error");
}
}

View File

@ -1,108 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.azure.storage;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public class TestAzureStorageCredentialsControllerService {
private static final String ACCOUNT_NAME_VALUE = "AccountName";
private static final String ACCOUNT_KEY_VALUE = "AccountKey";
private static final String SAS_TOKEN_VALUE = "SasToken";
private TestRunner runner;
private AzureStorageCredentialsService credentialsService;
@BeforeEach
public void setUp() throws InitializationException {
runner = TestRunners.newTestRunner(NoOpProcessor.class);
credentialsService = new AzureStorageCredentialsControllerService();
runner.addControllerService("credentials-service", credentialsService);
}
@Test
public void testValidWithAccountNameAndAccountKey() {
configureAccountName();
configureAccountKey();
runner.assertValid(credentialsService);
}
@Test
public void testNotValidWithEmptyEndpointSuffix() {
configureAccountName();
configureAccountKey();
runner.setProperty(credentialsService, AzureStorageUtils.ENDPOINT_SUFFIX, "");
runner.assertNotValid(credentialsService);
}
@Test
public void testNotValidWithWhitespaceEndpointSuffix() {
configureAccountName();
configureAccountKey();
runner.setProperty(credentialsService, AzureStorageUtils.ENDPOINT_SUFFIX, " ");
runner.assertNotValid(credentialsService);
}
@Test
public void testValidWithAccountNameAndSasToken() {
configureAccountName();
configureSasToken();
runner.assertValid(credentialsService);
}
@Test
public void testNotValidBecauseAccountNameMissing() {
configureAccountKey();
runner.assertNotValid(credentialsService);
}
@Test
public void testNotValidBecauseAccountKeyAndSasTokenMissing() {
configureAccountName();
runner.assertNotValid(credentialsService);
}
@Test
public void testNotValidBecauseBothAccountKeyAndSasTokenSpecified() {
configureAccountName();
configureAccountKey();
configureSasToken();
runner.assertNotValid(credentialsService);
}
private void configureAccountName() {
runner.setProperty(credentialsService, AzureStorageCredentialsControllerService.ACCOUNT_NAME, ACCOUNT_NAME_VALUE);
}
private void configureAccountKey() {
runner.setProperty(credentialsService, AzureStorageUtils.ACCOUNT_KEY, ACCOUNT_KEY_VALUE);
}
private void configureSasToken() {
runner.setProperty(credentialsService, AzureStorageUtils.PROP_SAS_TOKEN, SAS_TOKEN_VALUE);
}
}

View File

@ -1,148 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.azure.storage;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class TestAzureStorageCredentialsControllerServiceLookup {
private MockAzureStorageCredentialsService serviceA;
private MockAzureStorageCredentialsService serviceB;
private AzureStorageCredentialsControllerServiceLookup lookupService;
private TestRunner runner;
@BeforeEach
public void setup() throws InitializationException {
serviceA = new MockAzureStorageCredentialsService(
new AzureStorageCredentialsDetails("Account_A", "core.windows.net", null));
serviceB = new MockAzureStorageCredentialsService(new AzureStorageCredentialsDetails("Account_B", null, null));
lookupService = new AzureStorageCredentialsControllerServiceLookup();
runner = TestRunners.newTestRunner(NoOpProcessor.class);
final String serviceAIdentifier = "service-a";
runner.addControllerService(serviceAIdentifier, serviceA);
final String serviceBIdentifier = "service-b";
runner.addControllerService(serviceBIdentifier, serviceB);
runner.addControllerService("lookup-service", lookupService);
runner.setProperty(lookupService, "a", serviceAIdentifier);
runner.setProperty(lookupService, "b", serviceBIdentifier);
runner.enableControllerService(serviceA);
runner.enableControllerService(serviceB);
runner.enableControllerService(lookupService);
}
@Test
public void testLookupServiceA() {
final Map<String,String> attributes = new HashMap<>();
attributes.put(AzureStorageCredentialsControllerServiceLookup.AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE, "a");
final AzureStorageCredentialsDetails storageCredentialsDetails = lookupService.getStorageCredentialsDetails(attributes);
assertNotNull(storageCredentialsDetails);
assertEquals("Account_A", storageCredentialsDetails.getStorageAccountName());
assertEquals("core.windows.net", storageCredentialsDetails.getStorageSuffix());
}
@Test
public void testLookupServiceB() {
final Map<String, String> attributes = new HashMap<>();
attributes.put(AzureStorageCredentialsControllerServiceLookup.AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE, "b");
final AzureStorageCredentialsDetails storageCredentialsDetails = lookupService
.getStorageCredentialsDetails(attributes);
assertNotNull(storageCredentialsDetails);
assertEquals("Account_B", storageCredentialsDetails.getStorageAccountName());
assertNull(storageCredentialsDetails.getStorageSuffix());
}
@Test
public void testLookupMissingCredentialsNameAttribute() {
final Map<String, String> attributes = new HashMap<>();
assertThrows(ProcessException.class, () -> lookupService.getStorageCredentialsDetails(attributes));
}
@Test
public void testLookupWithCredentialsNameThatDoesNotExist() {
final Map<String, String> attributes = new HashMap<>();
attributes.put(AzureStorageCredentialsControllerServiceLookup.AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE,
"DOES-NOT-EXIST");
assertThrows(ProcessException.class, () -> lookupService.getStorageCredentialsDetails(attributes));
}
@Test
public void testCustomValidateAtLeaseOneServiceDefined() throws InitializationException {
// enable lookup service with no services registered, verify not valid
runner = TestRunners.newTestRunner(NoOpProcessor.class);
runner.addControllerService("lookup-service", lookupService);
runner.assertNotValid(lookupService);
final String serviceAIdentifier = "service-a";
runner.addControllerService(serviceAIdentifier, serviceA);
// register a service and now verify valid
runner.setProperty(lookupService, "a", serviceAIdentifier);
runner.enableControllerService(lookupService);
runner.assertValid(lookupService);
}
@Test
public void testCustomValidateSelfReferenceNotAllowed() throws InitializationException {
runner = TestRunners.newTestRunner(NoOpProcessor.class);
runner.addControllerService("lookup-service", lookupService);
runner.setProperty(lookupService, "lookup-service", "lookup-service");
runner.assertNotValid(lookupService);
}
/**
* A mock AzureStorageCredentialsService that will always return the passed in
* AzureStorageCredentialsDetails.
*/
private static class MockAzureStorageCredentialsService extends AbstractControllerService
implements AzureStorageCredentialsService {
private AzureStorageCredentialsDetails storageCredentialsDetails;
MockAzureStorageCredentialsService(AzureStorageCredentialsDetails storageCredentialsDetails) {
this.storageCredentialsDetails = storageCredentialsDetails;
}
@Override
public AzureStorageCredentialsDetails getStorageCredentialsDetails(Map<String, String> attributes) {
return storageCredentialsDetails;
}
}
}

View File

@ -1,55 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.azure.storage;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public class TestAzureStorageEmulatorCredentialsControllerService {
private static final String TEST_ENVIRONMENT_URI = "http://127.0.0.1";
private TestRunner runner;
private AzureStorageCredentialsService credentialsService;
@BeforeEach
public void setUp() throws InitializationException {
runner = TestRunners.newTestRunner(NoOpProcessor.class);
credentialsService = new AzureStorageEmulatorCredentialsControllerService();
runner.addControllerService("credentials-service", credentialsService);
}
@Test
public void testValidWithProxyURI() {
configureProxyURI();
runner.assertValid(credentialsService);
}
@Test
public void testValidWithoutProxyURI() {
runner.assertValid(credentialsService);
}
private void configureProxyURI() {
runner.setProperty(credentialsService, AzureStorageEmulatorCredentialsControllerService.DEVELOPMENT_STORAGE_PROXY_URI, TEST_ENVIRONMENT_URI);
}
}

View File

@ -24,10 +24,6 @@
<artifactId>nifi-azure-services-api</artifactId>
<dependencies>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-storage</artifactId>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>

View File

@ -1,55 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.azure.storage;
import com.microsoft.azure.storage.StorageCredentials;
public class AzureStorageCredentialsDetails {
private final String storageAccountName;
private final String storageSuffix;
private final StorageCredentials storageCredentials;
public AzureStorageCredentialsDetails() {
this(null, null, null);
}
@Deprecated
public AzureStorageCredentialsDetails(String storageAccountName, StorageCredentials storageCredentials) {
this(storageAccountName, null, storageCredentials);
}
public AzureStorageCredentialsDetails(String storageAccountName, String storageSuffix, StorageCredentials storageCredentials) {
this.storageAccountName = storageAccountName;
this.storageSuffix = storageSuffix;
this.storageCredentials = storageCredentials;
}
public String getStorageAccountName() {
return storageAccountName;
}
public String getStorageSuffix() {
return storageSuffix;
}
public StorageCredentials getStorageCredentials() {
return storageCredentials;
}
}

View File

@ -1,35 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.azure.storage;
import org.apache.nifi.controller.ControllerService;
import java.util.Map;
/**
* AzureStorageCredentialsService interface to support getting Storage Account Name and Storage Credentials
* used for instantiating Azure Storage clients.
*/
public interface AzureStorageCredentialsService extends ControllerService {
/**
* Get AzureStorageCredentialsDetails object which contains the Storage Account Name and the Storage Credentials
* @param attributes FlowFile attributes (typically)
* @return AzureStorageCredentialsDetails object
*/
AzureStorageCredentialsDetails getStorageCredentialsDetails(Map<String, String> attributes);
}

View File

@ -27,7 +27,6 @@
<properties>
<azure.sdk.bom.version>1.2.13</azure.sdk.bom.version>
<microsoft.azure-storage.version>8.6.6</microsoft.azure-storage.version>
<msal4j.version>1.13.8</msal4j.version>
<qpid.proton.version>0.34.1</qpid.proton.version>
</properties>
@ -51,11 +50,6 @@
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-storage</artifactId>
<version>${microsoft.azure-storage.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>