diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java index 66db49a690..5bc23f9570 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java @@ -62,6 +62,7 @@ public class MockProcessSession implements ProcessSession { private final MockFlowFileQueue processorQueue; private final Set beingProcessed = new HashSet<>(); private final List penalized = new ArrayList<>(); + private final Processor processor; private final Map currentVersions = new HashMap<>(); private final Map originalVersions = new HashMap<>(); @@ -77,6 +78,7 @@ public class MockProcessSession implements ProcessSession { private int removedCount = 0; public MockProcessSession(final SharedSessionState sharedState, final Processor processor) { + this.processor = processor; this.sharedState = sharedState; this.processorQueue = sharedState.getFlowFileQueue(); provenanceReporter = new MockProvenanceReporter(this, sharedState, processor.getIdentifier(), processor.getClass().getSimpleName()); @@ -650,6 +652,9 @@ public class MockProcessSession implements ProcessSession { transfer(flowFile); return; } + if(!processor.getRelationships().contains(relationship)){ + throw new IllegalArgumentException("this relationship " + relationship.getName() + " is not known"); + } validateState(flowFile); List list = transferMap.get(relationship); @@ -668,6 +673,9 @@ public class MockProcessSession implements ProcessSession { transfer(flowFiles); return; } + if(!processor.getRelationships().contains(relationship)){ + throw new IllegalArgumentException("this relationship " + relationship.getName() + " is not known"); + } for (final FlowFile flowFile : flowFiles) { validateState(flowFile); diff --git a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java index 2d883514b8..e16afb37ac 100644 --- a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java +++ b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessSession.java @@ -64,6 +64,27 @@ public class TestMockProcessSession { } } + @Test + public void testTransferUnknownRelationship() { + final Processor processor = new PoorlyBehavedProcessor(); + final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, new AtomicLong(0L)), processor); + FlowFile ff1 = session.createFlowFile("hello, world".getBytes()); + final Relationship fakeRel = new Relationship.Builder().name("FAKE").build(); + try { + session.transfer(ff1, fakeRel); + Assert.fail("Should have thrown IllegalArgumentException"); + } catch (final IllegalArgumentException ie) { + + } + try { + session.transfer(Collections.singleton(ff1), fakeRel); + Assert.fail("Should have thrown IllegalArgumentException"); + } catch (final IllegalArgumentException ie) { + + } + + } + protected static class PoorlyBehavedProcessor extends AbstractProcessor { private static final Relationship REL_FAILURE = new Relationship.Builder() diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 6ea5afeebf..b5da0720ed 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -1562,9 +1562,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public void transfer(final FlowFile flowFile, final Relationship relationship) { validateRecordState(flowFile); - final StandardRepositoryRecord record = records.get(flowFile); - record.setTransferRelationship(relationship); - updateLastQueuedDate(record); final int numDestinations = context.getConnections(relationship).size(); final int multiplier = Math.max(1, numDestinations); @@ -1575,7 +1572,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE autoTerminated = true; } else if (numDestinations == 0 && relationship == Relationship.SELF) { selfRelationship = true; + } else if (numDestinations == 0) { + // the relationship specified is not known in this session/context + throw new IllegalArgumentException("Relationship '" + relationship.getName() + "' is not known"); } + final StandardRepositoryRecord record = records.get(flowFile); + record.setTransferRelationship(relationship); + updateLastQueuedDate(record); if (autoTerminated) { removedCount += multiplier; @@ -1616,6 +1619,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE autoTerminated = true; } else if (numDestinations == 0 && relationship == Relationship.SELF) { selfRelationship = true; + } else if (numDestinations == 0) { + // the relationship specified is not known in this session/context + throw new IllegalArgumentException("Relationship '" + relationship.getName() + "' is not known"); } final int multiplier = Math.max(1, numDestinations); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 55c9f5a5fe..23a170e34c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -97,6 +97,7 @@ public class TestStandardProcessSession { private ProvenanceEventRepository provenanceRepo; private MockFlowFileRepository flowFileRepo; + private final Relationship FAKE_RELATIONSHIP = new Relationship.Builder().name("FAKE").build(); @After public void cleanup() { @@ -187,11 +188,14 @@ public class TestStandardProcessSession { final Relationship relationship = (Relationship) arguments[0]; if (relationship == Relationship.SELF) { return Collections.emptySet(); - } else { + } else if (relationship == FAKE_RELATIONSHIP || relationship.equals(FAKE_RELATIONSHIP) ){ + return null; + }else { return new HashSet<>(connList); } } }).when(connectable).getConnections(Mockito.any(Relationship.class)); + when(connectable.getConnections()).thenReturn(new HashSet<>(connList)); contentRepo = new MockContentRepository(); @@ -1250,6 +1254,34 @@ public class TestStandardProcessSession { session.commit(); } + @Test + public void testTransferUnknownRelationship() { + final FlowFileRecord flowFileRecord1 = new StandardFlowFileRecord.Builder() + .id(1L) + .addAttribute("uuid", "11111111-1111-1111-1111-111111111111") + .entryDate(System.currentTimeMillis()) + .build(); + + flowFileQueue.put(flowFileRecord1); + + FlowFile ff1 = session.get(); + ff1 = session.putAttribute(ff1, "index", "1"); + + try { + session.transfer(ff1, FAKE_RELATIONSHIP); + Assert.fail("Expected IllegalArgumentException"); + } catch (IllegalArgumentException iae) { + } + + try { + final Collection collection = new HashSet<>(); + collection.add(ff1); + session.transfer(collection, FAKE_RELATIONSHIP); + Assert.fail("Expected IllegalArgumentException"); + } catch (IllegalArgumentException iae) { + } + } + private static class MockFlowFileRepository implements FlowFileRepository { private boolean failOnUpdate = false; private final AtomicLong idGenerator = new AtomicLong(0L); diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java index ad41850836..8eb8ee0e08 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java @@ -16,21 +16,7 @@ */ package org.apache.nifi.processors.script; -import org.apache.nifi.components.AllowableValue; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.AbstractSessionFactoryProcessor; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.util.StringUtils; - -import javax.script.ScriptEngine; -import javax.script.ScriptEngineFactory; -import javax.script.ScriptEngineManager; -import javax.script.ScriptException; import java.io.File; import java.net.MalformedURLException; import java.net.URL; @@ -213,6 +199,7 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc /** * Performs common setup operations when the processor is scheduled to run. This method assumes the member * variables associated with properties have been filled. + * @param numberOfScriptEngines number of engines to setup */ public void setup(int numberOfScriptEngines) { @@ -231,6 +218,7 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc * javax.script APIs. Then, if any script configurators have been defined for this engine, their init() method is * called, and the configurator is saved for future calls. * + * @param numberOfScriptEngines number of engines to setup * @see org.apache.nifi.processors.script.ScriptEngineConfigurator */ protected void setupEngines(int numberOfScriptEngines) { @@ -316,6 +304,7 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc * If the parameter is null or empty, this class's classloader is returned * * @param modules An array of URLs to add to the class loader + * @return ClassLoader for script engine */ protected ClassLoader createScriptEngineModuleClassLoader(URL[] modules) { ClassLoader thisClassLoader = this.getClass().getClassLoader(); diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java index b4a6c0d524..dd3ae585cd 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java @@ -63,16 +63,14 @@ import org.apache.nifi.processor.util.StandardValidators; public class InvokeScriptedProcessor extends AbstractScriptProcessor { private final AtomicReference processor = new AtomicReference<>(); - private final AtomicReference> validationResults = - new AtomicReference<>((Collection) new ArrayList()); + private final AtomicReference> validationResults = new AtomicReference<>(new ArrayList<>()); private AtomicBoolean scriptNeedsReload = new AtomicBoolean(true); private ScriptEngine scriptEngine = null; /** - * Returns the valid relationships for this processor. SUCCESS and FAILURE are always returned, and if the script - * processor has defined additional relationships, those will be added as well. + * Returns the valid relationships for this processor as supplied by the script itself. * * @return a Set of Relationships supported by this processor */ @@ -82,7 +80,10 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { final Processor instance = processor.get(); if (instance != null) { try { - relationships.addAll(instance.getRelationships()); + final Set rels = instance.getRelationships(); + if(rels != null && !rels.isEmpty()){ + relationships.addAll(rels); + } } catch (final Throwable t) { final ComponentLog logger = getLogger(); final String message = "Unable to get relationships from scripted Processor: " + t; @@ -92,10 +93,6 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { logger.error(message, t); } } - } else { - // Return defaults for now - relationships.add(REL_SUCCESS); - relationships.add(REL_FAILURE); } return Collections.unmodifiableSet(relationships); } @@ -493,6 +490,7 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { } @OnStopped + @Override public void stop() { super.stop(); processor.set(null); diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java index 2dc700dd46..e0007fe185 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java @@ -58,8 +58,8 @@ public class TestInvokeGroovy extends BaseScriptTest { runner.enqueue("test content".getBytes(StandardCharsets.UTF_8)); runner.run(); - runner.assertAllFlowFilesTransferred("success", 1); - final List result = runner.getFlowFilesForRelationship("success"); + runner.assertAllFlowFilesTransferred("test", 1); + final List result = runner.getFlowFilesForRelationship("test"); result.get(0).assertAttributeEquals("from-content", "test content"); } @@ -166,8 +166,8 @@ public class TestInvokeGroovy extends BaseScriptTest { runner.enqueue("test content".getBytes(StandardCharsets.UTF_8)); runner.run(); - runner.assertAllFlowFilesTransferred(InvokeScriptedProcessor.REL_FAILURE, 1); - final List result = runner.getFlowFilesForRelationship(InvokeScriptedProcessor.REL_FAILURE); + runner.assertAllFlowFilesTransferred("FAILURE", 1); + final List result = runner.getFlowFilesForRelationship("FAILURE"); assertFalse(result.isEmpty()); } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testScriptRoutesToFailure.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testScriptRoutesToFailure.groovy index 90d7d6c162..3830616a8e 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testScriptRoutesToFailure.groovy +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testScriptRoutesToFailure.groovy @@ -18,13 +18,18 @@ class testScriptRoutesToFailure implements Processor { def ComponentLog log + def REL_FAILURE = new Relationship.Builder() + .name("FAILURE") + .description("A FAILURE relationship") + .build(); + @Override void initialize(ProcessorInitializationContext context) { } @Override Set getRelationships() { - return [] as Set + return [REL_FAILURE] as Set } @Override @@ -32,7 +37,7 @@ class testScriptRoutesToFailure implements Processor { def session = sessionFactory.createSession() def flowFile = session.get() if(!flowFile) return - session.transfer(flowFile, InvokeScriptedProcessor.REL_FAILURE) + session.transfer(flowFile, REL_FAILURE) } @Override diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy index 9778f87f42..414b9dec0b 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_reader.groovy @@ -43,7 +43,7 @@ class GroovyProcessor implements Processor { } flowFile = session.putAttribute(flowFile, "from-content", "test content") // transfer - session.transfer(flowFile, InvokeScriptedProcessor.REL_SUCCESS) + session.transfer(flowFile, REL_TEST) session.commit() }