diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java index 3368fe2804..83c5c14bb6 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java @@ -1,184 +1,184 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.azure; - -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; -import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.context.PropertyContext; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.commons.lang3.StringUtils; - -import com.azure.storage.common.StorageSharedKeyCredential; -import com.azure.storage.file.datalake.DataLakeServiceClient; -import com.azure.storage.file.datalake.DataLakeServiceClientBuilder; - -import java.util.Arrays; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.Map; - -public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor { - - public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder() - .name("storage-account-name").displayName("Storage Account Name") - .description("The storage account name. There are certain risks in allowing the account name to be stored as a flowfile " + - "attribute. While it does provide for a more flexible flow by allowing the account name to " + - "be fetched dynamically from a flowfile attribute, care must be taken to restrict access to " + - "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " + - "In addition, the provenance repositories may be put on encrypted disk partitions." 
+ - " Instead of defining the Storage Account Name, Storage Account Key and SAS Token properties directly on the processor, " + - "the preferred way is to configure them through a controller service") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .required(true) - .sensitive(true).build(); - - public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder() - .name("storage-account-key").displayName("Storage Account Key") - .description("The storage account key. This is an admin-like password providing access to every container in this account. It is recommended " + - "one uses Shared Access Signature (SAS) token instead for fine-grained control with policies. " + - "There are certain risks in allowing the account key to be stored as a flowfile " + - "attribute. While it does provide for a more flexible flow by allowing the account key to " + - "be fetched dynamically from a flow file attribute, care must be taken to restrict access to " + - "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " + - "In addition, the provenance repositories may be put on encrypted disk partitions.") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .required(false) - .sensitive(true).build(); - - public static final PropertyDescriptor SAS_TOKEN = new PropertyDescriptor.Builder() - .name("storage-sas-token").displayName("SAS Token") - .description("Shared Access Signature token, including the leading '?'. Specify either SAS Token (recommended) or Account Key. " + - "There are certain risks in allowing the SAS token to be stored as a flowfile " + - "attribute. While it does provide for a more flexible flow by allowing the account name to " + - "be fetched dynamically from a flowfile attribute, care must be taken to restrict access to " + - "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " + - "In addition, the provenance repositories may be put on encrypted disk partitions.") - .required(false) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .sensitive(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - public static final PropertyDescriptor FILESYSTEM = new PropertyDescriptor.Builder() - .name("filesystem-name").displayName("Filesystem Name") - .description("Name of the Azure Storage File System. It is assumed to be already existing.") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .required(true) - .build(); - - public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() - .name("directory-name").displayName("Directory Name") - .description("Name of the Azure Storage Directory. 
It will be created if not already existing") - .addValidator(Validator.VALID) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .required(true) - .build(); - - public static final PropertyDescriptor FILE = new PropertyDescriptor.Builder() - .name("file-name").displayName("File Name") - .description("The filename") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .required(true) - .defaultValue("nifi.${uuid}") - .build(); - - public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description( - "Files that have been successfully written to Azure storage are transferred to this relationship") - .build(); - public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description( - "Files that could not be written to Azure storage for some reason are transferred to this relationship") - .build(); - - private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList( - Arrays.asList(AbstractAzureDataLakeStorageProcessor.ACCOUNT_NAME, AbstractAzureDataLakeStorageProcessor.ACCOUNT_KEY, - AbstractAzureDataLakeStorageProcessor.SAS_TOKEN, AbstractAzureDataLakeStorageProcessor.FILESYSTEM, - AbstractAzureDataLakeStorageProcessor.DIRECTORY, AbstractAzureDataLakeStorageProcessor.FILE)); - - private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet( - new HashSet<>(Arrays.asList( - AbstractAzureBlobProcessor.REL_SUCCESS, - AbstractAzureBlobProcessor.REL_FAILURE))); - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return PROPERTIES; - } - - public static Collection<ValidationResult> validateCredentialProperties(final ValidationContext validationContext) { - final List<ValidationResult> results = new ArrayList<>(); - final String accountName = validationContext.getProperty(ACCOUNT_NAME).getValue(); - final String accountKey = validationContext.getProperty(ACCOUNT_KEY).getValue(); - final String sasToken = validationContext.getProperty(SAS_TOKEN).getValue(); - - if (StringUtils.isNotBlank(accountName) - && ((StringUtils.isNotBlank(accountKey) && StringUtils.isNotBlank(sasToken)) || (StringUtils.isBlank(accountKey) && StringUtils.isBlank(sasToken)))) { - results.add(new ValidationResult.Builder().subject("Azure Storage Credentials").valid(false) - .explanation("either " + ACCOUNT_NAME.getDisplayName() + " with " + ACCOUNT_KEY.getDisplayName() + - " or " + ACCOUNT_NAME.getDisplayName() + " with " + SAS_TOKEN.getDisplayName() + - " must be specified, not both") - .build()); - } - return results; - } - - public static DataLakeServiceClient getStorageClient(PropertyContext context, FlowFile flowFile) { - final Map<String, String> attributes = flowFile != null ? 
flowFile.getAttributes() : Collections.emptyMap(); - final String accountName = context.getProperty(ACCOUNT_NAME).evaluateAttributeExpressions(attributes).getValue(); - final String accountKey = context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(attributes).getValue(); - final String sasToken = context.getProperty(SAS_TOKEN).evaluateAttributeExpressions(attributes).getValue(); - final String endpoint = String.format("https://%s.dfs.core.windows.net", accountName); - DataLakeServiceClient storageClient; - if (StringUtils.isNotBlank(accountKey)) { - final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, - accountKey); - storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).credential(credential) - .buildClient(); - } else if (StringUtils.isNotBlank(sasToken)) { - storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).sasToken(sasToken) - .buildClient(); - } else { - throw new IllegalArgumentException(String.format("Either '%s' or '%s' must be defined.", - ACCOUNT_KEY.getDisplayName(), SAS_TOKEN.getDisplayName())); - } - return storageClient; - } - - @Override - protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { - final Collection<ValidationResult> results = validateCredentialProperties(validationContext); - return results; - } - - @Override - public Set<Relationship> getRelationships() { - return RELATIONSHIPS; - } +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.commons.lang3.StringUtils; + +import com.azure.storage.common.StorageSharedKeyCredential; +import com.azure.storage.file.datalake.DataLakeServiceClient; +import com.azure.storage.file.datalake.DataLakeServiceClientBuilder; + +import java.util.Arrays; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.Map; + +public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor { + + public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder() + .name("storage-account-name").displayName("Storage Account Name") + .description("The storage account name. 
There are certain risks in allowing the account name to be stored as a flowfile " + + "attribute. While it does provide for a more flexible flow by allowing the account name to " + + "be fetched dynamically from a flowfile attribute, care must be taken to restrict access to " + + "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " + + "In addition, the provenance repositories may be put on encrypted disk partitions." + + " Instead of defining the Storage Account Name, Storage Account Key, and SAS Token properties directly on the processor, " + + "the preferred way is to configure them through a controller service") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(true) + .sensitive(true).build(); + + public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder() + .name("storage-account-key").displayName("Storage Account Key") + .description("The storage account key. This is an admin-like password providing access to every container in this account. It is recommended " + + "to use a Shared Access Signature (SAS) token instead for fine-grained control with policies. " + + "There are certain risks in allowing the account key to be stored as a flowfile " + + "attribute. While it does provide for a more flexible flow by allowing the account key to " + + "be fetched dynamically from a flowfile attribute, care must be taken to restrict access to " + + "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " + + "In addition, the provenance repositories may be put on encrypted disk partitions.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(false) + .sensitive(true).build(); + + public static final PropertyDescriptor SAS_TOKEN = new PropertyDescriptor.Builder() + .name("storage-sas-token").displayName("SAS Token") + .description("Shared Access Signature token, including the leading '?'. Specify either SAS Token (recommended) or Account Key. " + + "There are certain risks in allowing the SAS token to be stored as a flowfile " + + "attribute. While it does provide for a more flexible flow by allowing the SAS token to " + + "be fetched dynamically from a flowfile attribute, care must be taken to restrict access to " + + "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " + + "In addition, the provenance repositories may be put on encrypted disk partitions.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .sensitive(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor FILESYSTEM = new PropertyDescriptor.Builder() + .name("filesystem-name").displayName("Filesystem Name") + .description("Name of the Azure Storage File System. It must already exist.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(true) + .build(); + + public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() + .name("directory-name").displayName("Directory Name") + .description("Name of the Azure Storage Directory. 
In the case of the PutAzureDataLakeStorage processor, the directory will be created if it does not already exist.") + .addValidator(Validator.VALID) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(true) + .build(); + + public static final PropertyDescriptor FILE = new PropertyDescriptor.Builder() + .name("file-name").displayName("File Name") + .description("The filename") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(true) + .defaultValue("nifi.${uuid}") + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description( + "Files that have been successfully written to Azure storage are transferred to this relationship") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description( + "Files that could not be written to Azure storage for some reason are transferred to this relationship") + .build(); + + private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList( + Arrays.asList(AbstractAzureDataLakeStorageProcessor.ACCOUNT_NAME, AbstractAzureDataLakeStorageProcessor.ACCOUNT_KEY, + AbstractAzureDataLakeStorageProcessor.SAS_TOKEN, AbstractAzureDataLakeStorageProcessor.FILESYSTEM, + AbstractAzureDataLakeStorageProcessor.DIRECTORY, AbstractAzureDataLakeStorageProcessor.FILE)); + + private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet( + new HashSet<>(Arrays.asList( + REL_SUCCESS, + REL_FAILURE))); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + public static Collection<ValidationResult> validateCredentialProperties(final ValidationContext validationContext) { + final List<ValidationResult> results = new ArrayList<>(); + final String accountName = validationContext.getProperty(ACCOUNT_NAME).getValue(); + final String accountKey = validationContext.getProperty(ACCOUNT_KEY).getValue(); + final String sasToken = validationContext.getProperty(SAS_TOKEN).getValue(); + + if (StringUtils.isNotBlank(accountName) + && ((StringUtils.isNotBlank(accountKey) && StringUtils.isNotBlank(sasToken)) || (StringUtils.isBlank(accountKey) && StringUtils.isBlank(sasToken)))) { + results.add(new ValidationResult.Builder().subject("Azure Storage Credentials").valid(false) + .explanation("either " + ACCOUNT_NAME.getDisplayName() + " with " + ACCOUNT_KEY.getDisplayName() + + " or " + ACCOUNT_NAME.getDisplayName() + " with " + SAS_TOKEN.getDisplayName() + + " must be specified, not both") + .build()); + } + return results; + } + + public static DataLakeServiceClient getStorageClient(PropertyContext context, FlowFile flowFile) { + final Map<String, String> attributes = flowFile != null ? 
flowFile.getAttributes() : Collections.emptyMap(); + final String accountName = context.getProperty(ACCOUNT_NAME).evaluateAttributeExpressions(attributes).getValue(); + final String accountKey = context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(attributes).getValue(); + final String sasToken = context.getProperty(SAS_TOKEN).evaluateAttributeExpressions(attributes).getValue(); + final String endpoint = String.format("https://%s.dfs.core.windows.net", accountName); + DataLakeServiceClient storageClient; + if (StringUtils.isNotBlank(accountKey)) { + final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, + accountKey); + storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).credential(credential) + .buildClient(); + } else if (StringUtils.isNotBlank(sasToken)) { + storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).sasToken(sasToken) + .buildClient(); + } else { + throw new IllegalArgumentException(String.format("Either '%s' or '%s' must be defined.", + ACCOUNT_KEY.getDisplayName(), SAS_TOKEN.getDisplayName())); + } + return storageClient; + } + + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { + final Collection<ValidationResult> results = validateCredentialProperties(validationContext); + return results; + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } } \ No newline at end of file
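Both concrete processors below repeat the same client navigation on top of the getStorageClient() helper above. A minimal sketch of that shared pattern, assuming it runs inside a subclass's onTrigger (resolveFileClient is a hypothetical helper name, not part of this patch):

    // Sketch only: resolve the property values against the flowfile, build the
    // service client, then walk filesystem -> directory -> file.
    private DataLakeFileClient resolveFileClient(final ProcessContext context, final FlowFile flowFile) {
        final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
        final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
        final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
        final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
        final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
        final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
        return directoryClient.getFileClient(fileName);
    }

Extracting such a helper into the abstract class would remove the duplication between PutAzureDataLakeStorage and DeleteAzureDataLakeStorage.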
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java new file mode 100644 index 0000000000..5cbf7f0789 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor; + +import com.azure.storage.file.datalake.DataLakeDirectoryClient; +import com.azure.storage.file.datalake.DataLakeFileClient; +import com.azure.storage.file.datalake.DataLakeFileSystemClient; +import com.azure.storage.file.datalake.DataLakeServiceClient; + +import java.util.concurrent.TimeUnit; + +@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"}) +@SeeAlso({PutAzureDataLakeStorage.class}) +@CapabilityDescription("Deletes the provided file from Azure Data Lake Storage") +@InputRequirement(Requirement.INPUT_REQUIRED) +public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor { + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + + if (flowFile == null) { + return; + } + + final long startNanos = System.nanoTime(); + try { + final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue(); + final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue(); + final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue(); + final DataLakeServiceClient storageClient = getStorageClient(context, flowFile); + final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem); + final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory); + + final DataLakeFileClient fileClient = directoryClient.getFileClient(fileName); + fileClient.delete(); + session.transfer(flowFile, REL_SUCCESS); + + final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + session.getProvenanceReporter().send(flowFile, fileClient.getFileUrl(), transferMillis); + } catch (Exception e) { + getLogger().error("Failed to delete the specified file from Azure Data Lake Storage", e); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java index 122b8d627c..f67c98efaa 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java @@ -1,95 +1,97 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.azure.storage; - -import java.io.BufferedInputStream; -import java.io.InputStream; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; -import org.apache.nifi.annotation.behavior.WritesAttribute; -import org.apache.nifi.annotation.behavior.WritesAttributes; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.exception.ProcessException; - -import com.azure.storage.file.datalake.DataLakeDirectoryClient; -import com.azure.storage.file.datalake.DataLakeFileClient; -import com.azure.storage.file.datalake.DataLakeFileSystemClient; -import com.azure.storage.file.datalake.DataLakeServiceClient; -import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor; - - -@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"}) -@CapabilityDescription("Puts content into an Azure Data Lake Storage Gen 2") -@WritesAttributes({@WritesAttribute(attribute = "azure.filesystem", description = "The name of the Azure File System"), - @WritesAttribute(attribute = "azure.directory", description = "The name of the Azure Directory"), - @WritesAttribute(attribute = "azure.filename", description = "The name of the Azure File Name"), - @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for file content"), - @WritesAttribute(attribute = "azure.length", description = "Length of the file")}) -@InputRequirement(Requirement.INPUT_REQUIRED) - -public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor { - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } - final long startNanos = System.nanoTime(); - try { - final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue(); - final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue(); - final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue(); - final DataLakeServiceClient storageClient = getStorageClient(context, flowFile); - final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem); - final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory); - final DataLakeFileClient fileClient = 
directoryClient.createFile(fileName); - final long length = flowFile.getSize(); - if (length > 0) { - try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream in = new BufferedInputStream(rawIn)) { - fileClient.append(in, 0, length); - - } - } - fileClient.flush(length); - final Map<String, String> attributes = new HashMap<>(); - attributes.put("azure.filesystem", fileSystem); - attributes.put("azure.directory", directory); - attributes.put("azure.filename", fileName); - attributes.put("azure.primaryUri", fileClient.getFileUrl()); - attributes.put("azure.length", String.valueOf(length)); - flowFile = session.putAllAttributes(flowFile, attributes); - - - session.transfer(flowFile, REL_SUCCESS); - final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); - session.getProvenanceReporter().send(flowFile, fileClient.getFileUrl(), transferMillis); - } catch (Exception e) { - getLogger().error("Failed to create file, due to {}", e); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); - } - } +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.azure.storage; + +import java.io.BufferedInputStream; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; + +import com.azure.storage.file.datalake.DataLakeDirectoryClient; +import com.azure.storage.file.datalake.DataLakeFileClient; +import com.azure.storage.file.datalake.DataLakeFileSystemClient; +import com.azure.storage.file.datalake.DataLakeServiceClient; +import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor; + + +@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"}) +@SeeAlso({DeleteAzureDataLakeStorage.class}) +@CapabilityDescription("Puts content into an Azure Data Lake Storage Gen 2") +@WritesAttributes({@WritesAttribute(attribute = "azure.filesystem", description = "The name of the Azure File System"), + @WritesAttribute(attribute = "azure.directory", description = "The name of the Azure Directory"), + @WritesAttribute(attribute = "azure.filename", description = "The name of the Azure File"), + @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for file content"), + @WritesAttribute(attribute = "azure.length", description = "Length of the file")}) +@InputRequirement(Requirement.INPUT_REQUIRED) + +public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor { + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + final long startNanos = System.nanoTime(); + try { + final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue(); + final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue(); + final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue(); + final DataLakeServiceClient storageClient = getStorageClient(context, flowFile); + final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem); + final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory); + final DataLakeFileClient fileClient = directoryClient.createFile(fileName); + final long length = flowFile.getSize(); + if (length > 0) { + try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream in = new BufferedInputStream(rawIn)) { + fileClient.append(in, 0, length); + + } + } + fileClient.flush(length); + final Map<String, String> attributes = new HashMap<>(); + attributes.put("azure.filesystem", fileSystem); + attributes.put("azure.directory", directory); + attributes.put("azure.filename", fileName); + attributes.put("azure.primaryUri", fileClient.getFileUrl()); + attributes.put("azure.length", String.valueOf(length)); + flowFile = session.putAllAttributes(flowFile, attributes); + + + session.transfer(flowFile, REL_SUCCESS); + final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + session.getProvenanceReporter().send(flowFile, fileClient.getFileUrl(), transferMillis); + } catch (Exception e) { + getLogger().error("Failed to create file on Azure Data Lake Storage", e); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } + } } \ No newline at end of file
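PutAzureDataLakeStorage follows the ADLS Gen 2 three-step upload protocol: create the file, append the payload, then flush to commit. A minimal standalone sketch of the protocol, assuming a DataLakeDirectoryClient obtained as in the processor (the file name and payload are placeholders):

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;
    import com.azure.storage.file.datalake.DataLakeDirectoryClient;
    import com.azure.storage.file.datalake.DataLakeFileClient;

    // Sketch only: append() stages bytes at the given offset; nothing becomes
    // visible until flush() commits everything up to the final position.
    static void uploadExample(final DataLakeDirectoryClient directoryClient) throws Exception {
        final byte[] data = "0123456789".getBytes(StandardCharsets.UTF_8);
        final DataLakeFileClient fileClient = directoryClient.createFile("example.txt");
        try (final InputStream in = new ByteArrayInputStream(data)) {
            fileClient.append(in, 0, data.length);
        }
        fileClient.flush(data.length);
    }

This is also why the processor skips append() for zero-length flowfiles but still calls flush(length): flushing at position 0 commits an empty file.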
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 976daf069c..9f8417df01 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -21,4 +21,5 @@ org.apache.nifi.processors.azure.storage.PutAzureBlobStorage org.apache.nifi.processors.azure.storage.DeleteAzureBlobStorage org.apache.nifi.processors.azure.storage.queue.PutAzureQueueStorage org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage -org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage \ No newline at end of file +org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage +org.apache.nifi.processors.azure.storage.DeleteAzureDataLakeStorage \ No newline at end of file
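The registration above is what makes the new processor discoverable: NiFi locates Processor implementations through the standard java.util.ServiceLoader mechanism, which reads this META-INF/services file from the bundle's classpath. A minimal sketch of that lookup (illustration only, not framework code):

    import java.util.ServiceLoader;
    import org.apache.nifi.processor.Processor;

    // Sketch only: enumerate every Processor registered on the classpath.
    public class ListRegisteredProcessors {
        public static void main(final String[] args) {
            for (final Processor processor : ServiceLoader.load(Processor.class)) {
                System.out.println(processor.getClass().getName());
            }
        }
    }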
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java new file mode 100644 index 0000000000..e8c0230ee9 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage; + +import org.apache.nifi.processor.Processor; +import org.apache.nifi.util.MockFlowFile; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + +public class ITDeleteAzureDataLakeStorage extends AbstractAzureBlobStorageIT { + + @Override + protected Class<? extends Processor> getProcessorClass() { + return DeleteAzureDataLakeStorage.class; + } + + @Before + public void setUp() { + runner.setProperty(DeleteAzureDataLakeStorage.FILE, TEST_FILE_NAME); + } + + @Test + public void testDeleteFile() throws Exception { + runner.assertValid(); + runner.enqueue(new byte[0]); + runner.run(1); + + assertResult(); + } + + + private void assertResult() throws Exception { + runner.assertAllFlowFilesTransferred(DeleteAzureDataLakeStorage.REL_SUCCESS, 1); + List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(DeleteAzureDataLakeStorage.REL_SUCCESS); + for (MockFlowFile flowFile : flowFilesForRelationship) { + flowFile.assertContentEquals(new byte[0]); + } + } +} \ No newline at end of file
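The integration test above only checks flowfile routing; it does not confirm that the file is actually gone on the service side. A hedged sketch of a stronger assertion, assuming the harness can expose a DataLakeFileSystemClient for the test filesystem (fileSystemClient and directoryName are hypothetical fixtures, not part of this patch):

    // Sketch only: after the processor runs, the file client should report
    // that the path no longer exists.
    final DataLakeFileClient deletedFile = fileSystemClient
            .getDirectoryClient(directoryName)
            .getFileClient(TEST_FILE_NAME);
    org.junit.Assert.assertFalse("file should have been deleted", deletedFile.exists());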
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java index 8c6b836def..dba8abddbc 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java @@ -1,57 +1,57 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.azure.storage; - -import org.apache.nifi.processor.Processor; -import org.apache.nifi.util.MockFlowFile; -import org.junit.Before; -import org.junit.Test; - -import java.util.List; - -public class ITPutAzureDataLakeStorage extends AbstractAzureBlobStorageIT { - - @Override - protected Class<? extends Processor> getProcessorClass() { - return PutAzureDataLakeStorage.class; - } - - @Before - public void setUp() { - runner.setProperty(PutAzureDataLakeStorage.FILE, TEST_FILE_NAME); - } - - @Test - public void testPutFile() throws Exception { - runner.assertValid(); - runner.enqueue("0123456789".getBytes()); - runner.run(); - - assertResult(); - } - - - private void assertResult() throws Exception { - runner.assertAllFlowFilesTransferred(PutAzureDataLakeStorage.REL_SUCCESS, 1); - List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(PutAzureDataLakeStorage.REL_SUCCESS); - for (MockFlowFile flowFile : flowFilesForRelationship) { - flowFile.assertContentEquals("0123456789".getBytes()); - flowFile.assertAttributeEquals("azure.length", "10"); - } - - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage; + +import org.apache.nifi.processor.Processor; +import org.apache.nifi.util.MockFlowFile; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + +public class ITPutAzureDataLakeStorage extends AbstractAzureBlobStorageIT { + + @Override + protected Class<? extends Processor> getProcessorClass() { + return PutAzureDataLakeStorage.class; + } + + @Before + public void setUp() { + runner.setProperty(PutAzureDataLakeStorage.FILE, TEST_FILE_NAME); + } + + @Test + public void testPutFile() throws Exception { + runner.assertValid(); + runner.enqueue("0123456789".getBytes()); + runner.run(); + + assertResult(); + } + + + private void assertResult() throws Exception { + runner.assertAllFlowFilesTransferred(PutAzureDataLakeStorage.REL_SUCCESS, 1); + List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(PutAzureDataLakeStorage.REL_SUCCESS); + for (MockFlowFile flowFile : flowFilesForRelationship) { + flowFile.assertContentEquals("0123456789".getBytes()); + flowFile.assertAttributeEquals("azure.length", "10"); + } + + } +}
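A closing note on the credential rule enforced by validateCredentialProperties(): with a Storage Account Name set, exactly one of Storage Account Key or SAS Token must also be set. A minimal TestRunner sketch of the rule, assuming the standard nifi-mock harness (all property values are placeholders):

    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    // Sketch only: exercise customValidate() through the mock framework.
    final TestRunner runner = TestRunners.newTestRunner(PutAzureDataLakeStorage.class);
    runner.setProperty(AbstractAzureDataLakeStorageProcessor.FILESYSTEM, "myfilesystem");
    runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "mydirectory");
    runner.setProperty(AbstractAzureDataLakeStorageProcessor.FILE, "myfile");
    runner.setProperty(AbstractAzureDataLakeStorageProcessor.ACCOUNT_NAME, "myaccount");
    runner.setProperty(AbstractAzureDataLakeStorageProcessor.ACCOUNT_KEY, "placeholder-key");
    runner.assertValid();    // account name + key only: accepted
    runner.setProperty(AbstractAzureDataLakeStorageProcessor.SAS_TOKEN, "placeholder-token");
    runner.assertNotValid(); // key and SAS token together: rejected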