NIFI-7434: Add endpoint suffix to Azure storage processors

This closes #4265.

Signed-off-by: Joey Frazee <jfrazee@apache.org>
This commit is contained in:
sjyang18 2020-03-27 20:07:41 -07:00 committed by Joey Frazee
parent 3c757b6ba8
commit 73008451d4
11 changed files with 161 additions and 74 deletions

View File

@ -60,6 +60,7 @@ public abstract class AbstractAzureBlobProcessor extends AbstractProcessor {
AzureStorageUtils.ACCOUNT_NAME,
AzureStorageUtils.ACCOUNT_KEY,
AzureStorageUtils.PROP_SAS_TOKEN,
AzureStorageUtils.ENDPOINT_SUFFIX,
BLOB,
AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));

View File

@ -16,6 +16,16 @@
*/
package org.apache.nifi.processors.azure.storage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageUri;
import com.microsoft.azure.storage.blob.BlobListingDetails;
@ -25,6 +35,7 @@ import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.ListBlobItem;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@ -50,16 +61,6 @@ import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.processors.azure.storage.utils.BlobInfo;
import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@PrimaryNodeOnly
@TriggerSerially
@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
@ -100,6 +101,7 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
AzureStorageUtils.ACCOUNT_NAME,
AzureStorageUtils.ACCOUNT_KEY,
AzureStorageUtils.PROP_SAS_TOKEN,
AzureStorageUtils.ENDPOINT_SUFFIX,
PROP_PREFIX,
AzureStorageUtils.PROXY_CONFIGURATION_SERVICE,
ListedEntityTracker.TRACKING_STATE_CACHE,

View File

@ -16,8 +16,16 @@
*/
package org.apache.nifi.processors.azure.storage.queue;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.queue.CloudQueueClient;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@ -30,13 +38,6 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
public abstract class AbstractAzureQueueStorage extends AbstractProcessor {
public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
@ -67,7 +68,13 @@ public abstract class AbstractAzureQueueStorage extends AbstractProcessor {
protected final CloudQueueClient createCloudQueueClient(final ProcessContext context, final FlowFile flowFile) throws URISyntaxException {
final AzureStorageCredentialsDetails storageCredentialsDetails = AzureStorageUtils.getStorageCredentialsDetails(context, flowFile);
final CloudStorageAccount cloudStorageAccount = new CloudStorageAccount(storageCredentialsDetails.getStorageCredentials(), true, null, storageCredentialsDetails.getStorageAccountName());
final CloudStorageAccount cloudStorageAccount =
new CloudStorageAccount(
storageCredentialsDetails.getStorageCredentials(),
true,
storageCredentialsDetails.getStorageSuffix(),
storageCredentialsDetails.getStorageAccountName()
);
final CloudQueueClient cloudQueueClient = cloudStorageAccount.createCloudQueueClient();
return cloudQueueClient;

View File

@ -16,11 +16,23 @@
*/
package org.apache.nifi.processors.azure.storage.queue;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.queue.CloudQueue;
import com.microsoft.azure.storage.queue.CloudQueueClient;
import com.microsoft.azure.storage.queue.CloudQueueMessage;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@ -39,17 +51,6 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Collections;
import java.util.Arrays;
import java.util.Set;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
@SeeAlso({PutAzureQueueStorage.class})
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"})
@ -94,7 +95,7 @@ public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
.build();
private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE, AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN,
AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE, AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, AzureStorageUtils.ENDPOINT_SUFFIX,
QUEUE, AUTO_DELETE, BATCH_SIZE, VISIBILITY_TIMEOUT, AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));
@Override

View File

@ -16,11 +16,21 @@
*/
package org.apache.nifi.processors.azure.storage.queue;
import java.io.ByteArrayOutputStream;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.queue.CloudQueue;
import com.microsoft.azure.storage.queue.CloudQueueClient;
import com.microsoft.azure.storage.queue.CloudQueueMessage;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
@ -35,15 +45,6 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import java.io.ByteArrayOutputStream;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@SeeAlso({GetAzureQueueStorage.class})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({ "azure", "microsoft", "cloud", "storage", "queue", "enqueue" })
@ -70,7 +71,7 @@ public class PutAzureQueueStorage extends AbstractAzureQueueStorage {
.build();
private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE, AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN,
AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE, AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, AzureStorageUtils.ENDPOINT_SUFFIX,
TTL, QUEUE, VISIBILITY_DELAY, AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));
@Override

