mirror of https://github.com/apache/nifi.git
NIFI-4005: Azure Blob Storage SAS support, incorporating review comments. This closes #2353
- Renamed Azure to AzureStorageUtils. - Fixed whitespacing in property description. - Renamed SAS String to SAS Token.
This commit is contained in:
parent
17ddaf6be0
commit
1ee8d16a21
|
@ -22,7 +22,7 @@ import org.apache.nifi.components.ValidationResult;
|
|||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processors.azure.storage.utils.Azure;
|
||||
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
@ -40,10 +40,10 @@ public abstract class AbstractAzureBlobProcessor extends AbstractProcessor {
|
|||
|
||||
private static final List<PropertyDescriptor> PROPERTIES = Collections
|
||||
.unmodifiableList(Arrays.asList(
|
||||
Azure.CONTAINER,
|
||||
Azure.PROP_SAS_TOKEN,
|
||||
Azure.ACCOUNT_NAME,
|
||||
Azure.ACCOUNT_KEY,
|
||||
AzureStorageUtils.CONTAINER,
|
||||
AzureStorageUtils.PROP_SAS_TOKEN,
|
||||
AzureStorageUtils.ACCOUNT_NAME,
|
||||
AzureStorageUtils.ACCOUNT_KEY,
|
||||
BLOB));
|
||||
|
||||
private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(
|
||||
|
@ -58,7 +58,7 @@ public abstract class AbstractAzureBlobProcessor extends AbstractProcessor {
|
|||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
|
||||
return Azure.validateCredentialProperties(validationContext);
|
||||
return AzureStorageUtils.validateCredentialProperties(validationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -36,7 +36,7 @@ import org.apache.nifi.processor.ProcessContext;
|
|||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
|
||||
import org.apache.nifi.processors.azure.storage.utils.Azure;
|
||||
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
|
||||
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
import com.microsoft.azure.storage.blob.CloudBlob;
|
||||
|
@ -61,12 +61,12 @@ public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
|
|||
|
||||
final long startNanos = System.nanoTime();
|
||||
|
||||
String containerName = context.getProperty(Azure.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
|
||||
String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
|
||||
String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
|
||||
|
||||
AtomicReference<Exception> storedException = new AtomicReference<>();
|
||||
try {
|
||||
CloudBlobClient blobClient = Azure.createCloudBlobClient(context, getLogger());
|
||||
CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger());
|
||||
CloudBlobContainer container = blobClient.getContainerReference(containerName);
|
||||
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
|
|
|
@ -41,7 +41,7 @@ import org.apache.nifi.components.state.Scope;
|
|||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processor.util.list.AbstractListProcessor;
|
||||
import org.apache.nifi.processors.azure.storage.utils.Azure;
|
||||
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
|
||||
import org.apache.nifi.processors.azure.storage.utils.BlobInfo;
|
||||
import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder;
|
||||
|
||||
|
@ -83,10 +83,10 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
|
|||
|
||||
|
||||
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
|
||||
Azure.CONTAINER,
|
||||
Azure.PROP_SAS_TOKEN,
|
||||
Azure.ACCOUNT_NAME,
|
||||
Azure.ACCOUNT_KEY,
|
||||
AzureStorageUtils.CONTAINER,
|
||||
AzureStorageUtils.PROP_SAS_TOKEN,
|
||||
AzureStorageUtils.ACCOUNT_NAME,
|
||||
AzureStorageUtils.ACCOUNT_KEY,
|
||||
PROP_PREFIX));
|
||||
|
||||
@Override
|
||||
|
@ -96,7 +96,7 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
|
|||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
|
||||
return Azure.validateCredentialProperties(validationContext);
|
||||
return AzureStorageUtils.validateCredentialProperties(validationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -117,16 +117,16 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
|
|||
|
||||
@Override
|
||||
protected String getPath(final ProcessContext context) {
|
||||
return context.getProperty(Azure.CONTAINER).evaluateAttributeExpressions().getValue();
|
||||
return context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isListingResetNecessary(final PropertyDescriptor property) {
|
||||
// re-list if configuration changed, but not when security keys are rolled (not included in the condition)
|
||||
return PROP_PREFIX.equals(property)
|
||||
|| Azure.ACCOUNT_NAME.equals(property)
|
||||
|| Azure.CONTAINER.equals(property)
|
||||
|| Azure.PROP_SAS_TOKEN.equals(property);
|
||||
|| AzureStorageUtils.ACCOUNT_NAME.equals(property)
|
||||
|| AzureStorageUtils.CONTAINER.equals(property)
|
||||
|| AzureStorageUtils.PROP_SAS_TOKEN.equals(property);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -136,14 +136,14 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
|
|||
|
||||
@Override
|
||||
protected List<BlobInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
|
||||
String containerName = context.getProperty(Azure.CONTAINER).evaluateAttributeExpressions().getValue();
|
||||
String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue();
|
||||
String prefix = context.getProperty(PROP_PREFIX).evaluateAttributeExpressions().getValue();
|
||||
if (prefix == null) {
|
||||
prefix = "";
|
||||
}
|
||||
final List<BlobInfo> listing = new ArrayList<>();
|
||||
try {
|
||||
CloudBlobClient blobClient = Azure.createCloudBlobClient(context, getLogger());
|
||||
CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger());
|
||||
CloudBlobContainer container = blobClient.getContainerReference(containerName);
|
||||
|
||||
for (ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, null)) {
|
||||
|
@ -165,9 +165,9 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
|
|||
}
|
||||
|
||||
if (blob instanceof CloudBlockBlob) {
|
||||
builder.blobType(Azure.BLOCK);
|
||||
builder.blobType(AzureStorageUtils.BLOCK);
|
||||
} else {
|
||||
builder.blobType(Azure.PAGE);
|
||||
builder.blobType(AzureStorageUtils.PAGE);
|
||||
}
|
||||
listing.add(builder.build());
|
||||
}
|
||||
|
|
|
@ -38,7 +38,7 @@ import org.apache.nifi.processor.ProcessContext;
|
|||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
|
||||
import org.apache.nifi.processors.azure.storage.utils.Azure;
|
||||
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
|
||||
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
import com.microsoft.azure.storage.blob.BlobProperties;
|
||||
|
@ -66,13 +66,13 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor {
|
|||
|
||||
final long startNanos = System.nanoTime();
|
||||
|
||||
String containerName = context.getProperty(Azure.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
|
||||
String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
|
||||
|
||||
String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
|
||||
|
||||
AtomicReference<Exception> storedException = new AtomicReference<>();
|
||||
try {
|
||||
CloudBlobClient blobClient = Azure.createCloudBlobClient(context, getLogger());
|
||||
CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger());
|
||||
CloudBlobContainer container = blobClient.getContainerReference(containerName);
|
||||
|
||||
CloudBlob blob = container.getBlockBlobReference(blobPath);
|
||||
|
|
|
@ -35,7 +35,7 @@ import java.util.ArrayList;
|
|||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
public final class Azure {
|
||||
public final class AzureStorageUtils {
|
||||
public static final String BLOCK = "Block";
|
||||
public static final String PAGE = "Page";
|
||||
|
||||
|
@ -61,8 +61,9 @@ public final class Azure {
|
|||
.description("Name of the Azure storage container").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).build();
|
||||
|
||||
public static final PropertyDescriptor PROP_SAS_TOKEN = new PropertyDescriptor.Builder()
|
||||
.name("SAS String")
|
||||
.description("Shared Access Signature string, including the leading '?'. Specify either SAS (recommended) or Account Key")
|
||||
.name("storage-sas-token")
|
||||
.displayName("SAS Token")
|
||||
.description("Shared Access Signature token, including the leading '?'. Specify either SAS Token (recommended) or Account Key")
|
||||
.required(false)
|
||||
.expressionLanguageSupported(true)
|
||||
.sensitive(true)
|
||||
|
@ -73,25 +74,25 @@ public final class Azure {
|
|||
public static final String FORMAT_BLOB_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
|
||||
public static final String FORMAT_BASE_URI = "https://%s.blob.core.windows.net";
|
||||
|
||||
private Azure() {
|
||||
private AzureStorageUtils() {
|
||||
// do not instantiate
|
||||
}
|
||||
|
||||
public static CloudBlobClient createCloudBlobClient(ProcessContext context, ComponentLog logger) {
|
||||
final String accountName = context.getProperty(Azure.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
|
||||
final String accountKey = context.getProperty(Azure.ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
|
||||
final String sasToken = context.getProperty(Azure.PROP_SAS_TOKEN).evaluateAttributeExpressions().getValue();
|
||||
final String accountName = context.getProperty(AzureStorageUtils.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
|
||||
final String accountKey = context.getProperty(AzureStorageUtils.ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
|
||||
final String sasToken = context.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).evaluateAttributeExpressions().getValue();
|
||||
|
||||
CloudBlobClient cloudBlobClient;
|
||||
|
||||
try {
|
||||
// sas token and acct name/key have different ways of creating a secure connection (e.g. new StorageCredentialsAccountAndKey didn't work)
|
||||
if (StringUtils.isNotBlank(sasToken)) {
|
||||
String storageConnectionString = String.format(Azure.FORMAT_BASE_URI, accountName);
|
||||
String storageConnectionString = String.format(AzureStorageUtils.FORMAT_BASE_URI, accountName);
|
||||
StorageCredentials creds = new StorageCredentialsSharedAccessSignature(sasToken);
|
||||
cloudBlobClient = new CloudBlobClient(new URI(storageConnectionString), creds);
|
||||
} else {
|
||||
String blobConnString = String.format(Azure.FORMAT_BLOB_CONNECTION_STRING, accountName, accountKey);
|
||||
String blobConnString = String.format(AzureStorageUtils.FORMAT_BLOB_CONNECTION_STRING, accountName, accountKey);
|
||||
CloudStorageAccount storageAccount = CloudStorageAccount.parse(blobConnString);
|
||||
cloudBlobClient = storageAccount.createCloudBlobClient();
|
||||
}
|
||||
|
@ -113,7 +114,7 @@ public final class Azure {
|
|||
String acctName = validationContext.getProperty(ACCOUNT_KEY).getValue();
|
||||
if ((StringUtils.isBlank(sasToken) && StringUtils.isBlank(acctName))
|
||||
|| (StringUtils.isNotBlank(sasToken) && StringUtils.isNotBlank(acctName))) {
|
||||
results.add(new ValidationResult.Builder().subject("Azure Credentials")
|
||||
results.add(new ValidationResult.Builder().subject("AzureStorageUtils Credentials")
|
||||
.valid(false)
|
||||
.explanation("either Azure Account Key or Shared Access Signature required, but not both")
|
||||
.build());
|
|
@ -25,7 +25,7 @@ import java.net.URISyntaxException;
|
|||
import java.security.InvalidKeyException;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.nifi.processors.azure.storage.utils.Azure;
|
||||
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
|
||||
import org.apache.nifi.util.file.FileUtils;
|
||||
|
||||
import com.microsoft.azure.storage.CloudStorageAccount;
|
||||
|
@ -67,7 +67,7 @@ class AzureTestUtil {
|
|||
}
|
||||
|
||||
static CloudBlobContainer getContainer(String containerName) throws InvalidKeyException, URISyntaxException, StorageException {
|
||||
String storageConnectionString = String.format(Azure.FORMAT_BLOB_CONNECTION_STRING, getAccountName(), getAccountKey());
|
||||
String storageConnectionString = String.format(AzureStorageUtils.FORMAT_BLOB_CONNECTION_STRING, getAccountName(), getAccountKey());
|
||||
CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);
|
||||
CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
|
||||
return blobClient.getContainerReference(containerName);
|
||||
|
|
|
@ -27,7 +27,7 @@ import java.util.Map;
|
|||
import java.util.UUID;
|
||||
|
||||
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
|
||||
import org.apache.nifi.processors.azure.storage.utils.Azure;
|
||||
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
|
@ -55,15 +55,15 @@ public class ITFetchAzureBlobStorage {
|
|||
try {
|
||||
runner.setValidateExpressionUsage(true);
|
||||
|
||||
runner.setProperty(Azure.ACCOUNT_NAME, AzureTestUtil.getAccountName());
|
||||
runner.setProperty(Azure.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
|
||||
runner.setProperty(Azure.CONTAINER, containerName);
|
||||
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
|
||||
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
|
||||
runner.setProperty(AzureStorageUtils.CONTAINER, containerName);
|
||||
runner.setProperty(FetchAzureBlobStorage.BLOB, "${azure.blobname}");
|
||||
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
attributes.put("azure.primaryUri", "https://" + AzureTestUtil.getAccountName() + ".blob.core.windows.net/" + containerName + "/" + AzureTestUtil.TEST_BLOB_NAME);
|
||||
attributes.put("azure.blobname", AzureTestUtil.TEST_BLOB_NAME);
|
||||
attributes.put("azure.blobtype", Azure.BLOCK);
|
||||
attributes.put("azure.blobtype", AzureStorageUtils.BLOCK);
|
||||
runner.enqueue(new byte[0], attributes);
|
||||
runner.run();
|
||||
|
||||
|
|
|
@ -23,7 +23,7 @@ import java.net.URISyntaxException;
|
|||
import java.security.InvalidKeyException;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.nifi.processors.azure.storage.utils.Azure;
|
||||
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
|
@ -49,9 +49,9 @@ public class ITListAzureBlobStorage {
|
|||
final TestRunner runner = TestRunners.newTestRunner(new ListAzureBlobStorage());
|
||||
|
||||
try {
|
||||
runner.setProperty(Azure.ACCOUNT_NAME, AzureTestUtil.getAccountName());
|
||||
runner.setProperty(Azure.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
|
||||
runner.setProperty(Azure.CONTAINER, containerName);
|
||||
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
|
||||
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
|
||||
runner.setProperty(AzureStorageUtils.CONTAINER, containerName);
|
||||
|
||||
// requires multiple runs to deal with List processor checking
|
||||
runner.run(3);
|
||||
|
|
|
@ -22,7 +22,7 @@ import java.security.InvalidKeyException;
|
|||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.nifi.processors.azure.storage.utils.Azure;
|
||||
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
|
@ -44,9 +44,9 @@ public class ITPutAzureStorageBlob {
|
|||
try {
|
||||
runner.setValidateExpressionUsage(true);
|
||||
|
||||
runner.setProperty(Azure.ACCOUNT_NAME, AzureTestUtil.getAccountName());
|
||||
runner.setProperty(Azure.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
|
||||
runner.setProperty(Azure.CONTAINER, containerName);
|
||||
runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, AzureTestUtil.getAccountName());
|
||||
runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
|
||||
runner.setProperty(AzureStorageUtils.CONTAINER, containerName);
|
||||
runner.setProperty(FetchAzureBlobStorage.BLOB, "testingUpload");
|
||||
|
||||
runner.enqueue("0123456789".getBytes());
|
||||
|
|
Loading…
Reference in New Issue