This commit is contained in:
dlukyanov 2017-11-12 16:23:07 +02:00
parent b3eecec901
commit 2916ce1ec8
7 changed files with 73 additions and 66 deletions

View File

@ -305,44 +305,49 @@ public class ExecuteGroovyScript extends AbstractProcessor {
} }
/** /**
* init controller services * init SQL variables from DBCP services
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void onInitCTL(HashMap CTL) throws SQLException { private void onInitSQL(HashMap SQL) throws SQLException {
for (Map.Entry e : (Set<Map.Entry>) CTL.entrySet()) { for (Map.Entry e : (Set<Map.Entry>) SQL.entrySet()) {
if (e.getValue() instanceof DBCPService) {
DBCPService s = (DBCPService) e.getValue(); DBCPService s = (DBCPService) e.getValue();
OSql sql = new OSql(s.getConnection()); OSql sql = new OSql(s.getConnection());
//try to set autocommit to false
try {
if (sql.getConnection().getAutoCommit()) {
sql.getConnection().setAutoCommit(false); sql.getConnection().setAutoCommit(false);
e.setValue(sql);
} }
} catch (Throwable ei) {
getLogger().warn("Failed to set autocommit=false for `" + e.getKey() + "`", ei);
}
e.setValue(sql);
} }
} }
/** /**
* before commit controller services * before commit SQL services
*/ */
private void onCommitCTL(HashMap CTL) throws SQLException { private void onCommitSQL(HashMap SQL) throws SQLException {
for (Map.Entry e : (Set<Map.Entry>) CTL.entrySet()) { for (Map.Entry e : (Set<Map.Entry>) SQL.entrySet()) {
if (e.getValue() instanceof OSql) {
OSql sql = (OSql) e.getValue(); OSql sql = (OSql) e.getValue();
if (!sql.getConnection().getAutoCommit()) { if (!sql.getConnection().getAutoCommit()) {
sql.commit(); sql.commit();
} }
} }
} }
}
/** /**
* finalize controller services * finalize SQL services. no exceptions should be thrown.
*/ */
private void onFinitCTL(HashMap CTL) { private void onFinitSQL(HashMap SQL) {
for (Map.Entry e : (Set<Map.Entry>) CTL.entrySet()) { for (Map.Entry e : (Set<Map.Entry>) SQL.entrySet()) {
if (e.getValue() instanceof OSql) {
OSql sql = (OSql) e.getValue(); OSql sql = (OSql) e.getValue();
try { try {
if (!sql.getConnection().getAutoCommit()) {
sql.getConnection().setAutoCommit(true); //default autocommit value in nifi sql.getConnection().setAutoCommit(true); //default autocommit value in nifi
}
} catch (Throwable ei) { } catch (Throwable ei) {
getLogger().warn("Failed to set autocommit=true for `" + e.getKey() + "`", ei);
} }
try { try {
sql.close(); sql.close();
@ -351,21 +356,19 @@ public class ExecuteGroovyScript extends AbstractProcessor {
} }
} }
} }
}
/** /**
* exception controller services * exception SQL services
*/ */
private void onFailCTL(HashMap CTL) { private void onFailSQL(HashMap SQL) {
for (Map.Entry e : (Set<Map.Entry>) CTL.entrySet()) { for (Map.Entry e : (Set<Map.Entry>) SQL.entrySet()) {
if (e.getValue() instanceof OSql) {
OSql sql = (OSql) e.getValue(); OSql sql = (OSql) e.getValue();
try { try {
if (!sql.getConnection().getAutoCommit()) { if (!sql.getConnection().getAutoCommit()) {
sql.rollback(); sql.rollback();
} }
} catch (Throwable ei) { } catch (Throwable ei) {
} //the rollback error usually not important because it precessed with DML error that is really important
} }
} }
} }
@ -379,6 +382,7 @@ public class ExecuteGroovyScript extends AbstractProcessor {
HashMap CTL = new AccessMap("CTL"); HashMap CTL = new AccessMap("CTL");
HashMap SQL = new AccessMap("SQL");
try { try {
Script script = getGroovyScript(); //compilation must be moved to validation Script script = getGroovyScript(); //compilation must be moved to validation
@ -393,6 +397,9 @@ public class ExecuteGroovyScript extends AbstractProcessor {
//get controller service //get controller service
ControllerService ctl = context.getProperty(property.getKey()).asControllerService(ControllerService.class); ControllerService ctl = context.getProperty(property.getKey()).asControllerService(ControllerService.class);
CTL.put(property.getKey().getName().substring(4), ctl); CTL.put(property.getKey().getName().substring(4), ctl);
} else if (property.getKey().getName().startsWith("SQL.")) {
DBCPService dbcp = context.getProperty(property.getKey()).asControllerService(DBCPService.class);
SQL.put(property.getKey().getName().substring(4), dbcp);
} else { } else {
// Add the dynamic property bound to its full PropertyValue to the script engine // Add the dynamic property bound to its full PropertyValue to the script engine
if (property.getValue() != null) { if (property.getValue() != null) {
@ -401,7 +408,7 @@ public class ExecuteGroovyScript extends AbstractProcessor {
} }
} }
} }
onInitCTL(CTL); onInitSQL(SQL);
bindings.put("session", session); bindings.put("session", session);
bindings.put("context", context); bindings.put("context", context);
@ -409,15 +416,16 @@ public class ExecuteGroovyScript extends AbstractProcessor {
bindings.put("REL_SUCCESS", REL_SUCCESS); bindings.put("REL_SUCCESS", REL_SUCCESS);
bindings.put("REL_FAILURE", REL_FAILURE); bindings.put("REL_FAILURE", REL_FAILURE);
bindings.put("CTL", CTL); bindings.put("CTL", CTL);
bindings.put("SQL", SQL);
script.run(); script.run();
bindings.clear(); bindings.clear();
onCommitCTL(CTL); onCommitSQL(SQL);
session.commit(); session.commit();
} catch (Throwable t) { } catch (Throwable t) {
getLogger().error(t.toString(), t); getLogger().error(t.toString(), t);
onFailCTL(CTL); onFailSQL(SQL);
if (toFailureOnError) { if (toFailureOnError) {
//transfer all received to failure with two new attributes: ERROR_MESSAGE and ERROR_STACKTRACE. //transfer all received to failure with two new attributes: ERROR_MESSAGE and ERROR_STACKTRACE.
session.revertReceivedTo(REL_FAILURE, StackTraceUtils.deepSanitize(t)); session.revertReceivedTo(REL_FAILURE, StackTraceUtils.deepSanitize(t));
@ -425,7 +433,7 @@ public class ExecuteGroovyScript extends AbstractProcessor {
session.rollback(true); session.rollback(true);
} }
} finally { } finally {
onFinitCTL(CTL); onFinitSQL(SQL);
} }
} }