View File

@ -16,12 +16,20 @@
*/
package org.apache.nifi.processors.azure.storage.utils;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageCredentials;
import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
@ -37,13 +45,6 @@ import org.apache.nifi.proxy.ProxySpec;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsService;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public final class AzureStorageUtils {
public static final String BLOCK = "Block";
public static final String PAGE = "Page";
@ -85,6 +86,21 @@ public final class AzureStorageUtils {
.sensitive(true)
.build();
public static final PropertyDescriptor ENDPOINT_SUFFIX = new PropertyDescriptor.Builder()
.name("storage-endpoint-suffix")
.displayName("Common Storage Account Endpoint Suffix")
.description(
"Storage accounts in public Azure always use a common FQDN suffix. " +
"Override this endpoint suffix with a different suffix in certain circumsances (like Azure Stack or non-public Azure regions). " +
"The preferred way is to configure them through a controller service specified in the Storage Credentials property. " +
"The controller service can provide a common/shared configuration for multiple/all Azure processors. Furthermore, the credentials " +
"can also be looked up dynamically with the 'Lookup' version of the service.")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.sensitive(false)
.build();
public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder()
.name("container-name")
.displayName("Container Name")
@ -131,7 +147,11 @@ public final class AzureStorageUtils {
*/
public static CloudBlobClient createCloudBlobClient(ProcessContext context, ComponentLog logger, FlowFile flowFile) throws URISyntaxException {
final AzureStorageCredentialsDetails storageCredentialsDetails = getStorageCredentialsDetails(context, flowFile);
final CloudStorageAccount cloudStorageAccount = new CloudStorageAccount(storageCredentialsDetails.getStorageCredentials(), true, null, storageCredentialsDetails.getStorageAccountName());
final CloudStorageAccount cloudStorageAccount = new CloudStorageAccount(
storageCredentialsDetails.getStorageCredentials(),
true,
storageCredentialsDetails.getStorageSuffix(),
storageCredentialsDetails.getStorageAccountName());
final CloudBlobClient cloudBlobClient = cloudStorageAccount.createCloudBlobClient();
return cloudBlobClient;
@ -151,6 +171,7 @@ public final class AzureStorageUtils {
public static AzureStorageCredentialsDetails createStorageCredentialsDetails(PropertyContext context, Map<String, String> attributes) {
final String accountName = context.getProperty(ACCOUNT_NAME).evaluateAttributeExpressions(attributes).getValue();
final String storageSuffix = context.getProperty(ENDPOINT_SUFFIX).evaluateAttributeExpressions(attributes).getValue();
final String accountKey = context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(attributes).getValue();
final String sasToken = context.getProperty(PROP_SAS_TOKEN).evaluateAttributeExpressions(attributes).getValue();
@ -168,7 +189,7 @@ public final class AzureStorageUtils {
throw new IllegalArgumentException(String.format("Either '%s' or '%s' must be defined.", ACCOUNT_KEY.getDisplayName(), PROP_SAS_TOKEN.getDisplayName()));
}
return new AzureStorageCredentialsDetails(accountName, storageCredentials);
return new AzureStorageCredentialsDetails(accountName, storageSuffix, storageCredentials);
}
public static Collection<ValidationResult> validateCredentialProperties(ValidationContext validationContext) {
@ -178,6 +199,7 @@ public final class AzureStorageUtils {
final String accountName = validationContext.getProperty(ACCOUNT_NAME).getValue();
final String accountKey = validationContext.getProperty(ACCOUNT_KEY).getValue();
final String sasToken = validationContext.getProperty(PROP_SAS_TOKEN).getValue();
final String endpointSuffix = validationContext.getProperty(ENDPOINT_SUFFIX).getValue();
if (!((StringUtils.isNotBlank(storageCredentials) && StringUtils.isBlank(accountName) && StringUtils.isBlank(accountKey) && StringUtils.isBlank(sasToken))
|| (StringUtils.isBlank(storageCredentials) && StringUtils.isNotBlank(accountName) && StringUtils.isNotBlank(accountKey) && StringUtils.isBlank(sasToken))
@ -190,6 +212,14 @@ public final class AzureStorageUtils {
.build());
}
if(StringUtils.isNotBlank(storageCredentials) && StringUtils.isNotBlank(endpointSuffix)) {
String errMsg = "Either " + STORAGE_CREDENTIALS_SERVICE.getDisplayName() + " or " + ENDPOINT_SUFFIX.getDisplayName()
+ " should be specified, not both.";
results.add(new ValidationResult.Builder().subject("AzureStorageUtils Credentials")
.explanation(errMsg)
.build());
}
return results;
}

View File

@ -16,6 +16,13 @@
*/
package org.apache.nifi.services.azure.storage;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
@ -29,13 +36,6 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* Implementation of AbstractControllerService interface
*
@ -60,7 +60,8 @@ public class AzureStorageCredentialsControllerService extends AbstractController
.unmodifiableList(Arrays.asList(
ACCOUNT_NAME,
AzureStorageUtils.ACCOUNT_KEY,
AzureStorageUtils.PROP_SAS_TOKEN));
AzureStorageUtils.PROP_SAS_TOKEN,
AzureStorageUtils.ENDPOINT_SUFFIX));
private ConfigurationContext context;

View File

@ -16,8 +16,11 @@
*/
package org.apache.nifi.processors.azure.storage.queue;
import java.util.List;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.queue.CloudQueueMessage;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.util.MockFlowFile;
@ -25,8 +28,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
public class GetAzureQueueStorageIT extends AbstractAzureQueueStorageIT {
@Override
@ -59,6 +60,14 @@ public class GetAzureQueueStorageIT extends AbstractAzureQueueStorageIT {
assertResult(0);
}
@Test
public void testNotValidWithCredentialsServiceAndEndpointSuffix() throws Exception {
configureCredentialsService();
runner.setProperty(AzureStorageUtils.ENDPOINT_SUFFIX, "core.windows.net");
runner.assertNotValid();
}
@Test
public void testSimpleGetWithEL() throws Exception {
runner.setValidateExpressionUsage(true);

View File

@ -48,6 +48,21 @@ public class TestAzureStorageCredentialsControllerService {
runner.assertValid(credentialsService);
}
@Test
public void testNotValidWithEmptyEndpointSuffix() {
configureAccountName();
configureAccountKey();
runner.setProperty(credentialsService, AzureStorageUtils.ENDPOINT_SUFFIX, "");
runner.assertNotValid(credentialsService);
}
@Test
public void testNotValidWithWhitespaceEndpointSuffix() {
configureAccountName();
configureAccountKey();
runner.setProperty(credentialsService, AzureStorageUtils.ENDPOINT_SUFFIX, " ");
runner.assertNotValid(credentialsService);
}
@Test
public void testValidWithAccountNameAndSasToken() {
configureAccountName();

View File

@ -16,6 +16,13 @@
*/
package org.apache.nifi.services.azure.storage;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
@ -25,12 +32,6 @@ import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class TestAzureStorageCredentialsControllerServiceLookup {
private MockAzureStorageCredentialsService serviceA;
@ -41,8 +42,9 @@ public class TestAzureStorageCredentialsControllerServiceLookup {
@Before
public void setup() throws InitializationException {
serviceA = new MockAzureStorageCredentialsService(new AzureStorageCredentialsDetails("Account_A", null));
serviceB = new MockAzureStorageCredentialsService(new AzureStorageCredentialsDetails("Account_B", null));
serviceA = new MockAzureStorageCredentialsService(
new AzureStorageCredentialsDetails("Account_A", "core.windows.net", null));
serviceB = new MockAzureStorageCredentialsService(new AzureStorageCredentialsDetails("Account_B", null, null));
lookupService = new AzureStorageCredentialsControllerServiceLookup();
@ -71,28 +73,32 @@ public class TestAzureStorageCredentialsControllerServiceLookup {
final AzureStorageCredentialsDetails storageCredentialsDetails = lookupService.getStorageCredentialsDetails(attributes);
assertNotNull(storageCredentialsDetails);
assertEquals("Account_A", storageCredentialsDetails.getStorageAccountName());
assertEquals("core.windows.net", storageCredentialsDetails.getStorageSuffix());
}
@Test
public void testLookupServiceB() {
final Map<String,String> attributes = new HashMap<>();
final Map<String, String> attributes = new HashMap<>();
attributes.put(AzureStorageCredentialsControllerServiceLookup.AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE, "b");
final AzureStorageCredentialsDetails storageCredentialsDetails = lookupService.getStorageCredentialsDetails(attributes);
final AzureStorageCredentialsDetails storageCredentialsDetails = lookupService
.getStorageCredentialsDetails(attributes);
assertNotNull(storageCredentialsDetails);
assertEquals("Account_B", storageCredentialsDetails.getStorageAccountName());
assertNull(storageCredentialsDetails.getStorageSuffix());
}
@Test(expected = ProcessException.class)
public void testLookupMissingCredentialsNameAttribute() {
final Map<String,String> attributes = new HashMap<>();
final Map<String, String> attributes = new HashMap<>();
lookupService.getStorageCredentialsDetails(attributes);
}
@Test(expected = ProcessException.class)
public void testLookupWithCredentialsNameThatDoesNotExist() {
final Map<String,String> attributes = new HashMap<>();
attributes.put(AzureStorageCredentialsControllerServiceLookup.AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE, "DOES-NOT-EXIST");
final Map<String, String> attributes = new HashMap<>();
attributes.put(AzureStorageCredentialsControllerServiceLookup.AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE,
"DOES-NOT-EXIST");
lookupService.getStorageCredentialsDetails(attributes);
}
@ -121,9 +127,11 @@ public class TestAzureStorageCredentialsControllerServiceLookup {
}
/**
* A mock AzureStorageCredentialsService that will always return the passed in AzureStorageCredentialsDetails.
* A mock AzureStorageCredentialsService that will always return the passed in
* AzureStorageCredentialsDetails.
*/
private static class MockAzureStorageCredentialsService extends AbstractControllerService implements AzureStorageCredentialsService {
private static class MockAzureStorageCredentialsService extends AbstractControllerService
implements AzureStorageCredentialsService {
private AzureStorageCredentialsDetails storageCredentialsDetails;

View File

@ -22,10 +22,18 @@ public class AzureStorageCredentialsDetails {
private final String storageAccountName;
private final String storageSuffix;
private final StorageCredentials storageCredentials;
@Deprecated
public AzureStorageCredentialsDetails(String storageAccountName, StorageCredentials storageCredentials) {
this(storageAccountName, null, storageCredentials);
}
public AzureStorageCredentialsDetails(String storageAccountName, String storageSuffix, StorageCredentials storageCredentials) {
this.storageAccountName = storageAccountName;
this.storageSuffix = storageSuffix;
this.storageCredentials = storageCredentials;
}
@ -33,6 +41,10 @@ public class AzureStorageCredentialsDetails {
return storageAccountName;
}
public String getStorageSuffix() {
return storageSuffix;
}
public StorageCredentials getStorageCredentials() {
return storageCredentials;
}