NIFI-1833 - Azure Storage processors

Signed-off-by: Bryan Rosander <brosander@apache.org>
This commit is contained in:
Simon Elliston Ball 2016-05-02 00:35:34 +01:00 committed by Bryan Rosander
parent 7d242076ce
commit 3488a169ca
No known key found for this signature in database
GPG Key ID: 2065F38F3FF65D23
14 changed files with 1114 additions and 10 deletions

View File

@ -35,6 +35,12 @@
<artifactId>nifi-azure-processors</artifactId>
<version>1.2.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<type>nar</type>
</dependency>
</dependencies>
</project>

View File

@ -1,14 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
@ -31,11 +31,35 @@ language governing permissions and limitations under the License. -->
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.8.6</version>
</dependency>
<!--<dependency>
<groupId>com.microsoft.eventhubs.client</groupId>
<artifactId>eventhubs-client</artifactId>
<version>0.9.1</version>
</dependency>-->
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-storage</artifactId>
<version>5.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
@ -57,5 +81,10 @@ language governing permissions and limitations under the License. -->
<version>${powermock.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-processors</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
public abstract class AbstractAzureBlobProcessor extends AbstractAzureProcessor {
public static final PropertyDescriptor BLOB = new PropertyDescriptor.Builder().name("Blob").description("The filename of the blob").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true).required(true).defaultValue("${azure.blobname}").build();
public static final List<PropertyDescriptor> PROPERTIES = Collections
.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, BLOB));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
}

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import com.microsoft.azure.storage.CloudStorageAccount;
public abstract class AbstractAzureProcessor extends AbstractProcessor {
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are received are routed to success").build();
protected static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Any unsuccessful operations will be transferred to the failure relationship.").build();
public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
protected CloudStorageAccount createStorageConnection(ProcessContext context) {
final String accountName = context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
final String accountKey = context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
return createStorageConnectionFromNameAndKey(accountName, accountKey);
}
protected CloudStorageAccount createStorageConnection(ProcessContext context, FlowFile flowFile) {
final String accountName = context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String accountKey = context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue();
return createStorageConnectionFromNameAndKey(accountName, accountKey);
}
private CloudStorageAccount createStorageConnectionFromNameAndKey(String accountName, String accountKey) {
final String storageConnectionString = String.format("DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s", accountName, accountKey);
try {
return createStorageAccountFromConnectionString(storageConnectionString);
} catch (InvalidKeyException | IllegalArgumentException | URISyntaxException e) {
throw new IllegalArgumentException(e);
}
}
/**
 * Validates the connection string and returns the storage account. The connection string must be in the Azure connection string format.
 *
 * @param storageConnectionString Connection string for the storage service or the emulator
 * @return The newly created CloudStorageAccount object
 * @throws IllegalArgumentException if the connection string is malformed
 * @throws URISyntaxException if the connection string specifies an invalid URI
 * @throws InvalidKeyException if the connection string specifies an invalid account key
 */
protected static CloudStorageAccount createStorageAccountFromConnectionString(String storageConnectionString) throws IllegalArgumentException, URISyntaxException, InvalidKeyException {
// CloudStorageAccount.parse throws IllegalArgumentException, URISyntaxException or InvalidKeyException for malformed input.
return CloudStorageAccount.parse(storageConnectionString);
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
}
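For reference, a minimal standalone sketch of the connection-string handling used above; the account name and key below are placeholders, not real credentials:
import java.net.URI;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.blob.CloudBlobClient;
public class AzureConnectionStringSketch {
    public static void main(String[] args) throws Exception {
        // Same format the processor builds from its Storage Account Name / Storage Account Key properties.
        final String connectionString = "DefaultEndpointsProtocol=https;AccountName=myaccount;AccountKey=bXlrZXk=";
        // parse() rejects malformed strings with IllegalArgumentException, URISyntaxException or InvalidKeyException,
        // which createStorageConnectionFromNameAndKey above wraps in an IllegalArgumentException.
        final CloudStorageAccount account = CloudStorageAccount.parse(connectionString);
        // The processors obtain their client the same way before resolving a container reference.
        final CloudBlobClient blobClient = account.createCloudBlobClient();
        final URI blobEndpoint = account.getBlobEndpoint();
        System.out.println("Blob endpoint: " + blobEndpoint);
    }
}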

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.util.StandardValidators;
public final class AzureConstants {
public static final String BLOCK = "Block";
public static final String PAGE = "Page";
public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder().name("Storage Account Key").description("The storage account key")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).sensitive(true).build();
public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder().name("Storage Account Name").description("The storage account name")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).sensitive(true).build();
public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder().name("Container name").description("Name of the azure storage container")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).build();
private AzureConstants() {
// do not instantiate
}
}

View File

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
import org.apache.nifi.processors.azure.AzureConstants;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
@CapabilityDescription("Retrieves contents of an Azure Storage Blob, writing the contents to the content of the FlowFile")
@InputRequirement(Requirement.INPUT_REQUIRED)
@WritesAttributes({
@WritesAttribute(attribute = "azure.length", description = "The length of the blob fetched")
})
public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
public static final List<PropertyDescriptor> PROPERTIES = Collections
.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, BLOB));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final long startNanos = System.nanoTime();
String containerName = context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
try {
CloudStorageAccount storageAccount = createStorageConnection(context, flowFile);
CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
CloudBlobContainer container = blobClient.getContainerReference(containerName);
final Map<String, String> attributes = new HashMap<>();
final CloudBlob blob = container.getBlockBlobReference(blobPath);
// TODO - we may be able do fancier things with ranges and
// distribution of download over threads, investigate
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream os) throws IOException {
try {
blob.download(os);
} catch (StorageException e) {
throw new IOException(e);
}
}
});
long length = blob.getProperties().getLength();
attributes.put("azure.length", String.valueOf(length));
if (!attributes.isEmpty()) {
flowFile = session.putAllAttributes(flowFile, attributes);
}
session.transfer(flowFile, REL_SUCCESS);
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().fetch(flowFile, blob.getSnapshotQualifiedUri().toString(), transferMillis);
} catch (IllegalArgumentException | URISyntaxException | StorageException e1) {
getLogger().error("Failed to fetch Azure blob {}", new Object[] { blobPath }, e1);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
}
}
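The TODO above mentions ranged downloads. As a rough illustration only (not part of this commit), the SDK's downloadRange call could pull the blob in chunks; the helper name and in-memory buffering here are hypothetical choices:
import java.io.ByteArrayOutputStream;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob;
public class RangedBlobDownloadSketch {
    // Downloads a blob in fixed-size ranges rather than a single download() call.
    // A real processor would stream each range into the FlowFile content instead of buffering it in memory.
    static byte[] downloadInRanges(CloudBlob blob, long rangeSize) throws StorageException {
        blob.downloadAttributes(); // refresh properties so getLength() is current
        final long total = blob.getProperties().getLength();
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (long offset = 0; offset < total; offset += rangeSize) {
            final Long length = Math.min(rangeSize, total - offset);
            blob.downloadRange(offset, length, out);
        }
        return out.toByteArray();
    }
}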

View File

@ -0,0 +1,193 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage;
import java.io.IOException;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AzureConstants;
import org.apache.nifi.processors.azure.storage.utils.BlobInfo;
import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder;
import org.apache.nifi.processors.standard.AbstractListProcessor;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.StorageUri;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobProperties;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.ListBlobItem;
@TriggerSerially
@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
@SeeAlso({ FetchAzureBlobStorage.class })
@CapabilityDescription("Lists blobs in an Azure Storage container. Listing details are attached to an empty FlowFile for use with FetchAzureBlobStorage")
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the azure container"),
@WritesAttribute(attribute = "azure.blobname", description = "The name of the azure blob"), @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"),
@WritesAttribute(attribute = "azure.secondaryUri", description = "Secondary location for blob content"), @WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"),
@WritesAttribute(attribute = "azure.length", description = "Length of the blob"), @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob"),
@WritesAttribute(attribute = "mime.type", description = "MimeType of the content"), @WritesAttribute(attribute = "lang", description = "Language code for the content"),
@WritesAttribute(attribute = "azure.blobtype", description = "This is the type of blob and can be either page or block type") })
@Stateful(scopes = { Scope.LOCAL, Scope.CLUSTER }, description = "After performing a listing of blobs, the timestamp of the newest blob is stored. "
+ "This allows the Processor to list only blobs that have been added or modified after " + "this date the next time that the Processor is run.")
public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
private static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder().name("Prefix").description("Search prefix for listing").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true).required(false).build();
public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, PREFIX));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@Override
protected Map<String, String> createAttributes(BlobInfo entity, ProcessContext context) {
final Map<String, String> attributes = new HashMap<>();
attributes.put("azure.etag", entity.getEtag());
attributes.put("azure.primaryUri", entity.getPrimaryUri());
attributes.put("azure.secondaryUri", entity.getSecondaryUri());
attributes.put("azure.blobname", entity.getName());
attributes.put("azure.blobtype", entity.getBlobType());
attributes.put("azure.length", String.valueOf(entity.getLength()));
attributes.put("azure.timestamp", String.valueOf(entity.getTimestamp()));
attributes.put("mime.type", entity.getContentType());
attributes.put("lang", entity.getContentLanguage());
return attributes;
}
@Override
protected String getPath(final ProcessContext context) {
return context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions().getValue();
}
@Override
protected boolean isListingResetNecessary(final PropertyDescriptor property) {
// TODO - implement
return false;
}
@Override
protected Scope getStateScope(final ProcessContext context) {
return Scope.CLUSTER;
}
@Override
protected List<BlobInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
String containerName = context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions().getValue();
String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
if (prefix == null) {
prefix = "";
}
final List<BlobInfo> listing = new ArrayList<>();
try {
CloudStorageAccount storageAccount = createStorageConnection(context);
CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
CloudBlobContainer container = blobClient.getContainerReference(containerName);
BlobRequestOptions blobRequestOptions = null;
OperationContext operationContext = null;
for (ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), blobRequestOptions, operationContext)) {
if (blob instanceof CloudBlob) {
CloudBlob cloudBlob = (CloudBlob) blob;
BlobProperties properties = cloudBlob.getProperties();
StorageUri uri = cloudBlob.getSnapshotQualifiedStorageUri();
Builder builder = new BlobInfo.Builder().primaryUri(uri.getPrimaryUri().toString()).secondaryUri(uri.getSecondaryUri().toString()).contentType(properties.getContentType())
.contentLanguage(properties.getContentLanguage()).etag(properties.getEtag()).lastModifiedTime(properties.getLastModified().getTime()).length(properties.getLength());
if (blob instanceof CloudBlockBlob) {
builder.blobType(AzureConstants.BLOCK);
} else {
builder.blobType(AzureConstants.PAGE);
}
listing.add(builder.build());
}
}
} catch (IllegalArgumentException | URISyntaxException | StorageException e) {
throw (new IOException(e));
}
return listing;
}
protected static CloudStorageAccount createStorageConnection(ProcessContext context) {
final String accountName = context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
final String accountKey = context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
final String storageConnectionString = String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", accountName, accountKey);
try {
return createStorageAccountFromConnectionString(storageConnectionString);
} catch (InvalidKeyException | URISyntaxException e) {
throw new IllegalArgumentException(e);
}
}
/**
 * Validates the connection string and returns the storage account. The connection string must be in the Azure connection string format.
 *
 * @param storageConnectionString Connection string for the storage service or the emulator
 * @return The newly created CloudStorageAccount object
 * @throws IllegalArgumentException if the connection string is malformed
 * @throws URISyntaxException if the connection string specifies an invalid URI
 * @throws InvalidKeyException if the connection string specifies an invalid account key
 */
private static CloudStorageAccount createStorageAccountFromConnectionString(String storageConnectionString) throws IllegalArgumentException, URISyntaxException, InvalidKeyException {
// CloudStorageAccount.parse throws IllegalArgumentException, URISyntaxException or InvalidKeyException for malformed input.
return CloudStorageAccount.parse(storageConnectionString);
}
}
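For illustration only: the @Stateful behaviour described above means the inherited AbstractListProcessor emits, on later runs, only blobs newer than the recorded timestamp. A hypothetical filter (not code from this commit) sketches that idea using BlobInfo.getTimestamp():
import java.util.ArrayList;
import java.util.List;
import org.apache.nifi.processors.azure.storage.utils.BlobInfo;
public class ListingStateSketch {
    // Hypothetical helper: keep only blobs modified after the timestamp restored from processor state.
    static List<BlobInfo> newerThan(List<BlobInfo> listing, long lastListedTimestamp) {
        final List<BlobInfo> newer = new ArrayList<>();
        for (BlobInfo blob : listing) {
            if (blob.getTimestamp() > lastListedTimestamp) {
                newer.add(blob);
            }
        }
        return newer;
    }
}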

View File

@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
import org.apache.nifi.processors.azure.AzureConstants;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobProperties;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
@SeeAlso({ ListAzureBlobStorage.class, FetchAzureBlobStorage.class })
@CapabilityDescription("Puts content into an Azure Storage Blob")
@InputRequirement(Requirement.INPUT_REQUIRED)
@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the azure container"),
@WritesAttribute(attribute = "azure.blobname", description = "The name of the azure blob"),
@WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"),
@WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"),
@WritesAttribute(attribute = "azure.length", description = "Length of the blob"),
@WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob"),
@WritesAttribute(attribute = "azure.blobtype", description = "This is the type of blob and can be either page or block type") })
public class PutAzureBlobStorage extends AbstractAzureBlobProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final long startNanos = System.nanoTime();
String containerName = context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
try {
CloudStorageAccount storageAccount = createStorageConnection(context, flowFile);
CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
CloudBlobContainer container = blobClient.getContainerReference(containerName);
CloudBlob blob = container.getBlockBlobReference(blobPath);
final Map<String, String> attributes = new HashMap<>();
long length = flowFile.getSize();
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream rawIn) throws IOException {
final InputStream in = new BufferedInputStream(rawIn);
try {
blob.upload(in, length);
BlobProperties properties = blob.getProperties();
attributes.put("azure.container", containerName);
attributes.put("azure.primaryUri", blob.getSnapshotQualifiedUri().toString());
attributes.put("azure.etag", properties.getEtag());
attributes.put("azure.length", String.valueOf(length));
attributes.put("azure.timestamp", String.valueOf(properties.getLastModified()));
} catch (StorageException | URISyntaxException e) {
throw new IOException(e);
}
}
});
if (!attributes.isEmpty()) {
flowFile = session.putAllAttributes(flowFile, attributes);
}
session.transfer(flowFile, REL_SUCCESS);
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, blob.getSnapshotQualifiedUri().toString(), transferMillis);
} catch (IllegalArgumentException | URISyntaxException | StorageException e) {
getLogger().error("Failed to put Azure blob {}", new Object[]{blobPath}, e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
}
}

View File

@ -0,0 +1,188 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.utils;
import java.io.Serializable;
import org.apache.nifi.processors.standard.util.ListableEntity;
public class BlobInfo implements Comparable<BlobInfo>, Serializable, ListableEntity {
private static final long serialVersionUID = 1L;
private final String primaryUri;
private final String secondaryUri;
private final String contentType;
private final String contentLanguage;
private final String etag;
private final long lastModifiedTime;
private final long length;
private final String blobType;
public static long getSerialversionuid() {
return serialVersionUID;
}
public String getPrimaryUri() {
return primaryUri;
}
public String getSecondaryUri() {
return secondaryUri;
}
public String getContentType() {
return contentType;
}
public String getContentLanguage() {
return contentLanguage;
}
public String getEtag() {
return etag;
}
public long getLastModifiedTime() {
return lastModifiedTime;
}
public long getLength() {
return length;
}
public String getBlobType() {
return blobType;
}
public static final class Builder {
private String primaryUri;
private String secondaryUri;
private String contentType;
private String contentLanguage;
private String etag;
private long lastModifiedTime;
private long length;
private String blobType;
public Builder primaryUri(String primaryUri) {
this.primaryUri = primaryUri;
return this;
}
public Builder secondaryUri(String secondaryUri) {
this.secondaryUri = secondaryUri;
return this;
}
public Builder contentType(String contentType) {
this.contentType = contentType;
return this;
}
public Builder contentLanguage(String contentLanguage) {
this.contentLanguage = contentLanguage;
return this;
}
public Builder etag(String etag) {
this.etag = etag;
return this;
}
public Builder lastModifiedTime(long lastModifiedTime) {
this.lastModifiedTime = lastModifiedTime;
return this;
}
public Builder length(long length) {
this.length = length;
return this;
}
public Builder blobType(String blobType) {
this.blobType = blobType;
return this;
}
public BlobInfo build() {
return new BlobInfo(this);
}
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((etag == null) ? 0 : etag.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
BlobInfo other = (BlobInfo) obj;
if (etag == null) {
if (other.etag != null) {
return false;
}
} else if (!etag.equals(other.etag)) {
return false;
}
return true;
}
@Override
public int compareTo(BlobInfo o) {
return etag.compareTo(o.etag);
}
protected BlobInfo(final Builder builder) {
this.primaryUri = builder.primaryUri;
this.secondaryUri = builder.secondaryUri;
this.contentType = builder.contentType;
this.contentLanguage = builder.contentLanguage;
this.etag = builder.etag;
this.lastModifiedTime = builder.lastModifiedTime;
this.length = builder.length;
this.blobType = builder.blobType;
}
@Override
public String getName() {
String primaryUri = getPrimaryUri();
return primaryUri.substring(primaryUri.lastIndexOf('/') + 1);
}
@Override
public String getIdentifier() {
return getPrimaryUri();
}
@Override
public long getTimestamp() {
return getLastModifiedTime();
}
}
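A minimal usage sketch of the BlobInfo builder above; the URI, etag and length values are placeholders:
import org.apache.nifi.processors.azure.storage.utils.BlobInfo;
public class BlobInfoBuilderSketch {
    public static void main(String[] args) {
        final BlobInfo info = new BlobInfo.Builder()
                .primaryUri("https://myaccount.blob.core.windows.net/nifitest/testing")
                .contentType("application/octet-stream")
                .etag("0x8D3CB4C8B9A2A00")
                .lastModifiedTime(System.currentTimeMillis())
                .length(10)
                .blobType("Block")
                .build();
        System.out.println(info.getName());       // "testing" - the path segment after the last '/'
        System.out.println(info.getIdentifier()); // the full primary URI
        System.out.println(info.getTimestamp());  // same value as getLastModifiedTime()
    }
}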

View File

@ -13,4 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.azure.eventhub.PutAzureEventHub
org.apache.nifi.processors.azure.eventhub.GetAzureEventHub
org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage
org.apache.nifi.processors.azure.storage.ListAzureBlobStorage
org.apache.nifi.processors.azure.storage.PutAzureBlobStorage

View File

@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage;
import static org.junit.Assert.fail;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.Properties;
import org.apache.nifi.util.file.FileUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
import com.microsoft.azure.storage.blob.ListBlobItem;
import com.microsoft.azure.storage.table.CloudTable;
import com.microsoft.azure.storage.table.CloudTableClient;
public abstract class AbstractAzureIT {
protected static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES";
public static final String TEST_CONTAINER_NAME = "nifitest";
private static final Properties CONFIG;
protected static final String TEST_BLOB_NAME = "testing";
protected static final String TEST_TABLE_NAME = "testing";
static {
final FileInputStream fis;
CONFIG = new Properties();
try {
fis = new FileInputStream(CREDENTIALS_FILE);
try {
CONFIG.load(fis);
} catch (IOException e) {
fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
} finally {
FileUtils.closeQuietly(fis);
}
} catch (FileNotFoundException e) {
fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
}
}
@BeforeClass
public static void oneTimeSetup() throws StorageException, InvalidKeyException, URISyntaxException {
CloudBlobContainer container = getContainer();
container.createIfNotExists();
}
@AfterClass
public static void tearDown() throws InvalidKeyException, URISyntaxException, StorageException {
CloudBlobContainer container = getContainer();
for (ListBlobItem blob : container.listBlobs()) {
if (blob instanceof CloudBlob) {
((CloudBlob) blob).delete(DeleteSnapshotsOption.INCLUDE_SNAPSHOTS, null, null, null);
}
}
}
public static String getAccountName() {
return CONFIG.getProperty("accountName");
}
public static String getAccountKey() {
return CONFIG.getProperty("accountKey");
}
protected static CloudBlobContainer getContainer() throws InvalidKeyException, URISyntaxException, StorageException {
String storageConnectionString = String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", getAccountName(), getAccountKey());
CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);
CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
return blobClient.getContainerReference(TEST_CONTAINER_NAME);
}
protected static CloudTable getTable() throws InvalidKeyException, URISyntaxException, StorageException {
String storageConnectionString = String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", getAccountName(), getAccountKey());
CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);
CloudTableClient tableClient = storageAccount.createCloudTableClient();
return tableClient.getTableReference(TEST_TABLE_NAME);
}
}
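These integration tests assume a credentials properties file at ~/azure-credentials.PROPERTIES containing at least the two keys read above, along the lines of (placeholder values):
accountName=mystorageaccount
accountKey=BASE64-ENCODED-ACCOUNT-KEY==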

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage;
import java.io.IOException;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.processors.azure.AzureConstants;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import com.microsoft.azure.storage.StorageException;
public class ITFetchAzureBlobStorage extends AbstractAzureIT {
@Test
public void testFetchingBlob() throws InvalidKeyException, URISyntaxException, StorageException, IOException {
final TestRunner runner = TestRunners.newTestRunner(new FetchAzureBlobStorage());
runner.setValidateExpressionUsage(true);
runner.setProperty(AzureConstants.ACCOUNT_NAME, getAccountName());
runner.setProperty(AzureConstants.ACCOUNT_KEY, getAccountKey());
runner.setProperty(AzureConstants.CONTAINER, TEST_CONTAINER_NAME);
runner.setProperty(FetchAzureBlobStorage.BLOB, "${azure.blobname}");
final Map<String, String> attributes = new HashMap<>();
attributes.put("azure.primaryUri", "http://" + getAccountName() + ".blob.core.windows.net/" + TEST_CONTAINER_NAME + "/" + TEST_BLOB_NAME);
attributes.put("azure.blobname", TEST_BLOB_NAME);
attributes.put("azure.blobtype", AzureConstants.BLOCK);
runner.enqueue(new byte[0], attributes);
runner.run();
runner.assertAllFlowFilesTransferred(FetchAzureBlobStorage.REL_SUCCESS, 1);
List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(FetchAzureBlobStorage.REL_SUCCESS);
for (MockFlowFile flowFile : flowFilesForRelationship) {
flowFile.assertContentEquals("0123456789".getBytes());
flowFile.assertAttributeEquals("azure.length", "10");
}
}
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import org.apache.nifi.processors.azure.AzureConstants;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
public class ITListAzureBlobStorage extends AbstractAzureIT {
@BeforeClass
public static void setupSomeFiles() throws InvalidKeyException, URISyntaxException, StorageException, IOException {
CloudBlobContainer container = getContainer();
container.createIfNotExists();
CloudBlob blob = container.getBlockBlobReference(TEST_BLOB_NAME);
byte[] buf = "0123456789".getBytes();
InputStream in = new ByteArrayInputStream(buf);
blob.upload(in, 10);
}
@AfterClass
public static void tearDown() throws InvalidKeyException, URISyntaxException, StorageException {
CloudBlobContainer container = getContainer();
container.deleteIfExists();
}
@Test
public void testListsAzureBlobStorageContent() {
final TestRunner runner = TestRunners.newTestRunner(new ListAzureBlobStorage());
runner.setProperty(AzureConstants.ACCOUNT_NAME, getAccountName());
runner.setProperty(AzureConstants.ACCOUNT_KEY, getAccountKey());
runner.setProperty(AzureConstants.CONTAINER, TEST_CONTAINER_NAME);
// requires multiple runs to deal with List processor checking
runner.run(3);
runner.assertTransferCount(ListAzureBlobStorage.REL_SUCCESS, 1);
runner.assertAllFlowFilesTransferred(ListAzureBlobStorage.REL_SUCCESS, 1);
for (MockFlowFile entry : runner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS)) {
entry.assertAttributeEquals("azure.length", "10");
entry.assertAttributeEquals("mime.type", "application/octet-stream");
}
}
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage;
import java.io.IOException;
import java.util.List;
import org.apache.nifi.processors.azure.AzureConstants;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
public class ITPutAzureStorageBlob extends AbstractAzureIT {
@Test
public void testPuttingBlob() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new PutAzureBlobStorage());
runner.setValidateExpressionUsage(true);
runner.setProperty(AzureConstants.ACCOUNT_NAME, getAccountName());
runner.setProperty(AzureConstants.ACCOUNT_KEY, getAccountKey());
runner.setProperty(AzureConstants.CONTAINER, TEST_CONTAINER_NAME);
runner.setProperty(FetchAzureBlobStorage.BLOB, "testingUpload");
runner.enqueue("0123456789".getBytes());
runner.run();
runner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1);
List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(PutAzureBlobStorage.REL_SUCCESS);
for (MockFlowFile flowFile : flowFilesForRelationship) {
flowFile.assertContentEquals("0123456789".getBytes());
flowFile.assertAttributeEquals("azure.length", "10");
}
}
}