From daddf400a2628c8768c452d6f835613916ae8139 Mon Sep 17 00:00:00 2001 From: muazmaz Date: Mon, 9 Mar 2020 21:53:22 -0700 Subject: [PATCH] NIFI-7103 Adding PutDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2 Storage. added data-lake dependency NIFI-7103 fixed indentation Update to add IllegalArgumentException Fixed indentation and logging nifi-7103 review changes nifi-7103 root directory and exception change This closes #4126. Signed-off-by: Peter Turcsanyi --- .../nifi-azure-processors/pom.xml | 7 +- ...AbstractAzureDataLakeStorageProcessor.java | 184 ++++++++++++++++++ .../storage/PutAzureDataLakeStorage.java | 95 +++++++++ .../org.apache.nifi.processor.Processor | 3 +- .../storage/AbstractAzureBlobStorageIT.java | 1 + .../storage/ITPutAzureDataLakeStorage.java | 57 ++++++ 6 files changed, 345 insertions(+), 2 deletions(-) create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml index b8b9dcd681..5e909a5121 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml @@ -70,11 +70,16 @@ com.microsoft.azure azure-storage + + com.azure + azure-storage-file-datalake + 12.0.1 + com.fasterxml.jackson.core jackson-core - ${jackson.version} + 2.10.3 org.apache.nifi diff --git 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.azure;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.commons.lang3.StringUtils;

import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;

import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Map;

/**
 * Base class for processors that interact with Azure Data Lake Storage Gen 2.
 * <p>
 * Declares the shared credential / location properties (account name, account
 * key, SAS token, filesystem, directory, file name), the common
 * success/failure relationships, and the logic to build a
 * {@link DataLakeServiceClient} from those properties. Exactly one of
 * Account Key or SAS Token must be configured; this is enforced in
 * {@link #customValidate(ValidationContext)}.
 */
