mirror of
https://github.com/apache/nifi.git
synced 2025-02-16 06:55:28 +00:00
NIFI-1833 - Addressed issues from PR review.
Addressed dependency issues from the review. Addressed a checkstyle issue. Review: reworded the descriptions. Review: implemented the reset condition logic. Review: dropped static qualifier from method signatures, not required really Review: removed sys.out, inlined a single method to get access to the ProcessContext.getName() Switched to HTTPS as per MSFT recommendation. Some DRY. Dropped cruft. Addressing review suggestions from 4/5 Review: documentation improvements Review: documentation improvements This closes #1636. Signed-off-by: Bryan Rosander <brosander@apache.org>
This commit is contained in:
parent
3488a169ca
commit
f30c8169ab
@ -38,7 +38,7 @@
|
|||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.nifi</groupId>
|
<groupId>org.apache.nifi</groupId>
|
||||||
<artifactId>nifi-standard-services-api-nar</artifactId>
|
<artifactId>nifi-standard-nar</artifactId>
|
||||||
<type>nar</type>
|
<type>nar</type>
|
||||||
</dependency>
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
@ -85,6 +85,7 @@
|
|||||||
<groupId>org.apache.nifi</groupId>
|
<groupId>org.apache.nifi</groupId>
|
||||||
<artifactId>nifi-standard-processors</artifactId>
|
<artifactId>nifi-standard-processors</artifactId>
|
||||||
<version>${project.version}</version>
|
<version>${project.version}</version>
|
||||||
|
<scope>provided</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
</project>
|
</project>
|
||||||
|
@ -27,13 +27,13 @@ public abstract class AbstractAzureBlobProcessor extends AbstractAzureProcessor
|
|||||||
|
|
||||||
public static final PropertyDescriptor BLOB = new PropertyDescriptor.Builder().name("Blob").description("The filename of the blob").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
public static final PropertyDescriptor BLOB = new PropertyDescriptor.Builder().name("Blob").description("The filename of the blob").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
.expressionLanguageSupported(true).required(true).defaultValue("${azure.blobname}").build();
|
.expressionLanguageSupported(true).required(true).defaultValue("${azure.blobname}").build();
|
||||||
|
|
||||||
public static final List<PropertyDescriptor> properties = Collections
|
private static final List<PropertyDescriptor> PROPERTIES = Collections
|
||||||
.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, BLOB));
|
.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, BLOB));
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
return properties;
|
return PROPERTIES;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -32,8 +32,8 @@ import com.microsoft.azure.storage.CloudStorageAccount;
|
|||||||
|
|
||||||
public abstract class AbstractAzureProcessor extends AbstractProcessor {
|
public abstract class AbstractAzureProcessor extends AbstractProcessor {
|
||||||
|
|
||||||
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are received are routed to success").build();
|
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All successfully processed FlowFiles are routed to this relationship").build();
|
||||||
protected static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Any failed fetches will be transferred to the failure relation.").build();
|
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Unsuccessful operations will be transferred to the failure relationship.").build();
|
||||||
public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
|
public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
|
||||||
|
|
||||||
protected CloudStorageAccount createStorageConnection(ProcessContext context) {
|
protected CloudStorageAccount createStorageConnection(ProcessContext context) {
|
||||||
@ -49,7 +49,7 @@ public abstract class AbstractAzureProcessor extends AbstractProcessor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private CloudStorageAccount createStorageConnectionFromNameAndKey(String accountName, String accountKey) {
|
private CloudStorageAccount createStorageConnectionFromNameAndKey(String accountName, String accountKey) {
|
||||||
final String storageConnectionString = String.format("DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s", accountName, accountKey);
|
final String storageConnectionString = String.format(AzureConstants.FORMAT_DEFAULT_CONNECTION_STRING, accountName, accountKey);
|
||||||
try {
|
try {
|
||||||
return createStorageAccountFromConnectionString(storageConnectionString);
|
return createStorageAccountFromConnectionString(storageConnectionString);
|
||||||
} catch (InvalidKeyException | IllegalArgumentException | URISyntaxException e) {
|
} catch (InvalidKeyException | IllegalArgumentException | URISyntaxException e) {
|
||||||
@ -65,13 +65,11 @@ public abstract class AbstractAzureProcessor extends AbstractProcessor {
|
|||||||
* @return The newly created CloudStorageAccount object
|
* @return The newly created CloudStorageAccount object
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
protected static CloudStorageAccount createStorageAccountFromConnectionString(String storageConnectionString) throws IllegalArgumentException, URISyntaxException, InvalidKeyException {
|
private static CloudStorageAccount createStorageAccountFromConnectionString(String storageConnectionString) throws IllegalArgumentException, URISyntaxException, InvalidKeyException {
|
||||||
CloudStorageAccount storageAccount;
|
CloudStorageAccount storageAccount;
|
||||||
try {
|
try {
|
||||||
storageAccount = CloudStorageAccount.parse(storageConnectionString);
|
storageAccount = CloudStorageAccount.parse(storageConnectionString);
|
||||||
} catch (IllegalArgumentException | URISyntaxException e) {
|
} catch (IllegalArgumentException | URISyntaxException | InvalidKeyException e) {
|
||||||
throw e;
|
|
||||||
} catch (InvalidKeyException e) {
|
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
return storageAccount;
|
return storageAccount;
|
||||||
|
@ -32,6 +32,9 @@ public final class AzureConstants {
|
|||||||
public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder().name("Container name").description("Name of the azure storage container")
|
public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder().name("Container name").description("Name of the azure storage container")
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).build();
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).build();
|
||||||
|
|
||||||
|
// use HTTPS by default as per MSFT recommendation
|
||||||
|
public static final String FORMAT_DEFAULT_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
|
||||||
|
|
||||||
private AzureConstants() {
|
private AzureConstants() {
|
||||||
// do not instantiate
|
// do not instantiate
|
||||||
}
|
}
|
||||||
|
@ -17,7 +17,6 @@
|
|||||||
package org.apache.nifi.processors.azure.storage;
|
package org.apache.nifi.processors.azure.storage;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
@ -31,13 +30,13 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
|
|||||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
|
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
import org.apache.nifi.processor.ProcessContext;
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
import org.apache.nifi.processor.ProcessSession;
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
|
||||||
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
|
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
|
||||||
import org.apache.nifi.processors.azure.AzureConstants;
|
import org.apache.nifi.processors.azure.AzureConstants;
|
||||||
|
|
||||||
@ -49,12 +48,14 @@ import com.microsoft.azure.storage.blob.CloudBlobContainer;
|
|||||||
|
|
||||||
@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
|
@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
|
||||||
@CapabilityDescription("Retrieves contents of an Azure Storage Blob, writing the contents to the content of the FlowFile")
|
@CapabilityDescription("Retrieves contents of an Azure Storage Blob, writing the contents to the content of the FlowFile")
|
||||||
|
@SeeAlso({ ListAzureBlobStorage.class, PutAzureBlobStorage.class })
|
||||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||||
@WritesAttributes({
|
@WritesAttributes({
|
||||||
@WritesAttribute(attribute = "azure.length", description = "The length of the blob fetched")
|
@WritesAttribute(attribute = "azure.length", description = "The length of the blob fetched")
|
||||||
})
|
})
|
||||||
public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
|
public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
|
||||||
public static final List<PropertyDescriptor> PROPERTIES = Collections
|
|
||||||
|
private static final List<PropertyDescriptor> PROPERTIES = Collections
|
||||||
.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, BLOB));
|
.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, BLOB));
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -84,14 +85,11 @@ public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
|
|||||||
|
|
||||||
// TODO - we may be able do fancier things with ranges and
|
// TODO - we may be able do fancier things with ranges and
|
||||||
// distribution of download over threads, investigate
|
// distribution of download over threads, investigate
|
||||||
flowFile = session.write(flowFile, new OutputStreamCallback() {
|
flowFile = session.write(flowFile, os -> {
|
||||||
@Override
|
try {
|
||||||
public void process(OutputStream os) throws IOException {
|
blob.download(os);
|
||||||
try {
|
} catch (StorageException e) {
|
||||||
blob.download(os);
|
throw new IOException(e);
|
||||||
} catch (StorageException e) {
|
|
||||||
throw new IOException(e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -46,12 +46,10 @@ import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder;
|
|||||||
import org.apache.nifi.processors.standard.AbstractListProcessor;
|
import org.apache.nifi.processors.standard.AbstractListProcessor;
|
||||||
|
|
||||||
import com.microsoft.azure.storage.CloudStorageAccount;
|
import com.microsoft.azure.storage.CloudStorageAccount;
|
||||||
import com.microsoft.azure.storage.OperationContext;
|
|
||||||
import com.microsoft.azure.storage.StorageException;
|
import com.microsoft.azure.storage.StorageException;
|
||||||
import com.microsoft.azure.storage.StorageUri;
|
import com.microsoft.azure.storage.StorageUri;
|
||||||
import com.microsoft.azure.storage.blob.BlobListingDetails;
|
import com.microsoft.azure.storage.blob.BlobListingDetails;
|
||||||
import com.microsoft.azure.storage.blob.BlobProperties;
|
import com.microsoft.azure.storage.blob.BlobProperties;
|
||||||
import com.microsoft.azure.storage.blob.BlobRequestOptions;
|
|
||||||
import com.microsoft.azure.storage.blob.CloudBlob;
|
import com.microsoft.azure.storage.blob.CloudBlob;
|
||||||
import com.microsoft.azure.storage.blob.CloudBlobClient;
|
import com.microsoft.azure.storage.blob.CloudBlobClient;
|
||||||
import com.microsoft.azure.storage.blob.CloudBlobContainer;
|
import com.microsoft.azure.storage.blob.CloudBlobContainer;
|
||||||
@ -60,14 +58,18 @@ import com.microsoft.azure.storage.blob.ListBlobItem;
|
|||||||
|
|
||||||
@TriggerSerially
|
@TriggerSerially
|
||||||
@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
|
@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
|
||||||
@SeeAlso({ FetchAzureBlobStorage.class })
|
@SeeAlso({ FetchAzureBlobStorage.class, PutAzureBlobStorage.class })
|
||||||
@CapabilityDescription("Lists blobs in an Azure Storage container. Listing details are attached to an empty FlowFile for use with FetchAzureBlobStorage")
|
@CapabilityDescription("Lists blobs in an Azure Storage container. Listing details are attached to an empty FlowFile for use with FetchAzureBlobStorage")
|
||||||
@InputRequirement(Requirement.INPUT_FORBIDDEN)
|
@InputRequirement(Requirement.INPUT_FORBIDDEN)
|
||||||
@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the azure container"),
|
@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the Azure container"),
|
||||||
@WritesAttribute(attribute = "azure.blobname", description = "The name of the azure blob"), @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"),
|
@WritesAttribute(attribute = "azure.blobname", description = "The name of the Azure blob"),
|
||||||
@WritesAttribute(attribute = "azure.secondaryUri", description = "Secondary location for blob content"), @WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"),
|
@WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"),
|
||||||
@WritesAttribute(attribute = "azure.length", description = "Length of the blob"), @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob"),
|
@WritesAttribute(attribute = "azure.secondaryUri", description = "Secondary location for blob content"),
|
||||||
@WritesAttribute(attribute = "mime.type", description = "MimeType of the content"), @WritesAttribute(attribute = "lang", description = "Language code for the content"),
|
@WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"),
|
||||||
|
@WritesAttribute(attribute = "azure.length", description = "Length of the blob"),
|
||||||
|
@WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob"),
|
||||||
|
@WritesAttribute(attribute = "mime.type", description = "MimeType of the content"),
|
||||||
|
@WritesAttribute(attribute = "lang", description = "Language code for the content"),
|
||||||
@WritesAttribute(attribute = "azure.blobtype", description = "This is the type of blob and can be either page or block type") })
|
@WritesAttribute(attribute = "azure.blobtype", description = "This is the type of blob and can be either page or block type") })
|
||||||
@Stateful(scopes = { Scope.LOCAL, Scope.CLUSTER }, description = "After performing a listing of blobs, the timestamp of the newest blob is stored. "
|
@Stateful(scopes = { Scope.LOCAL, Scope.CLUSTER }, description = "After performing a listing of blobs, the timestamp of the newest blob is stored. "
|
||||||
+ "This allows the Processor to list only blobs that have been added or modified after " + "this date the next time that the Processor is run.")
|
+ "This allows the Processor to list only blobs that have been added or modified after " + "this date the next time that the Processor is run.")
|
||||||
@ -76,7 +78,7 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
|
|||||||
private static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder().name("Prefix").description("Search prefix for listing").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
private static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder().name("Prefix").description("Search prefix for listing").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
.expressionLanguageSupported(true).required(false).build();
|
.expressionLanguageSupported(true).required(false).build();
|
||||||
|
|
||||||
public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, PREFIX));
|
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, PREFIX));
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
@ -106,8 +108,10 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean isListingResetNecessary(final PropertyDescriptor property) {
|
protected boolean isListingResetNecessary(final PropertyDescriptor property) {
|
||||||
// TODO - implement
|
// re-list if configuration changed, but not when security keys are rolled (not included in the condition)
|
||||||
return false;
|
return PREFIX.equals(property)
|
||||||
|
|| AzureConstants.ACCOUNT_NAME.equals(property)
|
||||||
|
|| AzureConstants.CONTAINER.equals(property);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -128,10 +132,7 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
|
|||||||
CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
|
CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
|
||||||
CloudBlobContainer container = blobClient.getContainerReference(containerName);
|
CloudBlobContainer container = blobClient.getContainerReference(containerName);
|
||||||
|
|
||||||
BlobRequestOptions blobRequestOptions = null;
|
for (ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, null)) {
|
||||||
OperationContext operationContext = null;
|
|
||||||
|
|
||||||
for (ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), blobRequestOptions, operationContext)) {
|
|
||||||
if (blob instanceof CloudBlob) {
|
if (blob instanceof CloudBlob) {
|
||||||
CloudBlob cloudBlob = (CloudBlob) blob;
|
CloudBlob cloudBlob = (CloudBlob) blob;
|
||||||
BlobProperties properties = cloudBlob.getProperties();
|
BlobProperties properties = cloudBlob.getProperties();
|
||||||
@ -154,40 +155,26 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
|
|||||||
return listing;
|
return listing;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected static CloudStorageAccount createStorageConnection(ProcessContext context) {
|
protected CloudStorageAccount createStorageConnection(ProcessContext context) {
|
||||||
final String accountName = context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
|
final String accountName = context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
|
||||||
final String accountKey = context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
|
final String accountKey = context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
|
||||||
final String storageConnectionString = String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", accountName, accountKey);
|
final String storageConnectionString = String.format(AzureConstants.FORMAT_DEFAULT_CONNECTION_STRING, accountName, accountKey);
|
||||||
try {
|
try {
|
||||||
return createStorageAccountFromConnectionString(storageConnectionString);
|
|
||||||
|
CloudStorageAccount storageAccount;
|
||||||
|
try {
|
||||||
|
storageAccount = CloudStorageAccount.parse(storageConnectionString);
|
||||||
|
} catch (IllegalArgumentException | URISyntaxException e) {
|
||||||
|
getLogger().error("Invalid connection string URI for '{}'", new Object[]{context.getName()}, e);
|
||||||
|
throw e;
|
||||||
|
} catch (InvalidKeyException e) {
|
||||||
|
getLogger().error("Invalid connection credentials for '{}'", new Object[]{context.getName()}, e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
return storageAccount;
|
||||||
} catch (InvalidKeyException | URISyntaxException e) {
|
} catch (InvalidKeyException | URISyntaxException e) {
|
||||||
throw new IllegalArgumentException(e);
|
throw new IllegalArgumentException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Validates the connection string and returns the storage account. The connection string must be in the Azure connection string format.
|
|
||||||
*
|
|
||||||
* @param storageConnectionString
|
|
||||||
* Connection string for the storage service or the emulator
|
|
||||||
* @return The newly created CloudStorageAccount object
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
private static CloudStorageAccount createStorageAccountFromConnectionString(String storageConnectionString) throws IllegalArgumentException, URISyntaxException, InvalidKeyException {
|
|
||||||
|
|
||||||
CloudStorageAccount storageAccount;
|
|
||||||
try {
|
|
||||||
storageAccount = CloudStorageAccount.parse(storageConnectionString);
|
|
||||||
} catch (IllegalArgumentException | URISyntaxException e) {
|
|
||||||
System.out.println("\nConnection string specifies an invalid URI.");
|
|
||||||
System.out.println("Please confirm the connection string is in the Azure connection string format.");
|
|
||||||
throw e;
|
|
||||||
} catch (InvalidKeyException e) {
|
|
||||||
System.out.println("\nConnection string specifies an invalid key.");
|
|
||||||
System.out.println("Please confirm the AccountName and AccountKey in the connection string are valid.");
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
return storageAccount;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -35,7 +35,6 @@ import org.apache.nifi.flowfile.FlowFile;
|
|||||||
import org.apache.nifi.processor.ProcessContext;
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
import org.apache.nifi.processor.ProcessSession;
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
import org.apache.nifi.processor.io.InputStreamCallback;
|
|
||||||
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
|
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
|
||||||
import org.apache.nifi.processors.azure.AzureConstants;
|
import org.apache.nifi.processors.azure.AzureConstants;
|
||||||
|
|
||||||
@ -50,13 +49,12 @@ import com.microsoft.azure.storage.blob.CloudBlobContainer;
|
|||||||
@SeeAlso({ ListAzureBlobStorage.class, FetchAzureBlobStorage.class })
|
@SeeAlso({ ListAzureBlobStorage.class, FetchAzureBlobStorage.class })
|
||||||
@CapabilityDescription("Puts content into an Azure Storage Blob")
|
@CapabilityDescription("Puts content into an Azure Storage Blob")
|
||||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||||
@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the azure container"),
|
@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the Azure container"),
|
||||||
@WritesAttribute(attribute = "azure.blobname", description = "The name of the azure blob"),
|
@WritesAttribute(attribute = "azure.blobname", description = "The name of the Azure blob"),
|
||||||
@WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"),
|
@WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"),
|
||||||
@WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"),
|
@WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"),
|
||||||
@WritesAttribute(attribute = "azure.length", description = "Length of the blob"),
|
@WritesAttribute(attribute = "azure.length", description = "Length of the blob"),
|
||||||
@WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob"),
|
@WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob")})
|
||||||
@WritesAttribute(attribute = "azure.blobtype", description = "This is the type of blob and can be either page or block type") })
|
|
||||||
public class PutAzureBlobStorage extends AbstractAzureBlobProcessor {
|
public class PutAzureBlobStorage extends AbstractAzureBlobProcessor {
|
||||||
|
|
||||||
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||||
@ -80,21 +78,23 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor {
|
|||||||
|
|
||||||
final Map<String, String> attributes = new HashMap<>();
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
long length = flowFile.getSize();
|
long length = flowFile.getSize();
|
||||||
session.read(flowFile, new InputStreamCallback() {
|
session.read(flowFile, rawIn -> {
|
||||||
@Override
|
InputStream in = rawIn;
|
||||||
public void process(final InputStream rawIn) throws IOException {
|
if (!(in instanceof BufferedInputStream)) {
|
||||||
final InputStream in = new BufferedInputStream(rawIn);
|
// do not double-wrap
|
||||||
try {
|
in = new BufferedInputStream(rawIn);
|
||||||
blob.upload(in, length);
|
}
|
||||||
BlobProperties properties = blob.getProperties();
|
|
||||||
attributes.put("azure.container", containerName);
|
try {
|
||||||
attributes.put("azure.primaryUri", blob.getSnapshotQualifiedUri().toString());
|
blob.upload(in, length);
|
||||||
attributes.put("azure.etag", properties.getEtag());
|
BlobProperties properties = blob.getProperties();
|
||||||
attributes.put("azure.length", String.valueOf(length));
|
attributes.put("azure.container", containerName);
|
||||||
attributes.put("azure.timestamp", String.valueOf(properties.getLastModified()));
|
attributes.put("azure.primaryUri", blob.getSnapshotQualifiedUri().toString());
|
||||||
} catch (StorageException | URISyntaxException e) {
|
attributes.put("azure.etag", properties.getEtag());
|
||||||
throw new IOException(e);
|
attributes.put("azure.length", String.valueOf(length));
|
||||||
}
|
attributes.put("azure.timestamp", String.valueOf(properties.getLastModified()));
|
||||||
|
} catch (StorageException | URISyntaxException e) {
|
||||||
|
throw new IOException(e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -159,7 +159,7 @@ public class BlobInfo implements Comparable<BlobInfo>, Serializable, ListableEnt
|
|||||||
return etag.compareTo(o.etag);
|
return etag.compareTo(o.etag);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected BlobInfo(final Builder builder) {
|
private BlobInfo(final Builder builder) {
|
||||||
this.primaryUri = builder.primaryUri;
|
this.primaryUri = builder.primaryUri;
|
||||||
this.secondaryUri = builder.secondaryUri;
|
this.secondaryUri = builder.secondaryUri;
|
||||||
this.contentType = builder.contentType;
|
this.contentType = builder.contentType;
|
||||||
|
@ -0,0 +1,39 @@
|
|||||||
|
<!DOCTYPE html>
|
||||||
|
<html lang="en">
|
||||||
|
<!--
|
||||||
|
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
contributor license agreements. See the NOTICE file distributed with
|
||||||
|
this work for additional information regarding copyright ownership.
|
||||||
|
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
(the "License"); you may not use this file except in compliance with
|
||||||
|
the License. You may obtain a copy of the License at
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
-->
|
||||||
|
<head>
|
||||||
|
<meta charset="utf-8" />
|
||||||
|
<title>FetchAzureBlobStorage Processor</title>
|
||||||
|
<link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
|
||||||
|
</head>
|
||||||
|
|
||||||
|
<body>
|
||||||
|
|
||||||
|
<h2>Apache NiFi Azure Processors</h2>
|
||||||
|
|
||||||
|
<h3>Important Security Note</h3>
|
||||||
|
<p>
|
||||||
|
There are certain risks in allowing the account name and key to be stored as flowfile
|
||||||
|
attributes. While it does provide for a more flexible flow by allowing the account name and key
|
||||||
|
be fetched dynamically from the flow file attributes, care must be taken to restrict access to
|
||||||
|
the recorded event provenance data (e.g. by strictly controlling the provenance policy permission).
|
||||||
|
In addition, the provenance repositories may be put on encrypted disk partitions.
|
||||||
|
</p>
|
||||||
|
<p>
|
||||||
|
<a href="#" onclick="history.back()">Return to a previous page</a>
|
||||||
|
</p>
|
||||||
|
</body>
|
||||||
|
</html>
|
@ -0,0 +1,39 @@
|
|||||||
|
<!DOCTYPE html>
|
||||||
|
<html lang="en">
|
||||||
|
<!--
|
||||||
|
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
contributor license agreements. See the NOTICE file distributed with
|
||||||
|
this work for additional information regarding copyright ownership.
|
||||||
|
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
(the "License"); you may not use this file except in compliance with
|
||||||
|
the License. You may obtain a copy of the License at
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
-->
|
||||||
|
<head>
|
||||||
|
<meta charset="utf-8" />
|
||||||
|
<title>ListAzureBlobStorage Processor</title>
|
||||||
|
<link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
|
||||||
|
</head>
|
||||||
|
|
||||||
|
<body>
|
||||||
|
|
||||||
|
<h2>Apache NiFi Azure Processors</h2>
|
||||||
|
|
||||||
|
<h3>Important Security Note</h3>
|
||||||
|
<p>
|
||||||
|
There are certain risks in allowing the account name and key to be stored as flowfile
|
||||||
|
attributes. While it does provide for a more flexible flow by allowing the account name and key
|
||||||
|
be fetched dynamically from the flow file attributes, care must be taken to restrict access to
|
||||||
|
the recorded event provenance data (e.g. by strictly controlling the provenance policy permission).
|
||||||
|
In addition, the provenance repositories may be put on encrypted disk partitions.
|
||||||
|
</p>
|
||||||
|
<p>
|
||||||
|
<a href="#" onclick="history.back()">Return to a previous page</a>
|
||||||
|
</p>
|
||||||
|
</body>
|
||||||
|
</html>
|
@ -0,0 +1,39 @@
|
|||||||
|
<!DOCTYPE html>
|
||||||
|
<html lang="en">
|
||||||
|
<!--
|
||||||
|
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
contributor license agreements. See the NOTICE file distributed with
|
||||||
|
this work for additional information regarding copyright ownership.
|
||||||
|
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
(the "License"); you may not use this file except in compliance with
|
||||||
|
the License. You may obtain a copy of the License at
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
-->
|
||||||
|
<head>
|
||||||
|
<meta charset="utf-8" />
|
||||||
|
<title>PutAzureBlobStorage Processor</title>
|
||||||
|
<link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
|
||||||
|
</head>
|
||||||
|
|
||||||
|
<body>
|
||||||
|
|
||||||
|
<h2>Apache NiFi Azure Processors</h2>
|
||||||
|
|
||||||
|
<h3>Important Security Note</h3>
|
||||||
|
<p>
|
||||||
|
There are certain risks in allowing the account name and key to be stored as flowfile
|
||||||
|
attributes. While it does provide for a more flexible flow by allowing the account name and key
|
||||||
|
be fetched dynamically from the flow file attributes, care must be taken to restrict access to
|
||||||
|
the recorded event provenance data (e.g. by strictly controlling the provenance policy permission).
|
||||||
|
In addition, the provenance repositories may be put on encrypted disk partitions.
|
||||||
|
</p>
|
||||||
|
<p>
|
||||||
|
<a href="#" onclick="history.back()">Return to a previous page</a>
|
||||||
|
</p>
|
||||||
|
</body>
|
||||||
|
</html>
|
@ -16,7 +16,17 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.azure.storage;
|
package org.apache.nifi.processors.azure.storage;
|
||||||
|
|
||||||
import static org.junit.Assert.fail;
|
import com.microsoft.azure.storage.CloudStorageAccount;
|
||||||
|
import com.microsoft.azure.storage.StorageException;
|
||||||
|
import com.microsoft.azure.storage.blob.CloudBlob;
|
||||||
|
import com.microsoft.azure.storage.blob.CloudBlobClient;
|
||||||
|
import com.microsoft.azure.storage.blob.CloudBlobContainer;
|
||||||
|
import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
|
||||||
|
import com.microsoft.azure.storage.blob.ListBlobItem;
|
||||||
|
import org.apache.nifi.processors.azure.AzureConstants;
|
||||||
|
import org.apache.nifi.util.file.FileUtils;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
@ -25,19 +35,7 @@ import java.net.URISyntaxException;
|
|||||||
import java.security.InvalidKeyException;
|
import java.security.InvalidKeyException;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
|
||||||
import org.apache.nifi.util.file.FileUtils;
|
import static org.junit.Assert.fail;
|
||||||
import org.junit.AfterClass;
|
|
||||||
import org.junit.BeforeClass;
|
|
||||||
|
|
||||||
import com.microsoft.azure.storage.CloudStorageAccount;
|
|
||||||
import com.microsoft.azure.storage.StorageException;
|
|
||||||
import com.microsoft.azure.storage.blob.CloudBlob;
|
|
||||||
import com.microsoft.azure.storage.blob.CloudBlobClient;
|
|
||||||
import com.microsoft.azure.storage.blob.CloudBlobContainer;
|
|
||||||
import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
|
|
||||||
import com.microsoft.azure.storage.blob.ListBlobItem;
|
|
||||||
import com.microsoft.azure.storage.table.CloudTable;
|
|
||||||
import com.microsoft.azure.storage.table.CloudTableClient;
|
|
||||||
|
|
||||||
public abstract class AbstractAzureIT {
|
public abstract class AbstractAzureIT {
|
||||||
protected static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES";
|
protected static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES";
|
||||||
@ -90,17 +88,10 @@ public abstract class AbstractAzureIT {
|
|||||||
}
|
}
|
||||||
|
|
||||||
protected static CloudBlobContainer getContainer() throws InvalidKeyException, URISyntaxException, StorageException {
|
protected static CloudBlobContainer getContainer() throws InvalidKeyException, URISyntaxException, StorageException {
|
||||||
String storageConnectionString = String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", getAccountName(), getAccountKey());
|
String storageConnectionString = String.format(AzureConstants.FORMAT_DEFAULT_CONNECTION_STRING, getAccountName(), getAccountKey());
|
||||||
CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);
|
CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);
|
||||||
CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
|
CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
|
||||||
return blobClient.getContainerReference(TEST_CONTAINER_NAME);
|
return blobClient.getContainerReference(TEST_CONTAINER_NAME);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected static CloudTable getTable() throws InvalidKeyException, URISyntaxException, StorageException {
|
|
||||||
String storageConnectionString = String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", getAccountName(), getAccountKey());
|
|
||||||
CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);
|
|
||||||
CloudTableClient tableClient = storageAccount.createCloudTableClient();
|
|
||||||
return tableClient.getTableReference(TEST_TABLE_NAME);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
@ -23,6 +23,7 @@ import java.util.HashMap;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.nifi.processors.azure.AbstractAzureProcessor;
|
||||||
import org.apache.nifi.processors.azure.AzureConstants;
|
import org.apache.nifi.processors.azure.AzureConstants;
|
||||||
import org.apache.nifi.util.MockFlowFile;
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
import org.apache.nifi.util.TestRunner;
|
import org.apache.nifi.util.TestRunner;
|
||||||
@ -51,7 +52,7 @@ public class ITFetchAzureBlobStorage extends AbstractAzureIT {
|
|||||||
runner.enqueue(new byte[0], attributes);
|
runner.enqueue(new byte[0], attributes);
|
||||||
runner.run();
|
runner.run();
|
||||||
|
|
||||||
runner.assertAllFlowFilesTransferred(FetchAzureBlobStorage.REL_SUCCESS, 1);
|
runner.assertAllFlowFilesTransferred(AbstractAzureProcessor.REL_SUCCESS, 1);
|
||||||
List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(FetchAzureBlobStorage.REL_SUCCESS);
|
List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(FetchAzureBlobStorage.REL_SUCCESS);
|
||||||
for (MockFlowFile flowFile : flowFilesForRelationship) {
|
for (MockFlowFile flowFile : flowFilesForRelationship) {
|
||||||
flowFile.assertContentEquals("0123456789".getBytes());
|
flowFile.assertContentEquals("0123456789".getBytes());
|
||||||
|
Loading…
x
Reference in New Issue
Block a user