NIFI-4005 Add support for Azure Shared Access Signature (SAS) Tokens. Upgraded the client library from 5.0.0 to 5.2.0

This commit is contained in:
Andrew Grande 2017-05-31 16:45:26 -04:00 committed by Matt Gilman
parent f04ddcf442
commit 17ddaf6be0
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
12 changed files with 236 additions and 230 deletions

View File

@ -36,7 +36,7 @@
<dependency> <dependency>
<groupId>com.microsoft.azure</groupId> <groupId>com.microsoft.azure</groupId>
<artifactId>azure-storage</artifactId> <artifactId>azure-storage</artifactId>
<version>5.0.0</version> <version>5.2.0</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>com.fasterxml.jackson.core</groupId> <groupId>com.fasterxml.jackson.core</groupId>

View File

@ -17,23 +17,52 @@
package org.apache.nifi.processors.azure; package org.apache.nifi.processors.azure;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.Azure;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
public abstract class AbstractAzureBlobProcessor extends AbstractAzureProcessor { public abstract class AbstractAzureBlobProcessor extends AbstractProcessor {
public static final PropertyDescriptor BLOB = new PropertyDescriptor.Builder().name("blob").displayName("Blob").description("The filename of the blob") public static final PropertyDescriptor BLOB = new PropertyDescriptor.Builder().name("blob").displayName("Blob").description("The filename of the blob")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).defaultValue("${azure.blobname}").build(); .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).defaultValue("${azure.blobname}").build();
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All successfully processed FlowFiles are routed to this relationship").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Unsuccessful operations will be transferred to the failure relationship.").build();
private static final List<PropertyDescriptor> PROPERTIES = Collections private static final List<PropertyDescriptor> PROPERTIES = Collections
.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, BLOB)); .unmodifiableList(Arrays.asList(
Azure.CONTAINER,
Azure.PROP_SAS_TOKEN,
Azure.ACCOUNT_NAME,
Azure.ACCOUNT_KEY,
BLOB));
private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(
AbstractAzureBlobProcessor.REL_SUCCESS,
AbstractAzureBlobProcessor.REL_FAILURE)));
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES; return PROPERTIES;
} }
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
return Azure.validateCredentialProperties(validationContext);
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
} }

View File

@ -1,79 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import com.microsoft.azure.storage.CloudStorageAccount;
public abstract class AbstractAzureProcessor extends AbstractProcessor {
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All successfully processed FlowFiles are routed to this relationship").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Unsuccessful operations will be transferred to the failure relationship.").build();
private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
protected CloudStorageAccount createStorageConnection(ProcessContext context) {
final String accountName = context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
final String accountKey = context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
return createStorageConnectionFromNameAndKey(accountName, accountKey);
}
protected CloudStorageAccount createStorageConnection(ProcessContext context, FlowFile flowFile) {
final String accountName = context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String accountKey = context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue();
return createStorageConnectionFromNameAndKey(accountName, accountKey);
}
private CloudStorageAccount createStorageConnectionFromNameAndKey(String accountName, String accountKey) {
final String storageConnectionString = String.format(AzureConstants.FORMAT_DEFAULT_CONNECTION_STRING, accountName, accountKey);
try {
return createStorageAccountFromConnectionString(storageConnectionString);
} catch (InvalidKeyException | IllegalArgumentException | URISyntaxException e) {
throw new IllegalArgumentException(e);
}
}
/**
* Validates the connection string and returns the storage account. The connection string must be in the Azure connection string format.
*
* @param storageConnectionString
* Connection string for the storage service or the emulator
* @return The newly created CloudStorageAccount object
*
*/
private static CloudStorageAccount createStorageAccountFromConnectionString(String storageConnectionString) throws IllegalArgumentException, URISyntaxException, InvalidKeyException {
CloudStorageAccount storageAccount;
storageAccount = CloudStorageAccount.parse(storageConnectionString);
return storageAccount;
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
}

View File

