NIFI-4522: Add SQL Statement property to PutSQL

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2225.
This commit is contained in:
Matthew Burgess 2017-10-25 09:53:16 -04:00 committed by Pierre Villard
parent 1625f719e6
commit 856dedab12
2 changed files with 69 additions and 3 deletions

View File

@ -139,6 +139,19 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
.identifiesControllerService(DBCPService.class)
.required(true)
.build();
static final PropertyDescriptor SQL_STATEMENT = new PropertyDescriptor.Builder()
.name("putsql-sql-statement")
.displayName("SQL Statement")
.description("The SQL statement to execute. The statement can be empty, a constant value, or built from attributes "
+ "using Expression Language. If this property is specified, it will be used regardless of the content of "
+ "incoming flowfiles. If this property is empty, the content of the incoming flow file is expected "
+ "to contain a valid SQL statement, to be issued by the processor to the database.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
static final PropertyDescriptor SUPPORT_TRANSACTIONS = new PropertyDescriptor.Builder()
.name("Support Fragmented Transactions")
.description("If true, when a FlowFile is consumed by this Processor, the Processor will first check the fragment.identifier and fragment.count attributes of that FlowFile. "
@ -197,6 +210,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(CONNECTION_POOL);
properties.add(SQL_STATEMENT);
properties.add(SUPPORT_TRANSACTIONS);
properties.add(TRANSACTION_TIMEOUT);
properties.add(BATCH_SIZE);
@ -269,7 +283,9 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
groups.add(fragmentedEnclosure);
for (final FlowFile flowFile : flowFiles) {
final String sql = getSQL(session, flowFile);
final String sql = context.getProperty(PutSQL.SQL_STATEMENT).isSet()
? context.getProperty(PutSQL.SQL_STATEMENT).evaluateAttributeExpressions(flowFile).getValue()
: getSQL(session, flowFile);
final StatementFlowFileEnclosure enclosure = sqlToEnclosure
.computeIfAbsent(sql, k -> new StatementFlowFileEnclosure(sql));
@ -280,7 +296,9 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
private final GroupingFunction groupFlowFilesBySQLBatch = (context, session, fc, conn, flowFiles, groups, sqlToEnclosure, result) -> {
for (final FlowFile flowFile : flowFiles) {
final String sql = getSQL(session, flowFile);
final String sql = context.getProperty(PutSQL.SQL_STATEMENT).isSet()
? context.getProperty(PutSQL.SQL_STATEMENT).evaluateAttributeExpressions(flowFile).getValue()
: getSQL(session, flowFile);
// Get or create the appropriate PreparedStatement to use.
final StatementFlowFileEnclosure enclosure = sqlToEnclosure
@ -304,7 +322,9 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
private GroupingFunction groupFlowFilesBySQL = (context, session, fc, conn, flowFiles, groups, sqlToEnclosure, result) -> {
for (final FlowFile flowFile : flowFiles) {
final String sql = getSQL(session, flowFile);
final String sql = context.getProperty(PutSQL.SQL_STATEMENT).isSet()
? context.getProperty(PutSQL.SQL_STATEMENT).evaluateAttributeExpressions(flowFile).getValue()
: getSQL(session, flowFile);
// Get or create the appropriate PreparedStatement to use.
final StatementFlowFileEnclosure enclosure = sqlToEnclosure

View File

@ -1412,6 +1412,52 @@ public class TestPutSQL {
runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 0);
}
@Test
public void testStatementsFromProperty() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
runner.setProperty(PutSQL.SQL_STATEMENT, "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (${row.id}, 'Mark', 84)");
recreateTable("PERSONS", createPersons);
runner.enqueue("This statement should be ignored".getBytes(), new HashMap<String,String>() {{
put("row.id", "1");
}});
runner.run();
runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("Mark", rs.getString(2));
assertEquals(84, rs.getInt(3));
assertFalse(rs.next());
}
}
runner.setProperty(PutSQL.SQL_STATEMENT, "UPDATE PERSONS SET NAME='George' WHERE ID=${row.id}");
runner.enqueue("This statement should be ignored".getBytes(), new HashMap<String,String>() {{
put("row.id", "1");
}});
runner.run();
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("George", rs.getString(2));
assertEquals(84, rs.getInt(3));
assertFalse(rs.next());
}
}
}
/**
* Simple implementation only for testing purposes
*/