View File

@ -229,7 +229,7 @@ public class ExecuteGroovyScriptTest {
@Test @Test
public void test_sql_01_select() throws Exception { public void test_sql_01_select() throws Exception {
runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_sql_01_select.groovy"); runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_sql_01_select.groovy");
runner.setProperty("CTL.sql", "dbcp"); runner.setProperty("SQL.mydb", "dbcp");
runner.assertValid(); runner.assertValid();
runner.run(); runner.run();
@ -244,7 +244,7 @@ public class ExecuteGroovyScriptTest {
@Test @Test
public void test_sql_02_blob_write() throws Exception { public void test_sql_02_blob_write() throws Exception {
runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_sql_02_blob_write.groovy"); runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_sql_02_blob_write.groovy");
runner.setProperty("CTL.sql", "dbcp"); runner.setProperty("SQL.mydb", "dbcp");
//runner.setProperty("ID", "0"); //runner.setProperty("ID", "0");
runner.assertValid(); runner.assertValid();
@ -263,7 +263,7 @@ public class ExecuteGroovyScriptTest {
public void test_sql_03_blob_read() throws Exception { public void test_sql_03_blob_read() throws Exception {
//read blob from database written at previous step and write to flow file //read blob from database written at previous step and write to flow file
runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_sql_03_blob_read.groovy"); runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_sql_03_blob_read.groovy");
runner.setProperty("CTL.sql", "dbcp"); runner.setProperty("SQL.mydb", "dbcp");
runner.setProperty("ID", "0"); runner.setProperty("ID", "0");
runner.setValidateExpressionUsage(false); runner.setValidateExpressionUsage(false);
runner.assertValid(); runner.assertValid();
@ -280,7 +280,7 @@ public class ExecuteGroovyScriptTest {
public void test_sql_04_insert_and_json() throws Exception { public void test_sql_04_insert_and_json() throws Exception {
//read blob from database written at previous step and write to flow file //read blob from database written at previous step and write to flow file
runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_sql_04_insert_and_json.groovy"); runner.setProperty(proc.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_sql_04_insert_and_json.groovy");
runner.setProperty("CTL.sql", "dbcp"); runner.setProperty("SQL.mydb", "dbcp");
runner.setValidateExpressionUsage(false); runner.setValidateExpressionUsage(false);
runner.assertValid(); runner.assertValid();

View File

@ -21,7 +21,6 @@ http://funnifi.blogspot.com/2016/04/sql-in-nifi-with-executescript.html
and refactored for ExecuteGroovyScript and refactored for ExecuteGroovyScript
*/ */
//assume you defined CTL.conn property linked to desired database connection pool
def flowFile = session.create() def flowFile = session.create()
//flowfile.write defined here: org\apache\nifi\processors\groovyx\GroovyMethods.java //flowfile.write defined here: org\apache\nifi\processors\groovyx\GroovyMethods.java

View File

@ -24,8 +24,8 @@ and refactored and simplified for ExecuteGroovyScript
def flowFile = session.create() def flowFile = session.create()
flowFile.write("UTF-8"){wout -> flowFile.write("UTF-8"){wout ->
//assume CTL.sql property is linked to desired database connection pool //assume SQL.mydb property is linked to desired database connection pool
CTL.sql.eachRow('select * from mytable'){ row-> SQL.mydb.eachRow('select * from mytable'){ row->
wout << row.name << '\n' wout << row.name << '\n'
} }
} }

View File

@ -26,7 +26,7 @@ flowFile.read{ rawIn->
p_id : flowFile.ID as Long, p_id : flowFile.ID as Long,
p_data : Sql.BLOB( rawIn ), p_data : Sql.BLOB( rawIn ),
] ]
assert 1==CTL.sql.executeUpdate(parms, "update mytable set data = :p_data where id = :p_id") assert 1==SQL.mydb.executeUpdate(parms, "update mytable set data = :p_data where id = :p_id")
} }
//transfer original to output //transfer original to output
REL_SUCCESS << flowFile REL_SUCCESS << flowFile

View File

@ -19,7 +19,7 @@ def flowFile = session.create()
//read blob into flowFile content //read blob into flowFile content
flowFile.write{out-> flowFile.write{out->
//get id from property with name ID //get id from property with name ID
def row = CTL.sql.firstRow("select data from mytable where id = ${ ID.value as Long }") def row = SQL.mydb.firstRow("select data from mytable where id = ${ ID.value as Long }")
assert row : "row with id=`${ID}` not found" assert row : "row with id=`${ID}` not found"
//write blob stream to flowFile output stream //write blob stream to flowFile output stream
out << row.data.getBinaryStream() out << row.data.getBinaryStream()

View File

@ -40,14 +40,14 @@ rows.each{row->
//at this point row is a map with keys corresponding to mytable column names. //at this point row is a map with keys corresponding to mytable column names.
//build query: insert into mytable(a,b,c,...) values(:a, :b, :c, ...) //build query: insert into mytable(a,b,c,...) values(:a, :b, :c, ...)
//and pass row-map as an argument to this query //and pass row-map as an argument to this query
CTL.sql.executeInsert(row, "insert into mytable( ${row.keySet().join(',')} ) values( :${row.keySet().join(', :')} )") SQL.mydb.executeInsert(row, "insert into mytable( ${row.keySet().join(',')} ) values( :${row.keySet().join(', :')} )")
//create new flowfile based on original without copying content, //create new flowfile based on original without copying content,
//write new content and add into outFiles list //write new content and add into outFiles list
outFiles << flowFile.clone(false).write( "UTF-8", JsonOutput.toJson(row) ) outFiles << flowFile.clone(false).write( "UTF-8", JsonOutput.toJson(row) )
} }
//just easier to assert sql here //just easier to assert sql here
assert 2+rows.size() == CTL.sql.firstRow("select count(*) cnt from mytable").cnt assert 2+rows.size() == SQL.mydb.firstRow("select count(*) cnt from mytable").cnt
flowFile.remove() flowFile.remove()
//transfer all new files to success relationship //transfer all new files to success relationship