NIFI-7334 Adding FetchAzureDataLakeStorage processor to provide native support for Azure Data Lake Storage Gen 2.

NIFI-7334 Update to FetchAzureDataLakeStorage processor

This closes #4212.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
muazmaz 2020-04-14 23:36:19 -07:00 committed by Peter Turcsanyi
parent 8e3f42051f
commit 58118cf904
6 changed files with 136 additions and 4 deletions


@@ -107,7 +107,7 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.defaultValue("nifi.${uuid}")
.defaultValue("${azure.filename}")
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description(
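
The hunk above only changes the default value of what is presumably the file-name property: instead of generating a fresh name with nifi.${uuid}, the processor now falls back to the azure.filename attribute written by the other Data Lake processors, so a fetch or delete can follow a put or listing without extra configuration. A minimal sketch of how the full descriptor might read after the change; the name, display name, and description are assumptions, since the diff shows only the builder calls above:

    public static final PropertyDescriptor FILE = new PropertyDescriptor.Builder()
            .name("file-name")                                   // hypothetical identifier, not shown in the hunk
            .displayName("File Name")
            .description("The name of the file within the directory")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .required(true)
            .defaultValue("${azure.filename}")                   // resolved per FlowFile via Expression Language
            .build();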


@@ -35,7 +35,7 @@ import com.azure.storage.file.datalake.DataLakeServiceClient;
import java.util.concurrent.TimeUnit;
@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
-@SeeAlso({PutAzureDataLakeStorage.class})
+@SeeAlso({PutAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class})
@CapabilityDescription("Deletes the provided file from Azure Data Lake Storage")
@InputRequirement(Requirement.INPUT_REQUIRED)
public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {


@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage;

import java.util.concurrent.TimeUnit;

import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;

import com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.DataLakeFileClient;

@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
@SeeAlso({PutAzureDataLakeStorage.class, DeleteAzureDataLakeStorage.class})
@CapabilityDescription("Fetch the provided file from Azure Data Lake Storage")
@InputRequirement(Requirement.INPUT_REQUIRED)
public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {

    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final long startNanos = System.nanoTime();
        try {
            final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
            final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
            final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();

            final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
            final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem);
            final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory);
            final DataLakeFileClient fileClient = directoryClient.getFileClient(fileName);

            flowFile = session.write(flowFile, os -> fileClient.read(os));
            session.getProvenanceReporter().modifyContent(flowFile);
            session.transfer(flowFile, REL_SUCCESS);

            final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            session.getProvenanceReporter().send(flowFile, fileClient.getFileUrl(), transferMillis);
        } catch (Exception e) {
            getLogger().error("Failure to fetch file from Azure Data Lake Storage, due to {}", e);
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_FAILURE);
        }
    }
}
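
The onTrigger method above delegates client construction to getStorageClient(context, flowFile), which lives in the abstract base class and is not part of this diff. A hedged sketch of what such a helper typically looks like with the Azure SDK used here; the ACCOUNT_NAME/ACCOUNT_KEY property names and the shared-key endpoint format are assumptions, not taken from the commit:

    // Sketch only; the real base class may also support other credential types.
    // Assumed imports: com.azure.storage.common.StorageSharedKeyCredential,
    // com.azure.storage.file.datalake.DataLakeServiceClientBuilder
    protected DataLakeServiceClient getStorageClient(ProcessContext context, FlowFile flowFile) {
        final String accountName = context.getProperty(ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue();
        final String accountKey = context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue();
        final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
        return new DataLakeServiceClientBuilder()
                .endpoint(String.format("https://%s.dfs.core.windows.net", accountName))  // ADLS Gen2 "dfs" endpoint
                .credential(credential)
                .buildClient();
    }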


@@ -42,7 +42,7 @@ import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
-@SeeAlso({DeleteAzureDataLakeStorage.class})
+@SeeAlso({DeleteAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class})
@CapabilityDescription("Puts content into an Azure Data Lake Storage Gen 2")
@WritesAttributes({@WritesAttribute(attribute = "azure.filesystem", description = "The name of the Azure File System"),
@WritesAttribute(attribute = "azure.directory", description = "The name of the Azure Directory"),


@@ -22,4 +22,5 @@ org.apache.nifi.processors.azure.storage.DeleteAzureBlobStorage
org.apache.nifi.processors.azure.storage.queue.PutAzureQueueStorage
org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage
org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage
-org.apache.nifi.processors.azure.storage.DeleteAzureDataLakeStorage
+org.apache.nifi.processors.azure.storage.DeleteAzureDataLakeStorage
+org.apache.nifi.processors.azure.storage.FetchAzureDataLakeStorage
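
The service registration above is what makes the new processor discoverable: NiFi loads extensions listed in each NAR's META-INF/services/org.apache.nifi.processor.Processor file through java.util.ServiceLoader. A simplified, self-contained illustration (the framework itself does this per NAR class loader, not via main):

    import java.util.ServiceLoader;
    import org.apache.nifi.processor.Processor;

    public class ListRegisteredProcessors {
        public static void main(String[] args) {
            // Enumerates every Processor registered on the classpath;
            // FetchAzureDataLakeStorage appears once the line above is added.
            for (Processor p : ServiceLoader.load(Processor.class)) {
                System.out.println(p.getClass().getName());
            }
        }
    }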


@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage;

import org.apache.nifi.processor.Processor;
import org.apache.nifi.util.MockFlowFile;
import org.junit.Before;
import org.junit.Test;

import java.util.List;

public class ITFetchAzureDataLakeStorage extends AbstractAzureBlobStorageIT {

    @Override
    protected Class<? extends Processor> getProcessorClass() {
        return FetchAzureDataLakeStorage.class;
    }

    @Before
    public void setUp() throws Exception {
        runner.setProperty(PutAzureDataLakeStorage.FILE, TEST_FILE_NAME);
    }

    @Test
    public void testFetchFile() throws Exception {
        runner.assertValid();
        runner.enqueue(new byte[0]);
        runner.run();

        assertResult();
    }

    private void assertResult() throws Exception {
        runner.assertAllFlowFilesTransferred(PutAzureDataLakeStorage.REL_SUCCESS, 1);
        List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(PutAzureDataLakeStorage.REL_SUCCESS);
        for (MockFlowFile flowFile : flowFilesForRelationship) {
            flowFile.assertContentEquals("0123456789".getBytes());
            flowFile.assertAttributeEquals("azure.length", "10");
        }
    }
}
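
The assertions above expect the fetched FlowFile to contain "0123456789", so the integration-test base class presumably seeds TEST_FILE_NAME with that content before the run. A hedged sketch of such a seeding step using Azure SDK calls; fileSystemClient, TEST_DIRECTORY, and the exact upload mechanics are assumptions, only the expected content comes from the test:

    // Assumed setup step, not part of this commit.
    // Assumed imports: java.io.ByteArrayInputStream, java.nio.charset.StandardCharsets,
    // com.azure.storage.file.datalake.DataLakeFileClient
    final byte[] content = "0123456789".getBytes(StandardCharsets.UTF_8);
    final DataLakeFileClient testFile = fileSystemClient
            .getDirectoryClient(TEST_DIRECTORY)
            .createFile(TEST_FILE_NAME);
    testFile.append(new ByteArrayInputStream(content), 0, content.length);  // stage the bytes
    testFile.flush(content.length);                                         // commit them to the file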