@ -1,51 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.util.StandardValidators;
public final class AzureConstants {
public static final String BLOCK = "Block";
public static final String PAGE = "Page";
public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder().name("storage-account-key").displayName("Storage Account Key")
.description("The storage account key. There are certain risks in allowing the account key to be stored as a flowfile" +
"attribute. While it does provide for a more flexible flow by allowing the account key to " +
"be fetched dynamically from a flow file attribute, care must be taken to restrict access to " +
"the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
"In addition, the provenance repositories may be put on encrypted disk partitions.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).sensitive(true).build();
public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder().name("storage-account-name").displayName("Storage Account Name")
.description("The storage account name. There are certain risks in allowing the account name to be stored as a flowfile" +
"attribute. While it does provide for a more flexible flow by allowing the account name to " +
"be fetched dynamically from a flow file attribute, care must be taken to restrict access to " +
"the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
"In addition, the provenance repositories may be put on encrypted disk partitions.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).sensitive(true).build();
public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder().name("container-name").displayName("Container name")
.description("Name of the azure storage container").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).build();
// use HTTPS by default as per MSFT recommendation
public static final String FORMAT_DEFAULT_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
private AzureConstants() {
// do not instantiate
}
}

View File

@ -18,10 +18,7 @@ package org.apache.nifi.processors.azure.storage;
import java.io.IOException; import java.io.IOException;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -34,15 +31,13 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor; import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
import org.apache.nifi.processors.azure.AzureConstants; import org.apache.nifi.processors.azure.storage.utils.Azure;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob; import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient; import com.microsoft.azure.storage.blob.CloudBlobClient;
@ -57,14 +52,6 @@ import com.microsoft.azure.storage.blob.CloudBlobContainer;
}) })
public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor { public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
private static final List<PropertyDescriptor> PROPERTIES = Collections
.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, BLOB));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@Override @Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get(); FlowFile flowFile = session.get();
@ -74,13 +61,12 @@ public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
final long startNanos = System.nanoTime(); final long startNanos = System.nanoTime();
String containerName = context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions(flowFile).getValue(); String containerName = context.getProperty(Azure.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue(); String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
AtomicReference<Exception> storedException = new AtomicReference<>(); AtomicReference<Exception> storedException = new AtomicReference<>();
try { try {
CloudStorageAccount storageAccount = createStorageConnection(context, flowFile); CloudBlobClient blobClient = Azure.createCloudBlobClient(context, getLogger());
CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
CloudBlobContainer container = blobClient.getContainerReference(containerName); CloudBlobContainer container = blobClient.getContainerReference(containerName);
final Map<String, String> attributes = new HashMap<>(); final Map<String, String> attributes = new HashMap<>();

View File

@ -16,17 +16,15 @@
*/ */
package org.apache.nifi.processors.azure.storage; package org.apache.nifi.processors.azure.storage;
import java.io.IOException; import com.microsoft.azure.storage.StorageUri;
import java.net.URISyntaxException; import com.microsoft.azure.storage.blob.BlobListingDetails;
import java.security.InvalidKeyException; import com.microsoft.azure.storage.blob.BlobProperties;
import java.util.ArrayList; import com.microsoft.azure.storage.blob.CloudBlob;
import java.util.Arrays; import com.microsoft.azure.storage.blob.CloudBlobClient;
import java.util.Collections; import com.microsoft.azure.storage.blob.CloudBlobContainer;
import java.util.EnumSet; import com.microsoft.azure.storage.blob.CloudBlockBlob;
import java.util.HashMap; import com.microsoft.azure.storage.blob.ListBlobItem;
import java.util.List; import org.apache.commons.lang3.exception.ExceptionUtils;
import java.util.Map;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.Stateful;
@ -37,24 +35,25 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.Scope;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.list.AbstractListProcessor; import org.apache.nifi.processor.util.list.AbstractListProcessor;
import org.apache.nifi.processors.azure.AzureConstants; import org.apache.nifi.processors.azure.storage.utils.Azure;
import org.apache.nifi.processors.azure.storage.utils.BlobInfo; import org.apache.nifi.processors.azure.storage.utils.BlobInfo;
import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder; import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder;
import com.microsoft.azure.storage.CloudStorageAccount; import java.io.IOException;
import com.microsoft.azure.storage.StorageException; import java.util.ArrayList;
import com.microsoft.azure.storage.StorageUri; import java.util.Arrays;
import com.microsoft.azure.storage.blob.BlobListingDetails; import java.util.Collection;
import com.microsoft.azure.storage.blob.BlobProperties; import java.util.Collections;
import com.microsoft.azure.storage.blob.CloudBlob; import java.util.EnumSet;
import com.microsoft.azure.storage.blob.CloudBlobClient; import java.util.HashMap;
import com.microsoft.azure.storage.blob.CloudBlobContainer; import java.util.List;
import com.microsoft.azure.storage.blob.CloudBlockBlob; import java.util.Map;
import com.microsoft.azure.storage.blob.ListBlobItem;
@TriggerSerially @TriggerSerially
@Tags({ "azure", "microsoft", "cloud", "storage", "blob" }) @Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
@ -79,16 +78,27 @@ import com.microsoft.azure.storage.blob.ListBlobItem;
"where the previous node left off, without duplicating the data.") "where the previous node left off, without duplicating the data.")
public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> { public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
private static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder().name("prefix").displayName("Prefix").description("Search prefix for listing") private static final PropertyDescriptor PROP_PREFIX = new PropertyDescriptor.Builder().name("prefix").displayName("Prefix").description("Search prefix for listing")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(false).build(); .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(false).build();
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, PREFIX));
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
Azure.CONTAINER,
Azure.PROP_SAS_TOKEN,
Azure.ACCOUNT_NAME,
Azure.ACCOUNT_KEY,
PROP_PREFIX));
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES; return PROPERTIES;
} }
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
return Azure.validateCredentialProperties(validationContext);
}
@Override @Override
protected Map<String, String> createAttributes(BlobInfo entity, ProcessContext context) { protected Map<String, String> createAttributes(BlobInfo entity, ProcessContext context) {
final Map<String, String> attributes = new HashMap<>(); final Map<String, String> attributes = new HashMap<>();
@ -107,15 +117,16 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
@Override @Override
protected String getPath(final ProcessContext context) { protected String getPath(final ProcessContext context) {
return context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions().getValue(); return context.getProperty(Azure.CONTAINER).evaluateAttributeExpressions().getValue();
} }
@Override @Override
protected boolean isListingResetNecessary(final PropertyDescriptor property) { protected boolean isListingResetNecessary(final PropertyDescriptor property) {
// re-list if configuration changed, but not when security keys are rolled (not included in the condition) // re-list if configuration changed, but not when security keys are rolled (not included in the condition)
return PREFIX.equals(property) return PROP_PREFIX.equals(property)
|| AzureConstants.ACCOUNT_NAME.equals(property) || Azure.ACCOUNT_NAME.equals(property)
|| AzureConstants.CONTAINER.equals(property); || Azure.CONTAINER.equals(property)
|| Azure.PROP_SAS_TOKEN.equals(property);
} }
@Override @Override
@ -125,15 +136,14 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
@Override @Override
protected List<BlobInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException { protected List<BlobInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
String containerName = context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions().getValue(); String containerName = context.getProperty(Azure.CONTAINER).evaluateAttributeExpressions().getValue();
String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue(); String prefix = context.getProperty(PROP_PREFIX).evaluateAttributeExpressions().getValue();
if (prefix == null) { if (prefix == null) {
prefix = ""; prefix = "";
} }
final List<BlobInfo> listing = new ArrayList<>(); final List<BlobInfo> listing = new ArrayList<>();
try { try {
CloudStorageAccount storageAccount = createStorageConnection(context); CloudBlobClient blobClient = Azure.createCloudBlobClient(context, getLogger());
CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
CloudBlobContainer container = blobClient.getContainerReference(containerName); CloudBlobContainer container = blobClient.getContainerReference(containerName);
for (ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, null)) { for (ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, null)) {
@ -142,43 +152,32 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
BlobProperties properties = cloudBlob.getProperties(); BlobProperties properties = cloudBlob.getProperties();
StorageUri uri = cloudBlob.getSnapshotQualifiedStorageUri(); StorageUri uri = cloudBlob.getSnapshotQualifiedStorageUri();
Builder builder = new BlobInfo.Builder().primaryUri(uri.getPrimaryUri().toString()).secondaryUri(uri.getSecondaryUri().toString()).contentType(properties.getContentType()) Builder builder = new BlobInfo.Builder()
.contentLanguage(properties.getContentLanguage()).etag(properties.getEtag()).lastModifiedTime(properties.getLastModified().getTime()).length(properties.getLength()); .primaryUri(uri.getPrimaryUri().toString())
.contentType(properties.getContentType())
.contentLanguage(properties.getContentLanguage())
.etag(properties.getEtag())
.lastModifiedTime(properties.getLastModified().getTime())
.length(properties.getLength());
if (uri.getSecondaryUri() != null) {
builder.secondaryUri(uri.getSecondaryUri().toString());
}
if (blob instanceof CloudBlockBlob) { if (blob instanceof CloudBlockBlob) {
builder.blobType(AzureConstants.BLOCK); builder.blobType(Azure.BLOCK);
} else { } else {
builder.blobType(AzureConstants.PAGE); builder.blobType(Azure.PAGE);
} }
listing.add(builder.build()); listing.add(builder.build());
} }
} }
} catch (IllegalArgumentException | URISyntaxException | StorageException e) { } catch (Throwable t) {
throw (new IOException(e)); throw new IOException(ExceptionUtils.getRootCause(t));
} }
return listing; return listing;
} }
private CloudStorageAccount createStorageConnection(ProcessContext context) {
final String accountName = context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
final String accountKey = context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
final String storageConnectionString = String.format(AzureConstants.FORMAT_DEFAULT_CONNECTION_STRING, accountName, accountKey);
try {
CloudStorageAccount storageAccount;
try {
storageAccount = CloudStorageAccount.parse(storageConnectionString);
} catch (IllegalArgumentException | URISyntaxException e) {
getLogger().error("Invalid connection string URI for '{}'", new Object[]{context.getName()}, e);
throw e;
} catch (InvalidKeyException e) {
getLogger().error("Invalid connection credentials for '{}'", new Object[]{context.getName()}, e);
throw e;
}
return storageAccount;
} catch (InvalidKeyException | URISyntaxException e) {
throw new IllegalArgumentException(e);
}
}
} }

