diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index 8cdcd6bc0c..4bc2b65d0c 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -419,6 +419,11 @@ public class StandardProcessorTestRunner implements TestRunner { context.assertValid(); } + @Override + public boolean isValid() { + return context.isValid(); + } + @Override public void assertNotValid() { Assertions.assertFalse(context.isValid(), "Processor appears to be valid but expected it to be invalid"); diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java index 84bb37140d..52b5f843a3 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java @@ -394,6 +394,8 @@ public interface TestRunner { */ void assertValid(); + boolean isValid(); + /** * Assert that the currently configured set of properties/annotation data * are NOT valid diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java index 846adb64c4..2f03504119 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-py4j-integration-tests/src/test/java/org.apache.nifi.py4j/PythonControllerInteractionIT.java @@ -19,15 +19,14 @@ package org.apache.nifi.py4j; import org.apache.nifi.components.AsyncLoadedProcessor; import org.apache.nifi.components.AsyncLoadedProcessor.LoadState; -import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.json.JsonRecordSetWriter; import org.apache.nifi.json.JsonTreeReader; import org.apache.nifi.mock.MockProcessorInitializationContext; -import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; import org.apache.nifi.python.ControllerServiceTypeLookup; import org.apache.nifi.python.PythonBridge; import org.apache.nifi.python.PythonBridgeInitializationContext; @@ -40,7 +39,6 @@ import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.MockPropertyValue; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.AfterAll; @@ -48,9 +46,6 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import java.io.File; import java.io.FileOutputStream; @@ -66,14 +61,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.when; public class PythonControllerInteractionIT { private static PythonBridge bridge; @@ -256,6 +250,7 @@ public class PythonControllerInteractionIT { runner.enqueue("name, number\nJohn Doe, 500"); // Trigger the processor + waitForValid(runner); runner.run(); runner.assertTransferCount("original", 1); runner.assertTransferCount("success", 1); @@ -284,6 +279,7 @@ public class PythonControllerInteractionIT { runner.enqueue(""); // Trigger the processor + waitForValid(runner); runner.run(); runner.assertTransferCount("original", 1); runner.assertTransferCount("success", 1); @@ -365,6 +361,7 @@ public class PythonControllerInteractionIT { runner.enqueue(""); // Trigger the processor + waitForValid(runner); runner.run(); runner.assertTransferCount("original", 1); runner.assertTransferCount("success", 1); @@ -432,6 +429,7 @@ public class PythonControllerInteractionIT { runnerV1.enqueue(""); // Trigger the processor + waitForValid(runnerV1); runnerV1.run(); runnerV1.assertTransferCount("success", 1); runnerV1.assertTransferCount("original", 1); @@ -443,12 +441,29 @@ public class PythonControllerInteractionIT { runnerV2.enqueue(""); // Trigger the processor + waitForValid(runnerV2); runnerV2.run(); runnerV2.assertTransferCount("success", 1); runnerV2.assertTransferCount("original", 1); runnerV2.getFlowFilesForRelationship("success").get(0).assertContentEquals("Hello, World 2"); } + private void waitForValid(final TestRunner runner) { + final long maxTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(60L); + while (System.currentTimeMillis() < maxTime) { + if (runner.isValid()) { + return; + } + + try { + Thread.sleep(10L); + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting for processor to be valid"); + } + } + } + @Test public void testRecordTransformWithDynamicProperties() throws InitializationException { // Create a SetRecordField Processor @@ -469,22 +484,6 @@ public class PythonControllerInteractionIT { [{"name":"Jane Doe","number":"8"}]"""); } - private ProcessContext createContext(final Map propertyValues) { - final ProcessContext context = Mockito.mock(ProcessContext.class); - - when(context.getProperties()).thenReturn(propertyValues); - when(context.getProperty(any(String.class))).thenAnswer(new Answer<>() { - @Override - public Object answer(final InvocationOnMock invocationOnMock) { - final String name = invocationOnMock.getArgument(0, String.class); - final PropertyDescriptor descriptor = new PropertyDescriptor.Builder().name(name).build(); - final String stringValue = propertyValues.get(descriptor); - return new MockPropertyValue(stringValue); - } - }); - - return context; - } private TestRunner createRecordTransformRunner(final String type) throws InitializationException { final Processor processor = createProcessor("SetRecordField"); @@ -524,6 +523,29 @@ public class PythonControllerInteractionIT { } + @Test + public void testCustomRelationships() { + final FlowFileTransformProxy processor = createFlowFileTransform("RouteFlowFile"); + final TestRunner runner = TestRunners.newTestRunner(processor); + + final Set relationships = runner.getProcessor().getRelationships(); + assertEquals(4, relationships.size()); + assertTrue(relationships.stream().anyMatch(rel -> rel.getName().equals("small"))); + assertTrue(relationships.stream().anyMatch(rel -> rel.getName().equals("large"))); + assertTrue(relationships.stream().anyMatch(rel -> rel.getName().equals("original"))); + assertTrue(relationships.stream().anyMatch(rel -> rel.getName().equals("failure"))); + + runner.enqueue(new byte[25]); + runner.enqueue(new byte[75 * 1024]); + runner.run(2); + + runner.assertTransferCount("original", 2); + runner.assertTransferCount("small", 1); + runner.assertTransferCount("large", 1); + runner.assertTransferCount("failure", 0); + } + + private RecordSchema createSimpleRecordSchema(final String... fieldNames) { return createSimpleRecordSchema(Arrays.asList(fieldNames)); } diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/relationship.py b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/relationship.py index 6985508754..bbaf6ac0e9 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/relationship.py +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-extension-api/src/main/python/src/nifiapi/relationship.py @@ -19,7 +19,7 @@ class Relationship: self.description = description self.auto_terminated = auto_terminated - def to_java_descriptor(self, gateway): + def to_java_relationship(self, gateway): return gateway.jvm.org.apache.nifi.processor.Relationship.Builder() \ .name(self.name) \ .description(self.description) \ diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/PythonProcessorAdapter.py b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/PythonProcessorAdapter.py index 4029a70c5e..44d46febb8 100644 --- a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/PythonProcessorAdapter.py +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-framework/src/main/python/framework/PythonProcessorAdapter.py @@ -31,6 +31,7 @@ class PythonProcessorAdapter: class Java: implements = ['org.apache.nifi.python.processor.PythonProcessorAdapter'] + def __init__(self, gateway, processor, extension_manager, controller_service_type_lookup): self.processor = processor self.gateway = gateway @@ -42,6 +43,7 @@ class PythonProcessorAdapter: if is_method_defined(processor, 'getRelationships'): self.relationships = None + self.cached_relationships = ([], None) else: self.relationships = gateway.jvm.java.util.HashSet() success = gateway.jvm.org.apache.nifi.processor.Relationship.Builder() \ @@ -63,7 +65,20 @@ class PythonProcessorAdapter: # to call the Processor's implementation. This allows for dynamically changing the Relationships based on # configuration, etc. if self.relationships is None: - return self.processor.getRelationships() + processor_rels = self.processor.getRelationships() + + # If the relationships haven't changed, return the cached set + # This is to avoid creating a new HashSet and Java Relationship objects every time getRelationships is called, which is very expensive + if processor_rels == self.cached_relationships[0]: + return self.cached_relationships[1] + + hash_set = self.gateway.jvm.java.util.HashSet() + for rel in processor_rels: + hash_set.add(rel.to_java_relationship(self.gateway)) + + # Cache and return the results + self.cached_relationships = (processor_rels, hash_set) + return hash_set else: return self.relationships diff --git a/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-test-extensions/src/main/resources/extensions/RouteFlowFile.py b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-test-extensions/src/main/resources/extensions/RouteFlowFile.py new file mode 100644 index 0000000000..eb0eb6c0c6 --- /dev/null +++ b/nifi-nar-bundles/nifi-py4j-bundle/nifi-python-test-extensions/src/main/resources/extensions/RouteFlowFile.py @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult +from nifiapi.relationship import Relationship + +class RouteFlowFile(FlowFileTransform): + class Java: + implements = ['org.apache.nifi.python.processor.FlowFileTransform'] + class ProcessorDetails: + version = '0.0.1-SNAPSHOT' + description = "Routes a FlowFile to 'small' or 'large' based on whether the size of the flowfile exceeds 50 KB" + + REL_SMALL = Relationship(name="small", description="FlowFiles smaller than 50 KB") + REL_LARGE = Relationship(name="large", description="FlowFiles larger than 50 KB") + + def __init__(self, **kwargs): + pass + + def transform(self, context, flowFile): + size = flowFile.getSize() + if size > 50000: + return FlowFileTransformResult(relationship = "large") + else: + return FlowFileTransformResult(relationship = "small") + + def getRelationships(self): + return [self.REL_SMALL, self.REL_LARGE]