From 856dedab122f767cf52b18d28b0adf3ba3c09c70 Mon Sep 17 00:00:00 2001 From: Matthew Burgess Date: Wed, 25 Oct 2017 09:53:16 -0400 Subject: [PATCH] NIFI-4522: Add SQL Statement property to PutSQL Signed-off-by: Pierre Villard This closes #2225. --- .../nifi/processors/standard/PutSQL.java | 26 +++++++++-- .../nifi/processors/standard/TestPutSQL.java | 46 +++++++++++++++++++ 2 files changed, 69 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java index f75ccaa021..b50dcd0671 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java @@ -139,6 +139,19 @@ public class PutSQL extends AbstractSessionFactoryProcessor { .identifiesControllerService(DBCPService.class) .required(true) .build(); + + static final PropertyDescriptor SQL_STATEMENT = new PropertyDescriptor.Builder() + .name("putsql-sql-statement") + .displayName("SQL Statement") + .description("The SQL statement to execute. The statement can be empty, a constant value, or built from attributes " + + "using Expression Language. If this property is specified, it will be used regardless of the content of " + + "incoming flowfiles. If this property is empty, the content of the incoming flow file is expected " + + "to contain a valid SQL statement, to be issued by the processor to the database.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + static final PropertyDescriptor SUPPORT_TRANSACTIONS = new PropertyDescriptor.Builder() .name("Support Fragmented Transactions") .description("If true, when a FlowFile is consumed by this Processor, the Processor will first check the fragment.identifier and fragment.count attributes of that FlowFile. " @@ -197,6 +210,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor { protected List getSupportedPropertyDescriptors() { final List properties = new ArrayList<>(); properties.add(CONNECTION_POOL); + properties.add(SQL_STATEMENT); properties.add(SUPPORT_TRANSACTIONS); properties.add(TRANSACTION_TIMEOUT); properties.add(BATCH_SIZE); @@ -269,7 +283,9 @@ public class PutSQL extends AbstractSessionFactoryProcessor { groups.add(fragmentedEnclosure); for (final FlowFile flowFile : flowFiles) { - final String sql = getSQL(session, flowFile); + final String sql = context.getProperty(PutSQL.SQL_STATEMENT).isSet() + ? context.getProperty(PutSQL.SQL_STATEMENT).evaluateAttributeExpressions(flowFile).getValue() + : getSQL(session, flowFile); final StatementFlowFileEnclosure enclosure = sqlToEnclosure .computeIfAbsent(sql, k -> new StatementFlowFileEnclosure(sql)); @@ -280,7 +296,9 @@ public class PutSQL extends AbstractSessionFactoryProcessor { private final GroupingFunction groupFlowFilesBySQLBatch = (context, session, fc, conn, flowFiles, groups, sqlToEnclosure, result) -> { for (final FlowFile flowFile : flowFiles) { - final String sql = getSQL(session, flowFile); + final String sql = context.getProperty(PutSQL.SQL_STATEMENT).isSet() + ? context.getProperty(PutSQL.SQL_STATEMENT).evaluateAttributeExpressions(flowFile).getValue() + : getSQL(session, flowFile); // Get or create the appropriate PreparedStatement to use. final StatementFlowFileEnclosure enclosure = sqlToEnclosure @@ -304,7 +322,9 @@ public class PutSQL extends AbstractSessionFactoryProcessor { private GroupingFunction groupFlowFilesBySQL = (context, session, fc, conn, flowFiles, groups, sqlToEnclosure, result) -> { for (final FlowFile flowFile : flowFiles) { - final String sql = getSQL(session, flowFile); + final String sql = context.getProperty(PutSQL.SQL_STATEMENT).isSet() + ? context.getProperty(PutSQL.SQL_STATEMENT).evaluateAttributeExpressions(flowFile).getValue() + : getSQL(session, flowFile); // Get or create the appropriate PreparedStatement to use. final StatementFlowFileEnclosure enclosure = sqlToEnclosure diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java index 5a2909b1b2..b804447bbb 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java @@ -1412,6 +1412,52 @@ public class TestPutSQL { runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 0); } + @Test + public void testStatementsFromProperty() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutSQL.class); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp"); + runner.setProperty(PutSQL.SQL_STATEMENT, "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (${row.id}, 'Mark', 84)"); + + recreateTable("PERSONS", createPersons); + + runner.enqueue("This statement should be ignored".getBytes(), new HashMap() {{ + put("row.id", "1"); + }}); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Mark", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + + runner.setProperty(PutSQL.SQL_STATEMENT, "UPDATE PERSONS SET NAME='George' WHERE ID=${row.id}"); + runner.enqueue("This statement should be ignored".getBytes(), new HashMap() {{ + put("row.id", "1"); + }}); + runner.run(); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("George", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertFalse(rs.next()); + } + } + } + /** * Simple implementation only for testing purposes */