View File

@ -38,9 +38,8 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor; import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
import org.apache.nifi.processors.azure.AzureConstants; import org.apache.nifi.processors.azure.storage.utils.Azure;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobProperties; import com.microsoft.azure.storage.blob.BlobProperties;
import com.microsoft.azure.storage.blob.CloudBlob; import com.microsoft.azure.storage.blob.CloudBlob;
@ -67,14 +66,13 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor {
final long startNanos = System.nanoTime(); final long startNanos = System.nanoTime();
String containerName = context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions(flowFile).getValue(); String containerName = context.getProperty(Azure.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue(); String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
AtomicReference<Exception> storedException = new AtomicReference<>(); AtomicReference<Exception> storedException = new AtomicReference<>();
try { try {
CloudStorageAccount storageAccount = createStorageConnection(context, flowFile); CloudBlobClient blobClient = Azure.createCloudBlobClient(context, getLogger());
CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
CloudBlobContainer container = blobClient.getContainerReference(containerName); CloudBlobContainer container = blobClient.getContainerReference(containerName);
CloudBlob blob = container.getBlockBlobReference(blobPath); CloudBlob blob = container.getBlockBlobReference(blobPath);

View File

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.utils;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.StorageCredentials;
import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
public final class Azure {
public static final String BLOCK = "Block";
public static final String PAGE = "Page";
public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder().name("storage-account-key").displayName("Storage Account Key")
.description("The storage account key. This is an admin-like password providing access to every container in this account. It is recommended " +
"one uses Shared Access Signature (SAS) token instead for fine-grained control with policies. " +
"There are certain risks in allowing the account key to be stored as a flowfile" +
"attribute. While it does provide for a more flexible flow by allowing the account key to " +
"be fetched dynamically from a flow file attribute, care must be taken to restrict access to " +
"the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
"In addition, the provenance repositories may be put on encrypted disk partitions.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(false).sensitive(true).build();
public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder().name("storage-account-name").displayName("Storage Account Name")
.description("The storage account name. There are certain risks in allowing the account name to be stored as a flowfile" +
"attribute. While it does provide for a more flexible flow by allowing the account name to " +
"be fetched dynamically from a flow file attribute, care must be taken to restrict access to " +
"the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
"In addition, the provenance repositories may be put on encrypted disk partitions.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).sensitive(true).build();
public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder().name("container-name").displayName("Container Name")
.description("Name of the Azure storage container").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).build();
public static final PropertyDescriptor PROP_SAS_TOKEN = new PropertyDescriptor.Builder()
.name("SAS String")
.description("Shared Access Signature string, including the leading '?'. Specify either SAS (recommended) or Account Key")
.required(false)
.expressionLanguageSupported(true)
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
// use HTTPS by default as per MSFT recommendation
public static final String FORMAT_BLOB_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
public static final String FORMAT_BASE_URI = "https://%s.blob.core.windows.net";
private Azure() {
// do not instantiate
}
public static CloudBlobClient createCloudBlobClient(ProcessContext context, ComponentLog logger) {
final String accountName = context.getProperty(Azure.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
final String accountKey = context.getProperty(Azure.ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
final String sasToken = context.getProperty(Azure.PROP_SAS_TOKEN).evaluateAttributeExpressions().getValue();
CloudBlobClient cloudBlobClient;
try {
// sas token and acct name/key have different ways of creating a secure connection (e.g. new StorageCredentialsAccountAndKey didn't work)
if (StringUtils.isNotBlank(sasToken)) {
String storageConnectionString = String.format(Azure.FORMAT_BASE_URI, accountName);
StorageCredentials creds = new StorageCredentialsSharedAccessSignature(sasToken);
cloudBlobClient = new CloudBlobClient(new URI(storageConnectionString), creds);
} else {
String blobConnString = String.format(Azure.FORMAT_BLOB_CONNECTION_STRING, accountName, accountKey);
CloudStorageAccount storageAccount = CloudStorageAccount.parse(blobConnString);
cloudBlobClient = storageAccount.createCloudBlobClient();
}
} catch (IllegalArgumentException | URISyntaxException e) {
logger.error("Invalid connection string URI for '{}'", new Object[]{context.getName()}, e);
throw new IllegalArgumentException(e);
} catch (InvalidKeyException e) {
logger.error("Invalid connection credentials for '{}'", new Object[]{context.getName()}, e);
throw new IllegalArgumentException(e);
}
return cloudBlobClient;
}
public static Collection<ValidationResult> validateCredentialProperties(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
String sasToken = validationContext.getProperty(PROP_SAS_TOKEN).getValue();
String acctName = validationContext.getProperty(ACCOUNT_KEY).getValue();
if ((StringUtils.isBlank(sasToken) && StringUtils.isBlank(acctName))
|| (StringUtils.isNotBlank(sasToken) && StringUtils.isNotBlank(acctName))) {
results.add(new ValidationResult.Builder().subject("Azure Credentials")
.valid(false)
.explanation("either Azure Account Key or Shared Access Signature required, but not both")
.build());
}
return results;
}
}

View File

@ -25,7 +25,7 @@ import java.net.URISyntaxException;
import java.security.InvalidKeyException; import java.security.InvalidKeyException;
import java.util.Properties; import java.util.Properties;
import org.apache.nifi.processors.azure.AzureConstants; import org.apache.nifi.processors.azure.storage.utils.Azure;
import org.apache.nifi.util.file.FileUtils; import org.apache.nifi.util.file.FileUtils;
import com.microsoft.azure.storage.CloudStorageAccount; import com.microsoft.azure.storage.CloudStorageAccount;
@ -67,7 +67,7 @@ class AzureTestUtil {
} }
static CloudBlobContainer getContainer(String containerName) throws InvalidKeyException, URISyntaxException, StorageException { static CloudBlobContainer getContainer(String containerName) throws InvalidKeyException, URISyntaxException, StorageException {
String storageConnectionString = String.format(AzureConstants.FORMAT_DEFAULT_CONNECTION_STRING, getAccountName(), getAccountKey()); String storageConnectionString = String.format(Azure.FORMAT_BLOB_CONNECTION_STRING, getAccountName(), getAccountKey());
CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString); CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);
CloudBlobClient blobClient = storageAccount.createCloudBlobClient(); CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
return blobClient.getContainerReference(containerName); return blobClient.getContainerReference(containerName);

View File

@ -26,8 +26,8 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import org.apache.nifi.processors.azure.AbstractAzureProcessor; import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
import org.apache.nifi.processors.azure.AzureConstants; import org.apache.nifi.processors.azure.storage.utils.Azure;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
@ -55,19 +55,19 @@ public class ITFetchAzureBlobStorage {
try { try {
runner.setValidateExpressionUsage(true); runner.setValidateExpressionUsage(true);
runner.setProperty(AzureConstants.ACCOUNT_NAME, AzureTestUtil.getAccountName()); runner.setProperty(Azure.ACCOUNT_NAME, AzureTestUtil.getAccountName());
runner.setProperty(AzureConstants.ACCOUNT_KEY, AzureTestUtil.getAccountKey()); runner.setProperty(Azure.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
runner.setProperty(AzureConstants.CONTAINER, containerName); runner.setProperty(Azure.CONTAINER, containerName);
runner.setProperty(FetchAzureBlobStorage.BLOB, "${azure.blobname}"); runner.setProperty(FetchAzureBlobStorage.BLOB, "${azure.blobname}");
final Map<String, String> attributes = new HashMap<>(); final Map<String, String> attributes = new HashMap<>();
attributes.put("azure.primaryUri", "https://" + AzureTestUtil.getAccountName() + ".blob.core.windows.net/" + containerName + "/" + AzureTestUtil.TEST_BLOB_NAME); attributes.put("azure.primaryUri", "https://" + AzureTestUtil.getAccountName() + ".blob.core.windows.net/" + containerName + "/" + AzureTestUtil.TEST_BLOB_NAME);
attributes.put("azure.blobname", AzureTestUtil.TEST_BLOB_NAME); attributes.put("azure.blobname", AzureTestUtil.TEST_BLOB_NAME);
attributes.put("azure.blobtype", AzureConstants.BLOCK); attributes.put("azure.blobtype", Azure.BLOCK);
runner.enqueue(new byte[0], attributes); runner.enqueue(new byte[0], attributes);
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(AbstractAzureProcessor.REL_SUCCESS, 1); runner.assertAllFlowFilesTransferred(AbstractAzureBlobProcessor.REL_SUCCESS, 1);
List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(FetchAzureBlobStorage.REL_SUCCESS); List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(FetchAzureBlobStorage.REL_SUCCESS);
for (MockFlowFile flowFile : flowFilesForRelationship) { for (MockFlowFile flowFile : flowFilesForRelationship) {
flowFile.assertContentEquals("0123456789".getBytes()); flowFile.assertContentEquals("0123456789".getBytes());

View File

@ -23,7 +23,7 @@ import java.net.URISyntaxException;
import java.security.InvalidKeyException; import java.security.InvalidKeyException;
import java.util.UUID; import java.util.UUID;
import org.apache.nifi.processors.azure.AzureConstants; import org.apache.nifi.processors.azure.storage.utils.Azure;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
@ -49,9 +49,9 @@ public class ITListAzureBlobStorage {
final TestRunner runner = TestRunners.newTestRunner(new ListAzureBlobStorage()); final TestRunner runner = TestRunners.newTestRunner(new ListAzureBlobStorage());
try { try {
runner.setProperty(AzureConstants.ACCOUNT_NAME, AzureTestUtil.getAccountName()); runner.setProperty(Azure.ACCOUNT_NAME, AzureTestUtil.getAccountName());
runner.setProperty(AzureConstants.ACCOUNT_KEY, AzureTestUtil.getAccountKey()); runner.setProperty(Azure.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
runner.setProperty(AzureConstants.CONTAINER, containerName); runner.setProperty(Azure.CONTAINER, containerName);
// requires multiple runs to deal with List processor checking // requires multiple runs to deal with List processor checking
runner.run(3); runner.run(3);

View File

@ -22,7 +22,7 @@ import java.security.InvalidKeyException;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import org.apache.nifi.processors.azure.AzureConstants; import org.apache.nifi.processors.azure.storage.utils.Azure;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
@ -44,9 +44,9 @@ public class ITPutAzureStorageBlob {
try { try {
runner.setValidateExpressionUsage(true); runner.setValidateExpressionUsage(true);
runner.setProperty(AzureConstants.ACCOUNT_NAME, AzureTestUtil.getAccountName()); runner.setProperty(Azure.ACCOUNT_NAME, AzureTestUtil.getAccountName());
runner.setProperty(AzureConstants.ACCOUNT_KEY, AzureTestUtil.getAccountKey()); runner.setProperty(Azure.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
runner.setProperty(AzureConstants.CONTAINER, containerName); runner.setProperty(Azure.CONTAINER, containerName);
runner.setProperty(FetchAzureBlobStorage.BLOB, "testingUpload"); runner.setProperty(FetchAzureBlobStorage.BLOB, "testingUpload");
runner.enqueue("0123456789".getBytes()); runner.enqueue("0123456789".getBytes());