NIFI-7103 Adding PutDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2 Storage.

added data-lake dependency
NIFI-7103 fixed indentation
Update to add IllegalArgumentException
Fixed indentation and logging
nifi-7103 review changes
nifi-7103 root directory and exception change

This closes #4126.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
muazmaz 2020-03-09 21:53:22 -07:00 committed by Peter Turcsanyi
parent f4a7aafe4a
commit daddf400a2
6 changed files with 345 additions and 2 deletions

View File

@ -70,11 +70,16 @@
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-storage</artifactId>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-file-datalake</artifactId>
<version>12.0.1</version>
</dependency>
<!-- overriding jackson-core in azure-storage -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
<version>2.10.3</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>

View File

@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.commons.lang3.StringUtils;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Map;
public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor {
public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder()
.name("storage-account-name").displayName("Storage Account Name")
.description("The storage account name. There are certain risks in allowing the account name to be stored as a flowfile " +
"attribute. While it does provide for a more flexible flow by allowing the account name to " +
"be fetched dynamically from a flowfile attribute, care must be taken to restrict access to " +
"the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
"In addition, the provenance repositories may be put on encrypted disk partitions." +
" Instead of defining the Storage Account Name, Storage Account Key and SAS Token properties directly on the processor, " +
"the preferred way is to configure them through a controller service")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.sensitive(true).build();
public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder()
.name("storage-account-key").displayName("Storage Account Key")
.description("The storage account key. This is an admin-like password providing access to every container in this account. It is recommended " +
"one uses Shared Access Signature (SAS) token instead for fine-grained control with policies. " +
"There are certain risks in allowing the account key to be stored as a flowfile " +
"attribute. While it does provide for a more flexible flow by allowing the account key to " +
"be fetched dynamically from a flow file attribute, care must be taken to restrict access to " +
"the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
"In addition, the provenance repositories may be put on encrypted disk partitions.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.sensitive(true).build();
public static final PropertyDescriptor SAS_TOKEN = new PropertyDescriptor.Builder()
.name("storage-sas-token").displayName("SAS Token")
.description("Shared Access Signature token, including the leading '?'. Specify either SAS Token (recommended) or Account Key. " +
"There are certain risks in allowing the SAS token to be stored as a flowfile " +
"attribute. While it does provide for a more flexible flow by allowing the account name to " +
"be fetched dynamically from a flowfile attribute, care must be taken to restrict access to " +
"the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
"In addition, the provenance repositories may be put on encrypted disk partitions.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor FILESYSTEM = new PropertyDescriptor.Builder()
.name("filesystem-name").displayName("Filesystem Name")
.description("Name of the Azure Storage File System. It is assumed to be already existing.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.build();
public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
.name("directory-name").displayName("Directory Name")
.description("Name of the Azure Storage Directory. It will be created if not already existing")
.addValidator(Validator.VALID)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.build();
public static final PropertyDescriptor FILE = new PropertyDescriptor.Builder()
.name("file-name").displayName("File Name")
.description("The filename")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.defaultValue("nifi.${uuid}")
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description(
"Files that have been successfully written to Azure storage are transferred to this relationship")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description(
"Files that could not be written to Azure storage for some reason are transferred to this relationship")
.build();
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(
Arrays.asList(AbstractAzureDataLakeStorageProcessor.ACCOUNT_NAME, AbstractAzureDataLakeStorageProcessor.ACCOUNT_KEY,
AbstractAzureDataLakeStorageProcessor.SAS_TOKEN, AbstractAzureDataLakeStorageProcessor.FILESYSTEM,
AbstractAzureDataLakeStorageProcessor.DIRECTORY, AbstractAzureDataLakeStorageProcessor.FILE));
private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(
AbstractAzureBlobProcessor.REL_SUCCESS,
AbstractAzureBlobProcessor.REL_FAILURE)));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
public static Collection<ValidationResult> validateCredentialProperties(final ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
final String accountName = validationContext.getProperty(ACCOUNT_NAME).getValue();
final String accountKey = validationContext.getProperty(ACCOUNT_KEY).getValue();
final String sasToken = validationContext.getProperty(SAS_TOKEN).getValue();
if (StringUtils.isNotBlank(accountName)
&& ((StringUtils.isNotBlank(accountKey) && StringUtils.isNotBlank(sasToken)) || (StringUtils.isBlank(accountKey) && StringUtils.isBlank(sasToken)))) {
results.add(new ValidationResult.Builder().subject("Azure Storage Credentials").valid(false)
.explanation("either " + ACCOUNT_NAME.getDisplayName() + " with " + ACCOUNT_KEY.getDisplayName() +
" or " + ACCOUNT_NAME.getDisplayName() + " with " + SAS_TOKEN.getDisplayName() +
" must be specified, not both")
.build());
}
return results;
}
public static DataLakeServiceClient getStorageClient(PropertyContext context, FlowFile flowFile) {
final Map<String, String> attributes = flowFile != null ? flowFile.getAttributes() : Collections.emptyMap();
final String accountName = context.getProperty(ACCOUNT_NAME).evaluateAttributeExpressions(attributes).getValue();
final String accountKey = context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(attributes).getValue();
final String sasToken = context.getProperty(SAS_TOKEN).evaluateAttributeExpressions(attributes).getValue();
final String endpoint = String.format("https://%s.dfs.core.windows.net", accountName);
DataLakeServiceClient storageClient;
if (StringUtils.isNotBlank(accountKey)) {
final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName,
accountKey);
storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).credential(credential)
.buildClient();
} else if (StringUtils.isNotBlank(sasToken)) {
storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).sasToken(sasToken)
.buildClient();
} else {
throw new IllegalArgumentException(String.format("Either '%s' or '%s' must be defined.",
ACCOUNT_KEY.getDisplayName(), SAS_TOKEN.getDisplayName()));
}
return storageClient;
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final Collection<ValidationResult> results = validateCredentialProperties(validationContext);
return results;
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
}

