diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
index 83c5c14bb6..40d276c815 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
@@ -107,7 +107,7 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .required(true)
-            .defaultValue("nifi.${uuid}")
+            .defaultValue("${azure.filename}")
             .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description(
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
index 5cbf7f0789..8403841104 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
@@ -35,7 +35,7 @@ import com.azure.storage.file.datalake.DataLakeServiceClient;
 import java.util.concurrent.TimeUnit;
 
 @Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
-@SeeAlso({PutAzureDataLakeStorage.class})
+@SeeAlso({PutAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class})
 @CapabilityDescription("Deletes the provided file from Azure Data Lake Storage")
 @InputRequirement(Requirement.INPUT_REQUIRED)
 public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
new file mode 100644
index 0000000000..d3068fb97f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
+
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+
+
+@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
+@SeeAlso({PutAzureDataLakeStorage.class, DeleteAzureDataLakeStorage.class})
+@CapabilityDescription("Fetches the provided file from Azure Data Lake Storage")
+@InputRequirement(Requirement.INPUT_REQUIRED)
+public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+
+        if (flowFile == null) {
+            return;
+        }
+
+        final long startNanos = System.nanoTime();
+
+        try {
+            final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
+            final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+            final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
+            final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
+            final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem);
+            final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory);
+            final DataLakeFileClient fileClient = directoryClient.getFileClient(fileName);
+
+            flowFile = session.write(flowFile, os -> fileClient.read(os));
+            session.getProvenanceReporter().modifyContent(flowFile);
+
+            final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+            session.getProvenanceReporter().fetch(flowFile, fileClient.getFileUrl(), transferMillis);
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (Exception e) {
+            getLogger().error("Failed to fetch file from Azure Data Lake Storage", e);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+}
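Note on the processor above: it resolves the target file through the Azure Data Lake client chain (service client -> file system -> directory -> file) and streams the remote content with DataLakeFileClient#read. The standalone sketch below walks the same chain directly against the azure-storage-file-datalake SDK; the account name, key, endpoint, and path values are placeholders, and building the client with DataLakeServiceClientBuilder here merely stands in for the credential handling that getStorageClient(...) provides inside the processor.

// Illustrative sketch only: fetches a file the same way FetchAzureDataLakeStorage does,
// but builds the service client directly instead of going through getStorageClient(...).
// Account name, key and path values below are placeholders.
import java.io.ByteArrayOutputStream;

import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;

public class AdlsFetchSketch {

    public static void main(String[] args) throws Exception {
        final String accountName = "myaccount";
        final String accountKey = "my-account-key";

        // Shared-key credential and ADLS Gen2 ("dfs") endpoint for the storage account.
        final DataLakeServiceClient serviceClient = new DataLakeServiceClientBuilder()
                .endpoint(String.format("https://%s.dfs.core.windows.net", accountName))
                .credential(new StorageSharedKeyCredential(accountName, accountKey))
                .buildClient();

        // Same navigation as onTrigger(): file system -> directory -> file.
        final DataLakeFileSystemClient fileSystemClient = serviceClient.getFileSystemClient("my-filesystem");
        final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient("my-directory");
        final DataLakeFileClient fileClient = directoryClient.getFileClient("my-file.txt");

        // Stream the remote content, mirroring session.write(flowFile, os -> fileClient.read(os)).
        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            fileClient.read(out);
            System.out.println("Fetched " + out.size() + " bytes from " + fileClient.getFileUrl());
        }
    }
}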
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
index f67c98efaa..b59c366f80 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
@@ -42,7 +42,7 @@ import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
 
 @Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
-@SeeAlso({DeleteAzureDataLakeStorage.class})
+@SeeAlso({DeleteAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class})
 @CapabilityDescription("Puts content into an Azure Data Lake Storage Gen 2")
 @WritesAttributes({@WritesAttribute(attribute = "azure.filesystem", description = "The name of the Azure File System"),
         @WritesAttribute(attribute = "azure.directory", description = "The name of the Azure Directory"),
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 9f8417df01..6e0933064c 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -22,4 +22,5 @@ org.apache.nifi.processors.azure.storage.DeleteAzureBlobStorage
 org.apache.nifi.processors.azure.storage.queue.PutAzureQueueStorage
 org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage
 org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage
-org.apache.nifi.processors.azure.storage.DeleteAzureDataLakeStorage
\ No newline at end of file
+org.apache.nifi.processors.azure.storage.DeleteAzureDataLakeStorage
+org.apache.nifi.processors.azure.storage.FetchAzureDataLakeStorage
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java
new file mode 100644
index 0000000000..93a414f4fe
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.util.MockFlowFile;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+public class ITFetchAzureDataLakeStorage extends AbstractAzureBlobStorageIT {
+
+    @Override
+    protected Class<? extends Processor> getProcessorClass() {
+        return FetchAzureDataLakeStorage.class;
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        runner.setProperty(FetchAzureDataLakeStorage.FILE, TEST_FILE_NAME);
+    }
+
+    @Test
+    public void testFetchFile() throws Exception {
+        runner.assertValid();
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        assertResult();
+    }
+
+    private void assertResult() throws Exception {
+        runner.assertAllFlowFilesTransferred(FetchAzureDataLakeStorage.REL_SUCCESS, 1);
+        List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(FetchAzureDataLakeStorage.REL_SUCCESS);
+        for (MockFlowFile flowFile : flowFilesForRelationship) {
+            flowFile.assertContentEquals("0123456789".getBytes());
+            flowFile.assertAttributeEquals("azure.length", "10");
+        }
+    }
+}
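Note: besides the integration test above, the new processor can also be exercised locally with NiFi's TestRunner. The sketch below only shows the property wiring and a single run; the class name and the filesystem/directory values are illustrative, it relies on the new ${azure.filename} default for the file name property, and it assumes the storage account and credential properties inherited from AbstractAzureDataLakeStorageProcessor (not shown in this diff) are configured as well.

// Illustrative sketch only: exercises FetchAzureDataLakeStorage with NiFi's TestRunner.
// The storage account / credential properties inherited from the abstract processor
// (not shown in this diff) must also be set before a run can reach a real account.
package org.apache.nifi.processors.azure.storage;

import java.util.Collections;

import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class FetchAzureDataLakeStorageUsageSketch {

    public static void main(String[] args) {
        final TestRunner runner = TestRunners.newTestRunner(FetchAzureDataLakeStorage.class);

        // Placeholder values for the properties read in onTrigger().
        runner.setProperty(FetchAzureDataLakeStorage.FILESYSTEM, "my-filesystem");
        runner.setProperty(FetchAzureDataLakeStorage.DIRECTORY, "my-directory");
        // The file name property is left at its new default, ${azure.filename},
        // so it is resolved from the incoming FlowFile attribute below.

        // An incoming FlowFile is required (@InputRequirement(INPUT_REQUIRED)).
        runner.enqueue(new byte[0], Collections.singletonMap("azure.filename", "my-file.txt"));
        runner.run();

        // With reachable storage and valid credentials the fetched content lands on REL_SUCCESS;
        // otherwise the FlowFile is penalized and routed to REL_FAILURE.
        runner.assertAllFlowFilesTransferred(FetchAzureDataLakeStorage.REL_SUCCESS, 1);
    }
}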