From d51b306731c367818be4b9e7bc1869c109376b61 Mon Sep 17 00:00:00 2001 From: Pierre Villard <pierre.villard.fr@gmail.com> Date: Mon, 18 Nov 2024 17:58:42 +0100 Subject: [PATCH] NIFI-14005 Add processor GetFileResource processor This closes #9519 Signed-off-by: Lucas Ottersbach <ottersbach@apache.org> --- .../processors/standard/GetFileResource.java | 150 ++++++++++++++++++ .../org.apache.nifi.processor.Processor | 1 + .../standard/TestGetFileResource.java | 59 +++++++ 3 files changed, 210 insertions(+) create mode 100644 nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileResource.java create mode 100644 nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetFileResource.java diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileResource.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileResource.java new file mode 100644 index 0000000000..e04029621e --- /dev/null +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileResource.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.Restricted; +import org.apache.nifi.annotation.behavior.Restriction; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.configuration.DefaultSchedule; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.RequiredPermission; +import org.apache.nifi.components.resource.ResourceCardinality; +import org.apache.nifi.components.resource.ResourceType; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.scheduling.SchedulingStrategy; +import org.apache.nifi.stream.io.StreamUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@Tags({ "test", "file", "generate", "load" }) +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@CapabilityDescription(""" + This processor creates FlowFiles with the content of the configured File Resource. GetFileResource + is useful for load testing, configuration, and simulation. + """) +@DynamicProperty( + name = "Generated FlowFile attribute name", value = "Generated FlowFile attribute value", expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT, + description = "Specifies an attribute on generated FlowFiles defined by the Dynamic Property's key and value." +) +@WritesAttributes( + { + @WritesAttribute(attribute = "mime.type", description = "Sets the MIME type of the output if the 'MIME Type' property is set"), + @WritesAttribute(attribute = "Dynamic property key", description = "Value for the corresponding dynamic property, if any is set") + } +) +@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min") +@Restricted( + restrictions = { + @Restriction(requiredPermission = RequiredPermission.READ_FILESYSTEM, explanation = "Provides operator the ability to read from any file that NiFi has access to."), + @Restriction(requiredPermission = RequiredPermission.REFERENCE_REMOTE_RESOURCES, explanation = "File Resource can reference resources over HTTP/HTTPS") + } +) +public class GetFileResource extends AbstractProcessor { + + public static final PropertyDescriptor FILE_RESOURCE = new PropertyDescriptor.Builder() + .name("File Resource") + .description("Location of the File Resource (Local File or URL). This file will be used as content of the generated FlowFiles.") + .required(true) + .identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, ResourceType.URL) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor MIME_TYPE = new PropertyDescriptor.Builder() + .name("MIME Type") + .description("Specifies the value to set for the [mime.type] attribute.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + private static final List<PropertyDescriptor> PROPERTIES = List.of(FILE_RESOURCE, MIME_TYPE); + + public static final Relationship SUCCESS = new Relationship.Builder() + .name("success") + .build(); + + private static final Set<Relationship> RELATIONSHIPS = Set.of(SUCCESS); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .description("Specifies an attribute on generated FlowFiles defined by the Dynamic Property's key and value.") + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) + .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .dynamic(true) + .build(); + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + final Map<PropertyDescriptor, String> processorProperties = context.getProperties(); + final Map<String, String> generatedAttributes = new HashMap<String, String>(); + for (final Map.Entry<PropertyDescriptor, String> entry : processorProperties.entrySet()) { + final PropertyDescriptor property = entry.getKey(); + if (property.isDynamic()) { + final String dynamicValue = context.getProperty(property).evaluateAttributeExpressions().getValue(); + generatedAttributes.put(property.getName(), dynamicValue); + } + } + + if (context.getProperty(MIME_TYPE).isSet()) { + generatedAttributes.put(CoreAttributes.MIME_TYPE.key(), context.getProperty(MIME_TYPE).getValue()); + } + + FlowFile flowFile = session.create(); + + try (final InputStream inputStream = context.getProperty(FILE_RESOURCE).asResource().read()) { + flowFile = session.write(flowFile, out -> StreamUtils.copy(inputStream, out)); + } catch (IOException e) { + getLogger().error("Could not create FlowFile from Resource [{}]", context.getProperty(FILE_RESOURCE).getValue(), e); + } + + flowFile = session.putAllAttributes(flowFile, generatedAttributes); + session.getProvenanceReporter().create(flowFile); + session.transfer(flowFile, SUCCESS); + } +} diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index e547adbba1..6d74dc3669 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -52,6 +52,7 @@ org.apache.nifi.processors.standard.GenerateRecord org.apache.nifi.processors.standard.GenerateFlowFile org.apache.nifi.processors.standard.GenerateTableFetch org.apache.nifi.processors.standard.GetFile +org.apache.nifi.processors.standard.GetFileResource org.apache.nifi.processors.standard.GetFTP org.apache.nifi.processors.standard.GetSFTP org.apache.nifi.processors.standard.HandleHttpRequest diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetFileResource.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetFileResource.java new file mode 100644 index 0000000000..574946a2ac --- /dev/null +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetFileResource.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; + +/** + * Unit tests for the GetFileResource processor. + */ +public class TestGetFileResource { + + @Test + public void testInvalidConfiguration() { + final TestRunner runner = TestRunners.newTestRunner(new GetFileResource()); + runner.setProperty(GetFileResource.FILE_RESOURCE, ""); + runner.assertNotValid(); + } + + @Test + public void testGetFileResource() throws IOException { + final String filePath = "src/test/resources/TestCountText/jabberwocky.txt"; + final TestRunner runner = TestRunners.newTestRunner(new GetFileResource()); + + runner.setProperty(GetFileResource.FILE_RESOURCE, filePath); + runner.setProperty(GetFileResource.MIME_TYPE, "text/plain"); + runner.setProperty("foo", "foo"); + runner.assertValid(); + + runner.run(); + + runner.assertTransferCount(GenerateFlowFile.SUCCESS, 1); + final MockFlowFile ff = runner.getFlowFilesForRelationship(GenerateFlowFile.SUCCESS).get(0); + ff.assertContentEquals(new File(filePath)); + ff.assertAttributeEquals("foo", "foo"); + ff.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/plain"); + } + +} \ No newline at end of file