diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java index 82072f6a49..f9c2153fbb 100644 --- a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java +++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java @@ -305,67 +305,70 @@ public class ExecuteGroovyScript extends AbstractProcessor { } /** - * init controller services + * init SQL variables from DBCP services */ @SuppressWarnings("unchecked") - private void onInitCTL(HashMap CTL) throws SQLException { - for (Map.Entry e : (Set) CTL.entrySet()) { - if (e.getValue() instanceof DBCPService) { - DBCPService s = (DBCPService) e.getValue(); - OSql sql = new OSql(s.getConnection()); - sql.getConnection().setAutoCommit(false); - e.setValue(sql); + private void onInitSQL(HashMap SQL) throws SQLException { + for (Map.Entry e : (Set) SQL.entrySet()) { + DBCPService s = (DBCPService) e.getValue(); + OSql sql = new OSql(s.getConnection()); + //try to set autocommit to false + try { + if (sql.getConnection().getAutoCommit()) { + sql.getConnection().setAutoCommit(false); + } + } catch (Throwable ei) { + getLogger().warn("Failed to set autocommit=false for `" + e.getKey() + "`", ei); + } + e.setValue(sql); + } + } + + /** + * before commit SQL services + */ + private void onCommitSQL(HashMap SQL) throws SQLException { + for (Map.Entry e : (Set) SQL.entrySet()) { + OSql sql = (OSql) e.getValue(); + if (!sql.getConnection().getAutoCommit()) { + sql.commit(); } } } /** - * before commit controller services + * finalize SQL services. no exceptions should be thrown. */ - private void onCommitCTL(HashMap CTL) throws SQLException { - for (Map.Entry e : (Set) CTL.entrySet()) { - if (e.getValue() instanceof OSql) { - OSql sql = (OSql) e.getValue(); + private void onFinitSQL(HashMap SQL) { + for (Map.Entry e : (Set) SQL.entrySet()) { + OSql sql = (OSql) e.getValue(); + try { + if (!sql.getConnection().getAutoCommit()) { + sql.getConnection().setAutoCommit(true); //default autocommit value in nifi + } + } catch (Throwable ei) { + getLogger().warn("Failed to set autocommit=true for `" + e.getKey() + "`", ei); + } + try { + sql.close(); + sql = null; + } catch (Throwable ei) { + } + } + } + + /** + * exception SQL services + */ + private void onFailSQL(HashMap SQL) { + for (Map.Entry e : (Set) SQL.entrySet()) { + OSql sql = (OSql) e.getValue(); + try { if (!sql.getConnection().getAutoCommit()) { - sql.commit(); - } - } - } - } - - /** - * finalize controller services - */ - private void onFinitCTL(HashMap CTL) { - for (Map.Entry e : (Set) CTL.entrySet()) { - if (e.getValue() instanceof OSql) { - OSql sql = (OSql) e.getValue(); - try { - sql.getConnection().setAutoCommit(true); //default autocommit value in nifi - } catch (Throwable ei) { - } - try { - sql.close(); - sql = null; - } catch (Throwable ei) { - } - } - } - } - - /** - * exception controller services - */ - private void onFailCTL(HashMap CTL) { - for (Map.Entry e : (Set) CTL.entrySet()) { - if (e.getValue() instanceof OSql) { - OSql sql = (OSql) e.getValue(); - try { - if (!sql.getConnection().getAutoCommit()) { - sql.rollback(); - } - } catch (Throwable ei) { + sql.rollback(); } + } catch (Throwable ei) { + //the rollback error usually not important because it precessed with DML error that is really important } } } @@ -379,6 +382,7 @@ public class ExecuteGroovyScript extends AbstractProcessor { HashMap CTL = new AccessMap("CTL"); + HashMap SQL = new AccessMap("SQL"); try { Script script = getGroovyScript(); //compilation must be moved to validation @@ -393,6 +397,9 @@ public class ExecuteGroovyScript extends AbstractProcessor { //get controller service ControllerService ctl = context.getProperty(property.getKey()).asControllerService(ControllerService.class); CTL.put(property.getKey().getName().substring(4), ctl); + } else if (property.getKey().getName().startsWith("SQL.")) { + DBCPService dbcp = context.getProperty(property.getKey()).asControllerService(DBCPService.class); + SQL.put(property.getKey().getName().substring(4), dbcp); } else { // Add the dynamic property bound to its full PropertyValue to the script engine if (property.getValue() != null) { @@ -401,7 +408,7 @@ public class ExecuteGroovyScript extends AbstractProcessor { } } } - onInitCTL(CTL); + onInitSQL(SQL); bindings.put("session", session); bindings.put("context", context); @@ -409,15 +416,16 @@ public class ExecuteGroovyScript extends AbstractProcessor { bindings.put("REL_SUCCESS", REL_SUCCESS); bindings.put("REL_FAILURE", REL_FAILURE); bindings.put("CTL", CTL); + bindings.put("SQL", SQL); script.run(); bindings.clear(); - onCommitCTL(CTL); + onCommitSQL(SQL); session.commit(); } catch (Throwable t) { getLogger().error(t.toString(), t); - onFailCTL(CTL); + onFailSQL(SQL); if (toFailureOnError) { //transfer all received to failure with two new attributes: ERROR_MESSAGE and ERROR_STACKTRACE. session.revertReceivedTo(REL_FAILURE, StackTraceUtils.deepSanitize(t)); @@ -425,7 +433,7 @@ public class ExecuteGroovyScript extends AbstractProcessor { session.rollback(true); } } finally { - onFinitCTL(CTL); + onFinitSQL(SQL); } } diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java index 8351e75aca..e275177325 100644 --- a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java +++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java @@ -229,7 +229,7 @@ public class ExecuteGroovyScriptTest { @Test public void test_sql_01_select() throws Exception { runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_sql_01_select.groovy"); - runner.setProperty("CTL.sql", "dbcp"); + runner.setProperty("SQL.mydb", "dbcp"); runner.assertValid(); runner.run(); @@ -244,7 +244,7 @@ public class ExecuteGroovyScriptTest { @Test public void test_sql_02_blob_write() throws Exception { runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_sql_02_blob_write.groovy"); - runner.setProperty("CTL.sql", "dbcp"); + runner.setProperty("SQL.mydb", "dbcp"); //runner.setProperty("ID", "0"); runner.assertValid(); @@ -263,7 +263,7 @@ public class ExecuteGroovyScriptTest { public void test_sql_03_blob_read() throws Exception { //read blob from database written at previous step and write to flow file runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_sql_03_blob_read.groovy"); - runner.setProperty("CTL.sql", "dbcp"); + runner.setProperty("SQL.mydb", "dbcp"); runner.setProperty("ID", "0"); runner.setValidateExpressionUsage(false); runner.assertValid(); @@ -280,7 +280,7 @@ public class ExecuteGroovyScriptTest { public void test_sql_04_insert_and_json() throws Exception { //read blob from database written at previous step and write to flow file runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_sql_04_insert_and_json.groovy"); - runner.setProperty("CTL.sql", "dbcp"); + runner.setProperty("SQL.mydb", "dbcp"); runner.setValidateExpressionUsage(false); runner.assertValid(); diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_no_input.groovy b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_no_input.groovy index 60f8782ebd..1f8da076f9 100644 --- a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_no_input.groovy +++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_no_input.groovy @@ -21,7 +21,6 @@ http://funnifi.blogspot.com/2016/04/sql-in-nifi-with-executescript.html and refactored for ExecuteGroovyScript */ -//assume you defined CTL.conn property linked to desired database connection pool def flowFile = session.create() //flowfile.write defined here: org\apache\nifi\processors\groovyx\GroovyMethods.java diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_sql_01_select.groovy b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_sql_01_select.groovy index 8b0cc01e19..ba24f3b440 100644 --- a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_sql_01_select.groovy +++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_sql_01_select.groovy @@ -24,8 +24,8 @@ and refactored and simplified for ExecuteGroovyScript def flowFile = session.create() flowFile.write("UTF-8"){wout -> - //assume CTL.sql property is linked to desired database connection pool - CTL.sql.eachRow('select * from mytable'){ row-> + //assume SQL.mydb property is linked to desired database connection pool + SQL.mydb.eachRow('select * from mytable'){ row-> wout << row.name << '\n' } } diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_sql_02_blob_write.groovy b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_sql_02_blob_write.groovy index 28563e52bd..f4c0675497 100644 --- a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_sql_02_blob_write.groovy +++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_sql_02_blob_write.groovy @@ -26,7 +26,7 @@ flowFile.read{ rawIn-> p_id : flowFile.ID as Long, p_data : Sql.BLOB( rawIn ), ] - assert 1==CTL.sql.executeUpdate(parms, "update mytable set data = :p_data where id = :p_id") + assert 1==SQL.mydb.executeUpdate(parms, "update mytable set data = :p_data where id = :p_id") } //transfer original to output REL_SUCCESS << flowFile diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_sql_03_blob_read.groovy b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_sql_03_blob_read.groovy index b37ff03bf0..4677740d0b 100644 --- a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_sql_03_blob_read.groovy +++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_sql_03_blob_read.groovy @@ -19,7 +19,7 @@ def flowFile = session.create() //read blob into flowFile content flowFile.write{out-> //get id from property with name ID - def row = CTL.sql.firstRow("select data from mytable where id = ${ ID.value as Long }") + def row = SQL.mydb.firstRow("select data from mytable where id = ${ ID.value as Long }") assert row : "row with id=`${ID}` not found" //write blob stream to flowFile output stream out << row.data.getBinaryStream() diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_sql_04_insert_and_json.groovy b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_sql_04_insert_and_json.groovy index 002438fe1c..7a9752cdca 100644 --- a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_sql_04_insert_and_json.groovy +++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_sql_04_insert_and_json.groovy @@ -40,14 +40,14 @@ rows.each{row-> //at this point row is a map with keys corresponding to mytable column names. //build query: insert into mytable(a,b,c,...) values(:a, :b, :c, ...) //and pass row-map as an argument to this query - CTL.sql.executeInsert(row, "insert into mytable( ${row.keySet().join(',')} ) values( :${row.keySet().join(', :')} )") + SQL.mydb.executeInsert(row, "insert into mytable( ${row.keySet().join(',')} ) values( :${row.keySet().join(', :')} )") //create new flowfile based on original without copying content, //write new content and add into outFiles list outFiles << flowFile.clone(false).write( "UTF-8", JsonOutput.toJson(row) ) } //just easier to assert sql here -assert 2+rows.size() == CTL.sql.firstRow("select count(*) cnt from mytable").cnt +assert 2+rows.size() == SQL.mydb.firstRow("select count(*) cnt from mytable").cnt flowFile.remove() //transfer all new files to success relationship