NIFI-6370: Allow multiple SQL statements in PutDatabaseRecord

This closes #3528.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Matthew Burgess 2019-06-10 12:07:44 -04:00 committed by Koji Kawamura
parent f15332ff87
commit 05f3cadee8
2 changed files with 107 additions and 4 deletions

View File

@ -239,6 +239,17 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor ALLOW_MULTIPLE_STATEMENTS = new PropertyDescriptor.Builder()
.name("put-db-record-allow-multiple-statements")
.displayName("Allow Multiple SQL Statements")
.description("If the Statement Type is 'SQL' (as set in the statement.type attribute), this field indicates whether to split the field value by a semicolon and execute each statement "
+ "separately. If any statement causes an error, the entire set of statements will be rolled back. If the Statement Type is not 'SQL', this field is ignored.")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.build();
static final PropertyDescriptor QUOTED_IDENTIFIERS = new PropertyDescriptor.Builder()
.name("put-db-record-quoted-identifiers")
.displayName("Quote Column Identifiers")
@ -309,6 +320,7 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
pds.add(UNMATCHED_COLUMN_BEHAVIOR);
pds.add(UPDATE_KEYS);
pds.add(FIELD_CONTAINING_SQL);
pds.add(ALLOW_MULTIPLE_STATEMENTS);
pds.add(QUOTED_IDENTIFIERS);
pds.add(QUOTED_TABLE_IDENTIFIER);
pds.add(QUERY_TIMEOUT);
@ -404,7 +416,15 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
getLogger().warn("Failed to process {} due to {}", new Object[]{inputFlowFile, e}, e);
if (e instanceof BatchUpdateException) {
// Check if there was a BatchUpdateException or if multiple SQL statements were being executed and one failed
final String statementTypeProperty = context.getProperty(STATEMENT_TYPE).getValue();
String statementType = statementTypeProperty;
if (USE_ATTR_TYPE.equals(statementTypeProperty)) {
statementType = inputFlowFile.getAttribute(STATEMENT_TYPE_ATTRIBUTE);
}
if (e instanceof BatchUpdateException
|| (SQL_TYPE.equalsIgnoreCase(statementType) && context.getProperty(ALLOW_MULTIPLE_STATEMENTS).asBoolean())) {
try {
// Although process session will move forward in order to route the failed FlowFile,
// database transaction should be rolled back to avoid partial batch update.
@ -567,8 +587,16 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
throw new MalformedRecordException(format("Record had no (or null) value for Field Containing SQL: %s, FlowFile %s", sqlField, flowFile));
}
// Execute the statement as-is
s.execute((String) sql);
// Execute the statement(s) as-is
if (context.getProperty(ALLOW_MULTIPLE_STATEMENTS).asBoolean()) {
String regex = "(?<!\\\\);";
String[] sqlStatements = ((String) sql).split(regex);
for (String statement : sqlStatements) {
s.execute(statement);
}
} else {
s.execute((String) sql);
}
}
result.routeTo(flowFile, REL_SUCCESS);
session.getProvenanceReporter().send(flowFile, functionContext.jdbcUrl);

View File

@ -53,7 +53,6 @@ import static org.junit.Assert.assertTrue
import static org.junit.Assert.fail
import static org.mockito.Matchers.anyMap
import static org.mockito.Mockito.doAnswer
import static org.mockito.Mockito.only
import static org.mockito.Mockito.spy
import static org.mockito.Mockito.times
import static org.mockito.Mockito.verify
@ -413,6 +412,82 @@ class TestPutDatabaseRecord {
conn.close()
}
@Test
void testMultipleInsertsViaSqlStatementType() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable("PERSONS", createPersons)
final MockRecordParser parser = new MockRecordParser()
runner.addControllerService("parser", parser)
runner.enableControllerService(parser)
parser.addSchemaField("sql", RecordFieldType.STRING)
parser.addRecord('''INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101);INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102)''')
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.USE_ATTR_TYPE)
runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
runner.setProperty(PutDatabaseRecord.FIELD_CONTAINING_SQL, 'sql')
runner.setProperty(PutDatabaseRecord.ALLOW_MULTIPLE_STATEMENTS, 'true')
def attrs = [:]
attrs[PutDatabaseRecord.STATEMENT_TYPE_ATTRIBUTE] = 'sql'
runner.enqueue(new byte[0], attrs)
runner.run()
runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
final Connection conn = dbcp.getConnection()
final Statement stmt = conn.createStatement()
final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
assertTrue(rs.next())
assertEquals(1, rs.getInt(1))
assertEquals('rec1', rs.getString(2))
assertEquals(101, rs.getInt(3))
assertTrue(rs.next())
assertEquals(2, rs.getInt(1))
assertEquals('rec2', rs.getString(2))
assertEquals(102, rs.getInt(3))
assertFalse(rs.next())
stmt.close()
conn.close()
}
@Test
void testMultipleInsertsViaSqlStatementTypeBadSQL() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable("PERSONS", createPersons)
final MockRecordParser parser = new MockRecordParser()
runner.addControllerService("parser", parser)
runner.enableControllerService(parser)
parser.addSchemaField("sql", RecordFieldType.STRING)
parser.addRecord('''INSERT INTO PERSONS (id, name, code) VALUES (1, 'rec1',101);
INSERT INTO PERSONS (id, name, code) VALUES (2, 'rec2',102);
INSERT INTO PERSONS2 (id, name, code) VALUES (2, 'rec2',102);''')
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.USE_ATTR_TYPE)
runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
runner.setProperty(PutDatabaseRecord.FIELD_CONTAINING_SQL, 'sql')
runner.setProperty(PutDatabaseRecord.ALLOW_MULTIPLE_STATEMENTS, 'true')
def attrs = [:]
attrs[PutDatabaseRecord.STATEMENT_TYPE_ATTRIBUTE] = 'sql'
runner.enqueue(new byte[0], attrs)
runner.run()
runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 0)
runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1)
final Connection conn = dbcp.getConnection()
final Statement stmt = conn.createStatement()
final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
// The first two legitimate statements should have been rolled back
assertFalse(rs.next())
stmt.close()
conn.close()
}
@Test
void testSqlStatementTypeNoValue() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable("PERSONS", createPersons)