View File

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage;
import java.io.BufferedInputStream;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
@CapabilityDescription("Puts content into an Azure Data Lake Storage Gen 2")
@WritesAttributes({@WritesAttribute(attribute = "azure.filesystem", description = "The name of the Azure File System"),
@WritesAttribute(attribute = "azure.directory", description = "The name of the Azure Directory"),
@WritesAttribute(attribute = "azure.filename", description = "The name of the Azure File Name"),
@WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for file content"),
@WritesAttribute(attribute = "azure.length", description = "Length of the file")})
@InputRequirement(Requirement.INPUT_REQUIRED)
public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final long startNanos = System.nanoTime();
try {
final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem);
final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory);
final DataLakeFileClient fileClient = directoryClient.createFile(fileName);
final long length = flowFile.getSize();
if (length > 0) {
try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream in = new BufferedInputStream(rawIn)) {
fileClient.append(in, 0, length);
}
}
fileClient.flush(length);
final Map<String, String> attributes = new HashMap<>();
attributes.put("azure.filesystem", fileSystem);
attributes.put("azure.directory", directory);
attributes.put("azure.filename", fileName);
attributes.put("azure.primaryUri", fileClient.getFileUrl());
attributes.put("azure.length", String.valueOf(length));
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, fileClient.getFileUrl(), transferMillis);
} catch (Exception e) {
getLogger().error("Failed to create file, due to {}", e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
}
}

View File

@ -21,3 +21,4 @@ org.apache.nifi.processors.azure.storage.PutAzureBlobStorage
org.apache.nifi.processors.azure.storage.DeleteAzureBlobStorage
org.apache.nifi.processors.azure.storage.queue.PutAzureQueueStorage
org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage
org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage

View File

@ -31,6 +31,7 @@ public abstract class AbstractAzureBlobStorageIT extends AbstractAzureStorageIT
protected static final String TEST_CONTAINER_NAME_PREFIX = "nifi-test-container";
protected static final String TEST_BLOB_NAME = "nifi-test-blob";
protected static final String TEST_FILE_NAME = "nifi-test-file";
protected CloudBlobContainer container;

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.util.MockFlowFile;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
public class ITPutAzureDataLakeStorage extends AbstractAzureBlobStorageIT {
@Override
protected Class<? extends Processor> getProcessorClass() {
return PutAzureDataLakeStorage.class;
}
@Before
public void setUp() {
runner.setProperty(PutAzureDataLakeStorage.FILE, TEST_FILE_NAME);
}
@Test
public void testPutFile() throws Exception {
runner.assertValid();
runner.enqueue("0123456789".getBytes());
runner.run();
assertResult();
}
private void assertResult() throws Exception {
runner.assertAllFlowFilesTransferred(PutAzureDataLakeStorage.REL_SUCCESS, 1);
List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(PutAzureDataLakeStorage.REL_SUCCESS);
for (MockFlowFile flowFile : flowFilesForRelationship) {
flowFile.assertContentEquals("0123456789".getBytes());
flowFile.assertAttributeEquals("azure.length", "10");
}
}
}