diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java index 7b1d77a2ba..d2d22d8b7d 100644 --- a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java +++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java @@ -31,6 +31,7 @@ import java.util.Set; import org.apache.nifi.annotation.behavior.Restricted; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; @@ -63,13 +64,14 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationContext; @EventDriven +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) @Tags({"script", "groovy", "groovyx"}) @CapabilityDescription( "Experimental Extended Groovy script processor. The script is responsible for " + "handling the incoming flow file (transfer to SUCCESS or remove, e.g.) as well as any flow files created by " + "the script. If the handling is incomplete or incorrect, the session will be rolled back.") @Restricted("Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.") -@SeeAlso({}) +@SeeAlso(classNames={"org.apache.nifi.processors.script.ExecuteScript"}) @DynamicProperty(name = "A script engine property to update", value = "The value to set it to", supportsExpressionLanguage = true, @@ -90,20 +92,20 @@ public class ExecuteGroovyScript extends AbstractProcessor { .description("Body of script to execute. Only one of Script File or Script Body may be used").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(false) .build(); - public static String[] VALID_BOOLEANS = {"true", "false"}; - public static final PropertyDescriptor REQUIRE_FLOW = new PropertyDescriptor.Builder().name("Requires flow file") - .description("If `true` then flowFile variable initialized and validated. So developer don't need to do flowFile = session.get(). If `false` the flowFile variable not initialized.") - .required(true).expressionLanguageSupported(false).allowableValues(VALID_BOOLEANS).defaultValue("false").build(); - public static String[] VALID_FAIL_STRATEGY = {"rollback", "transfer to failure"}; public static final PropertyDescriptor FAIL_STRATEGY = new PropertyDescriptor.Builder() .name("Failure strategy") - .description("If `transfer to failure` used then all flowFiles received from incoming queues in this session " - +"in case of exception will be transferred to `failure` relationship with additional attributes set: ERROR_MESSAGE and ERROR_STACKTRACE.") + .description("What to do with unhandled exceptions. If you want to manage exception by code then keep the default value `rollback`." + +" If `transfer to failure` selected and unhandled exception occured then all flowFiles received from incoming queues in this session" + +" will be transferred to `failure` relationship with additional attributes set: ERROR_MESSAGE and ERROR_STACKTRACE." + +" If `rollback` selected and unhandled exception occured then all flowFiles received from incoming queues will be returned back" + +" with penalize flag." + +" If the processor has no incoming connections then this parameter has no affect." + ) .required(true).expressionLanguageSupported(false).allowableValues(VALID_FAIL_STRATEGY).defaultValue(VALID_FAIL_STRATEGY[0]).build(); public static final PropertyDescriptor ADD_CLASSPATH = new PropertyDescriptor.Builder().name("Additional classpath").required(false) - .description("Classpath list separated by semicolon. You can use masks like `*`, `*.jar` in file name. Please avoid using this parameter because of deploy complexity :)") + .description("Classpath list separated by semicolon. You can use masks like `*`, `*.jar` in file name.") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).build(); public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that were successfully processed").build(); @@ -127,7 +129,6 @@ public class ExecuteGroovyScript extends AbstractProcessor { List descriptors = new ArrayList<>(); descriptors.add(SCRIPT_FILE); descriptors.add(SCRIPT_BODY); - descriptors.add(REQUIRE_FLOW); descriptors.add(FAIL_STRATEGY); descriptors.add(ADD_CLASSPATH); this.descriptors = Collections.unmodifiableList(descriptors); @@ -366,24 +367,11 @@ public class ExecuteGroovyScript extends AbstractProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession _session) throws ProcessException { - String requireFlow = context.getProperty(REQUIRE_FLOW).getValue(); boolean toFailureOnError = VALID_FAIL_STRATEGY[1].equals(context.getProperty(FAIL_STRATEGY).getValue()); //create wrapped session to control list of newly created and files got from this session. //so transfer original input to failure will be possible GroovyProcessSessionWrap session = new GroovyProcessSessionWrap(_session, toFailureOnError); - FlowFile flowFile = null; - - if ("true".equals(requireFlow)) { - flowFile = session.get(); - if (flowFile == null) { - return; - } - } else { - if (toFailureOnError) { - throw new ProcessException("The parameter `" + REQUIRE_FLOW.getName() + "` must be true when `" + FAIL_STRATEGY.getName() + "` is " + VALID_FAIL_STRATEGY[1]); - } - } HashMap CTL = new HashMap() { @Override @@ -424,9 +412,6 @@ public class ExecuteGroovyScript extends AbstractProcessor { bindings.put("REL_SUCCESS", REL_SUCCESS); bindings.put("REL_FAILURE", REL_FAILURE); bindings.put("CTL", CTL); - if (flowFile != null) { - bindings.put("flowFile", flowFile); - } script.run(); bindings.clear(); diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java index fe11d2a4c9..8351e75aca 100644 --- a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java +++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java @@ -130,8 +130,7 @@ public class ExecuteGroovyScriptTest { */ @Test public void testReadFlowFileContentAndStoreInFlowFileAttribute() throws Exception { - runner.setProperty(proc.SCRIPT_BODY, "flowFile.testAttr = flowFile.read().getText('UTF-8'); REL_SUCCESS << flowFile;"); - runner.setProperty(proc.REQUIRE_FLOW, "true"); + runner.setProperty(proc.SCRIPT_BODY, "def flowFile = session.get(); if(!flowFile)return; flowFile.testAttr = flowFile.read().getText('UTF-8'); REL_SUCCESS << flowFile;"); //runner.setProperty(proc.FAIL_STRATEGY, "rollback"); runner.assertValid(); @@ -146,7 +145,6 @@ public class ExecuteGroovyScriptTest { @Test public void test_onTrigger_groovy() throws Exception { runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_onTrigger.groovy"); - runner.setProperty(proc.REQUIRE_FLOW, "false"); //runner.setProperty(proc.FAIL_STRATEGY, "rollback"); runner.assertValid(); @@ -160,7 +158,6 @@ public class ExecuteGroovyScriptTest { @Test public void test_onTriggerX_groovy() throws Exception { runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_onTriggerX.groovy"); - runner.setProperty(proc.REQUIRE_FLOW, "true"); //runner.setProperty(proc.FAIL_STRATEGY, "rollback"); runner.assertValid(); @@ -174,7 +171,6 @@ public class ExecuteGroovyScriptTest { @Test public void test_onTrigger_changeContent_groovy() throws Exception { runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_onTrigger_changeContent.groovy"); - runner.setProperty(proc.REQUIRE_FLOW, "false"); //runner.setProperty(proc.FAIL_STRATEGY, "rollback"); runner.assertValid(); @@ -191,7 +187,6 @@ public class ExecuteGroovyScriptTest { @Test public void test_onTrigger_changeContentX_groovy() throws Exception { runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_onTrigger_changeContentX.groovy"); - runner.setProperty(proc.REQUIRE_FLOW, "true"); //runner.setProperty(proc.FAIL_STRATEGY, "rollback"); runner.assertValid(); @@ -208,7 +203,6 @@ public class ExecuteGroovyScriptTest { @Test public void test_no_input_groovy() throws Exception { runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_no_input.groovy"); - runner.setProperty(proc.REQUIRE_FLOW, "false"); //runner.setProperty(proc.FAIL_STRATEGY, "rollback"); runner.assertValid(); runner.run(); @@ -222,22 +216,19 @@ public class ExecuteGroovyScriptTest { @Test public void test_good_script() throws Exception { - runner.setProperty(proc.SCRIPT_BODY, " REL_SUCCESS << flowFile "); - runner.setProperty(proc.REQUIRE_FLOW, "true"); + runner.setProperty(proc.SCRIPT_BODY, " def ff = session.get(); if(!ff)return; REL_SUCCESS << ff "); runner.assertValid(); } @Test public void test_bad_script() throws Exception { runner.setProperty(proc.SCRIPT_BODY, " { { "); - runner.setProperty(proc.REQUIRE_FLOW, "true"); runner.assertNotValid(); } @Test public void test_sql_01_select() throws Exception { runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_sql_01_select.groovy"); - runner.setProperty(proc.REQUIRE_FLOW, "false"); runner.setProperty("CTL.sql", "dbcp"); runner.assertValid(); @@ -253,7 +244,6 @@ public class ExecuteGroovyScriptTest { @Test public void test_sql_02_blob_write() throws Exception { runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_sql_02_blob_write.groovy"); - runner.setProperty(proc.REQUIRE_FLOW, "true"); runner.setProperty("CTL.sql", "dbcp"); //runner.setProperty("ID", "0"); runner.assertValid(); @@ -273,7 +263,6 @@ public class ExecuteGroovyScriptTest { public void test_sql_03_blob_read() throws Exception { //read blob from database written at previous step and write to flow file runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_sql_03_blob_read.groovy"); - runner.setProperty(proc.REQUIRE_FLOW, "false"); runner.setProperty("CTL.sql", "dbcp"); runner.setProperty("ID", "0"); runner.setValidateExpressionUsage(false); @@ -291,7 +280,6 @@ public class ExecuteGroovyScriptTest { public void test_sql_04_insert_and_json() throws Exception { //read blob from database written at previous step and write to flow file runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_sql_04_insert_and_json.groovy"); - runner.setProperty(proc.REQUIRE_FLOW, "true"); runner.setProperty("CTL.sql", "dbcp"); runner.setValidateExpressionUsage(false); runner.assertValid(); @@ -310,7 +298,6 @@ public class ExecuteGroovyScriptTest { @Test public void test_filter_01() throws Exception { runner.setProperty(proc.SCRIPT_BODY, "def ff = session.get{it.FILTER=='3'}; if(!ff)return; REL_SUCCESS << ff;"); - runner.setProperty(proc.REQUIRE_FLOW, "false"); //runner.setProperty(proc.FAIL_STRATEGY, "rollback"); runner.assertValid(); @@ -330,8 +317,7 @@ public class ExecuteGroovyScriptTest { @Test public void test_read_01() throws Exception { - runner.setProperty(proc.SCRIPT_BODY, "assert flowFile.read().getText('UTF-8')=='1234'; REL_SUCCESS << flowFile;"); - runner.setProperty(proc.REQUIRE_FLOW, "true"); + runner.setProperty(proc.SCRIPT_BODY, "def ff = session.get(); if(!ff)return; assert ff.read().getText('UTF-8')=='1234'; REL_SUCCESS << ff; "); runner.assertValid(); @@ -343,8 +329,7 @@ public class ExecuteGroovyScriptTest { @Test public void test_read_02() throws Exception { - runner.setProperty(proc.SCRIPT_BODY, "flowFile.read{s-> assert s.getText('UTF-8')=='1234' }; REL_SUCCESS << flowFile;"); - runner.setProperty(proc.REQUIRE_FLOW, "true"); + runner.setProperty(proc.SCRIPT_BODY, "def ff = session.get(); if(!ff)return; ff.read{s-> assert s.getText('UTF-8')=='1234' }; REL_SUCCESS << ff; "); runner.assertValid(); @@ -356,8 +341,7 @@ public class ExecuteGroovyScriptTest { @Test public void test_read_03() throws Exception { - runner.setProperty(proc.SCRIPT_BODY, "flowFile.read('UTF-8'){r-> assert r.getText()=='1234' }; REL_SUCCESS << flowFile;"); - runner.setProperty(proc.REQUIRE_FLOW, "true"); + runner.setProperty(proc.SCRIPT_BODY, "def ff = session.get(); if(!ff)return; ff.read('UTF-8'){r-> assert r.getText()=='1234' }; REL_SUCCESS << ff; "); runner.assertValid(); diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_onTriggerX.groovy b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_onTriggerX.groovy index 68746ade86..4186c6c9c2 100644 --- a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_onTriggerX.groovy +++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_onTriggerX.groovy @@ -14,5 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +def flowFile = session.get() +if(!flowFile)return flowFile."from-content" = "test content" REL_SUCCESS << flowFile diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_onTrigger_changeContentX.groovy b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_onTrigger_changeContentX.groovy index 28193cd973..70fc10d339 100644 --- a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_onTrigger_changeContentX.groovy +++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_onTrigger_changeContentX.groovy @@ -14,6 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +def flowFile = session.get() +if(!flowFile)return def selectedColumns = '' flowFile.write{inputStream, outputStream-> String[] header = null diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_sql_02_blob_write.groovy b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_sql_02_blob_write.groovy index 687de448fb..28563e52bd 100644 --- a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_sql_02_blob_write.groovy +++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_sql_02_blob_write.groovy @@ -17,8 +17,8 @@ import groovy.sql.Sql -//the next line not needed because require flowFile property set to true -//def flowFile = session.get() +def flowFile = session.get() +if(!flowFile)return //write content of the flow file into database blob flowFile.read{ rawIn-> diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_sql_04_insert_and_json.groovy b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_sql_04_insert_and_json.groovy index 42f6d698f8..002438fe1c 100644 --- a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_sql_04_insert_and_json.groovy +++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_sql_04_insert_and_json.groovy @@ -30,6 +30,9 @@ import groovy.json.JsonOutput ... ] */ +def flowFile = session.get() +if(!flowFile)return + def outFiles = [] //list for new flow files def rows = new JsonSlurper().parse( flowFile.read() )