public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor {

    public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder()
            .name("storage-account-name").displayName("Storage Account Name")
            .description("The storage account name. There are certain risks in allowing the account name to be stored as a flowfile " +
                    "attribute. While it does provide for a more flexible flow by allowing the account name to " +
                    "be fetched dynamically from a flowfile attribute, care must be taken to restrict access to " +
                    "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
                    "In addition, the provenance repositories may be put on encrypted disk partitions." +
                    " Instead of defining the Storage Account Name, Storage Account Key and SAS Token properties directly on the processor, " +
                    "the preferred way is to configure them through a controller service")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .required(true)
            .sensitive(true).build();

    public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder()
            .name("storage-account-key").displayName("Storage Account Key")
            .description("The storage account key. This is an admin-like password providing access to every container in this account. It is recommended " +
                    "one uses Shared Access Signature (SAS) token instead for fine-grained control with policies. " +
                    "There are certain risks in allowing the account key to be stored as a flowfile " +
                    "attribute. While it does provide for a more flexible flow by allowing the account key to " +
                    "be fetched dynamically from a flow file attribute, care must be taken to restrict access to " +
                    "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
                    "In addition, the provenance repositories may be put on encrypted disk partitions.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .required(false)
            .sensitive(true).build();

    public static final PropertyDescriptor SAS_TOKEN = new PropertyDescriptor.Builder()
            .name("storage-sas-token").displayName("SAS Token")
            .description("Shared Access Signature token, including the leading '?'. Specify either SAS Token (recommended) or Account Key. " +
                    "There are certain risks in allowing the SAS token to be stored as a flowfile " +
                    "attribute. While it does provide for a more flexible flow by allowing the account name to " +
                    "be fetched dynamically from a flowfile attribute, care must be taken to restrict access to " +
                    "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
                    "In addition, the provenance repositories may be put on encrypted disk partitions.")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .sensitive(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor FILESYSTEM = new PropertyDescriptor.Builder()
            .name("filesystem-name").displayName("Filesystem Name")
            .description("Name of the Azure Storage File System. It is assumed to be already existing.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .required(true)
            .build();

    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
            .name("directory-name").displayName("Directory Name")
            .description("Name of the Azure Storage Directory. It will be created if not already existing")
            // Empty value is allowed on purpose: an empty directory name addresses the filesystem root.
            .addValidator(Validator.VALID)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .required(true)
            .build();

    public static final PropertyDescriptor FILE = new PropertyDescriptor.Builder()
            .name("file-name").displayName("File Name")
            .description("The filename")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .required(true)
            .defaultValue("nifi.${uuid}")
            .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description(
            "Files that have been successfully written to Azure storage are transferred to this relationship")
            .build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description(
            "Files that could not be written to Azure storage for some reason are transferred to this relationship")
            .build();

    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(
            Arrays.asList(ACCOUNT_NAME, ACCOUNT_KEY, SAS_TOKEN, FILESYSTEM, DIRECTORY, FILE));

    // Use this class's own relationships (the original referenced
    // AbstractAzureBlobProcessor's constants, coupling this hierarchy to the
    // Blob processor and diverging from the REL_SUCCESS/REL_FAILURE declared above).
    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(
            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    /**
     * Validates that exactly one of Account Key and SAS Token is configured
     * alongside the Account Name.
     *
     * @param validationContext the context holding the configured property values
     * @return a (possibly empty) collection of validation failures
     */
    public static Collection<ValidationResult> validateCredentialProperties(final ValidationContext validationContext) {
        final List<ValidationResult> results = new ArrayList<>();
        final String accountName = validationContext.getProperty(ACCOUNT_NAME).getValue();
        final String accountKey = validationContext.getProperty(ACCOUNT_KEY).getValue();
        final String sasToken = validationContext.getProperty(SAS_TOKEN).getValue();

        // Invalid when both credentials are set, or neither is.
        if (StringUtils.isNotBlank(accountName)
                && ((StringUtils.isNotBlank(accountKey) && StringUtils.isNotBlank(sasToken)) || (StringUtils.isBlank(accountKey) && StringUtils.isBlank(sasToken)))) {
            results.add(new ValidationResult.Builder().subject("Azure Storage Credentials").valid(false)
                    .explanation("either " + ACCOUNT_NAME.getDisplayName() + " with " + ACCOUNT_KEY.getDisplayName() +
                            " or " + ACCOUNT_NAME.getDisplayName() + " with " + SAS_TOKEN.getDisplayName() +
                            " must be specified, not both")
                    .build());
        }
        return results;
    }

    /**
     * Builds a {@link DataLakeServiceClient} for the account configured on the
     * processor, evaluating expression language against the given FlowFile's
     * attributes (or against no attributes when {@code flowFile} is null).
     *
     * @param context  property context holding account name/key and SAS token
     * @param flowFile FlowFile supplying attributes for EL evaluation; may be null
     * @return a client authenticated with either the shared account key or the SAS token
     * @throws IllegalArgumentException if neither Account Key nor SAS Token is defined
     */
    public static DataLakeServiceClient getStorageClient(PropertyContext context, FlowFile flowFile) {
        final Map<String, String> attributes = flowFile != null ? flowFile.getAttributes() : Collections.emptyMap();
        final String accountName = context.getProperty(ACCOUNT_NAME).evaluateAttributeExpressions(attributes).getValue();
        final String accountKey = context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(attributes).getValue();
        final String sasToken = context.getProperty(SAS_TOKEN).evaluateAttributeExpressions(attributes).getValue();
        final String endpoint = String.format("https://%s.dfs.core.windows.net", accountName);
        final DataLakeServiceClient storageClient;
        if (StringUtils.isNotBlank(accountKey)) {
            final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
            storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).credential(credential)
                    .buildClient();
        } else if (StringUtils.isNotBlank(sasToken)) {
            storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).sasToken(sasToken)
                    .buildClient();
        } else {
            throw new IllegalArgumentException(String.format("Either '%s' or '%s' must be defined.",
                    ACCOUNT_KEY.getDisplayName(), SAS_TOKEN.getDisplayName()));
        }
        return storageClient;
    }

    @Override
    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
        return validateCredentialProperties(validationContext);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.azure.storage;

import java.io.BufferedInputStream;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;

import com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;

@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
@CapabilityDescription("Puts content into an Azure Data Lake Storage Gen 2")
@WritesAttributes({@WritesAttribute(attribute = "azure.filesystem", description = "The name of the Azure File System"),
        @WritesAttribute(attribute = "azure.directory", description = "The name of the Azure Directory"),
        @WritesAttribute(attribute = "azure.filename", description = "The name of the Azure File Name"),
        @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for file content"),
        @WritesAttribute(attribute = "azure.length", description = "Length of the file")})
@InputRequirement(Requirement.INPUT_REQUIRED)
public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {

    /**
     * Uploads the incoming FlowFile's content to Azure Data Lake Storage Gen 2.
     * <p>
     * Creates the target file under the configured filesystem/directory, appends
     * the FlowFile content (when non-empty), flushes, records the written
     * location as attributes and a SEND provenance event, and routes to
     * {@code success}. Any failure penalizes the FlowFile and routes it to
     * {@code failure}.
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        final long startNanos = System.nanoTime();
        try {
            final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
            final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
            final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
            final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
            final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem);
            final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory);
            final DataLakeFileClient fileClient = directoryClient.createFile(fileName);

            final long length = flowFile.getSize();
            if (length > 0) {
                try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream in = new BufferedInputStream(rawIn)) {
                    fileClient.append(in, 0, length);
                }
            }
            // flush commits the appended (possibly zero) bytes so the file becomes visible.
            fileClient.flush(length);

            final Map<String, String> attributes = new HashMap<>();
            attributes.put("azure.filesystem", fileSystem);
            attributes.put("azure.directory", directory);
            attributes.put("azure.filename", fileName);
            attributes.put("azure.primaryUri", fileClient.getFileUrl());
            attributes.put("azure.length", String.valueOf(length));
            flowFile = session.putAllAttributes(flowFile, attributes);

            session.transfer(flowFile, REL_SUCCESS);
            final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            session.getProvenanceReporter().send(flowFile, fileClient.getFileUrl(), transferMillis);
        } catch (Exception e) {
            // Pass the Throwable as the throwable argument so the stack trace is
            // logged (the original passed it as a '{}' format argument, losing it).
            getLogger().error("Failed to create file on Azure Data Lake Storage", e);
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_FAILURE);
        }
    }
}
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 61b0df3bb5..976daf069c 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -20,4 +20,5 @@ org.apache.nifi.processors.azure.storage.ListAzureBlobStorage org.apache.nifi.processors.azure.storage.PutAzureBlobStorage org.apache.nifi.processors.azure.storage.DeleteAzureBlobStorage org.apache.nifi.processors.azure.storage.queue.PutAzureQueueStorage -org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage \ No newline at end of file +org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage +org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorageIT.java index 4361de79fa..714688f112 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorageIT.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorageIT.java @@ -31,6 +31,7 @@ public abstract class AbstractAzureBlobStorageIT extends AbstractAzureStorageIT protected static final String TEST_CONTAINER_NAME_PREFIX = "nifi-test-container"; protected static final String TEST_BLOB_NAME = "nifi-test-blob"; + 
protected static final String TEST_FILE_NAME = "nifi-test-file"; protected CloudBlobContainer container; diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java new file mode 100644 index 0000000000..8c6b836def --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.azure.storage;

import org.apache.nifi.processor.Processor;
import org.apache.nifi.util.MockFlowFile;
import org.junit.Before;
import org.junit.Test;

import java.util.List;

/**
 * Integration test for {@link PutAzureDataLakeStorage}: uploads a small
 * payload and verifies the FlowFile is routed to success with the expected
 * content and {@code azure.length} attribute. Requires live Azure credentials
 * supplied by the base IT class.
 */
public class ITPutAzureDataLakeStorage extends AbstractAzureBlobStorageIT {

    private static final String TEST_CONTENT = "0123456789";

    @Override
    protected Class<? extends Processor> getProcessorClass() {
        return PutAzureDataLakeStorage.class;
    }

    @Before
    public void setUp() {
        runner.setProperty(PutAzureDataLakeStorage.FILE, TEST_FILE_NAME);
    }

    @Test
    public void testPutFile() throws Exception {
        runner.assertValid();
        runner.enqueue(TEST_CONTENT.getBytes());
        runner.run();

        assertResult();
    }

    private void assertResult() throws Exception {
        runner.assertAllFlowFilesTransferred(PutAzureDataLakeStorage.REL_SUCCESS, 1);
        final List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(PutAzureDataLakeStorage.REL_SUCCESS);
        for (final MockFlowFile flowFile : flowFilesForRelationship) {
            flowFile.assertContentEquals(TEST_CONTENT.getBytes());
            flowFile.assertAttributeEquals("azure.length", String.valueOf(TEST_CONTENT.length()));
        }
    }
}