From 3b99e1235200a505c42b7da00f923a8193b55a7f Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Mon, 6 Jun 2016 11:19:32 -0400 Subject: [PATCH] NIFI-1973 Allow ExecuteSQL to use flow file content as SQL query This closes #498. Signed-off-by: Bryan Bende --- .../nifi/processors/standard/ExecuteSQL.java | 41 +++++++++++++++++-- .../processors/standard/TestExecuteSQL.java | 38 ++++++++++++++--- 2 files changed, 70 insertions(+), 9 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java index 1e775bf456..95893d8d76 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java @@ -17,6 +17,7 @@ package org.apache.nifi.processors.standard; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.sql.Connection; import java.sql.ResultSet; @@ -29,11 +30,14 @@ import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; +import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.dbcp.DBCPService; import org.apache.nifi.flowfile.FlowFile; @@ -43,6 +47,7 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.standard.util.JdbcCommon; @@ -57,6 +62,7 @@ import org.apache.nifi.util.StopWatch; + "a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. " + "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the " + "select query. FlowFile attribute 'executesql.row.count' indicates how many rows were selected.") +@WritesAttribute(attribute="executesql.row.count", description = "Contains the number of rows returned in the select query") public class ExecuteSQL extends AbstractProcessor { public static final String RESULT_ROW_COUNT = "executesql.row.count"; @@ -81,8 +87,12 @@ public class ExecuteSQL extends AbstractProcessor { public static final PropertyDescriptor SQL_SELECT_QUERY = new PropertyDescriptor.Builder() .name("SQL select query") - .description("SQL select query") - .required(true) + .description("The SQL select query to execute. The query can be empty, a constant value, or built from attributes " + + "using Expression Language. If this property is specified, it will be used regardless of the content of " + + "incoming flowfiles. If this property is empty, the content of the incoming flow file is expected " + + "to contain a valid SQL select query, to be issued by the processor to the database. Note that Expression " + + "Language is not evaluated for flow file contents.") + .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(true) .build(); @@ -122,6 +132,17 @@ public class ExecuteSQL extends AbstractProcessor { return propDescriptors; } + @OnScheduled + public void setup(ProcessContext context) { + // If the query is not set, then an incoming flow file is needed. Otherwise fail the initialization + if (!context.getProperty(SQL_SELECT_QUERY).isSet() && !context.hasIncomingConnection()) { + final String errorString = "Either the Select Query must be specified or there must be an incoming connection " + + "providing flowfile(s) containing a SQL select query"; + getLogger().error(errorString); + throw new ProcessException(errorString); + } + } + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { FlowFile fileToProcess = null; @@ -138,9 +159,23 @@ public class ExecuteSQL extends AbstractProcessor { final ComponentLog logger = getLogger(); final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); - final String selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue(); final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue(); final StopWatch stopWatch = new StopWatch(true); + final String selectQuery; + if (context.getProperty(SQL_SELECT_QUERY).isSet()) { + selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue(); + } else { + // If the query is not set, then an incoming flow file is required, and expected to contain a valid SQL select query. + // If there is no incoming connection, onTrigger will not be called as the processor will fail when scheduled. + final StringBuilder queryContents = new StringBuilder(); + session.read(fileToProcess, new InputStreamCallback() { + @Override + public void process(InputStream in) throws IOException { + queryContents.append(IOUtils.toString(in)); + } + }); + selectQuery = queryContents.toString(); + } try (final Connection con = dbcpService.getConnection(); final Statement st = con.createStatement()) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java index 4da9b1f93c..5e2a64a117 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java @@ -107,21 +107,40 @@ public class TestExecuteSQL { runner.assertTransferCount(ExecuteSQL.REL_FAILURE, 0); } + @Test + public void testIncomingConnectionWithNoFlowFileAndNoQuery() throws InitializationException { + runner.setIncomingConnection(true); + runner.run(); + runner.assertTransferCount(ExecuteSQL.REL_SUCCESS, 0); + runner.assertTransferCount(ExecuteSQL.REL_FAILURE, 0); + } + + @Test(expected = AssertionError.class) + public void testNoIncomingConnectionAndNoQuery() throws InitializationException { + runner.setIncomingConnection(false); + runner.run(); + } + @Test public void testNoIncomingConnection() throws ClassNotFoundException, SQLException, InitializationException, IOException { runner.setIncomingConnection(false); - invokeOnTrigger(null, QUERY_WITHOUT_EL, false); + invokeOnTrigger(null, QUERY_WITHOUT_EL, false, true); } @Test public void testNoTimeLimit() throws InitializationException, ClassNotFoundException, SQLException, IOException { - invokeOnTrigger(null, QUERY_WITH_EL, true); + invokeOnTrigger(null, QUERY_WITH_EL, true, true); + } + + @Test + public void testSelectQueryInFlowFile() throws InitializationException, ClassNotFoundException, SQLException, IOException { + invokeOnTrigger(null, QUERY_WITHOUT_EL, true, false); } @Test public void testQueryTimeout() throws InitializationException, ClassNotFoundException, SQLException, IOException { // Does to seem to have any effect when using embedded Derby - invokeOnTrigger(1, QUERY_WITH_EL, true); // 1 second max time + invokeOnTrigger(1, QUERY_WITH_EL, true, true); // 1 second max time } @Test @@ -177,7 +196,7 @@ public class TestExecuteSQL { runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1); } - public void invokeOnTrigger(final Integer queryTimeout, final String query, final boolean incomingFlowFile) + public void invokeOnTrigger(final Integer queryTimeout, final String query, final boolean incomingFlowFile, final boolean setQueryProperty) throws InitializationException, ClassNotFoundException, SQLException, IOException { if (queryTimeout != null) { @@ -196,13 +215,20 @@ public class TestExecuteSQL { // ResultSet size will be 1x200x100 = 20 000 rows // because of where PER.ID = ${person.id} final int nrOfRows = 20000; - runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, query); if (incomingFlowFile) { // incoming FlowFile content is not used, but attributes are used final Map attributes = new HashMap<>(); attributes.put("person.id", "10"); - runner.enqueue("Hello".getBytes(), attributes); + if (!setQueryProperty) { + runner.enqueue(query.getBytes(), attributes); + } else { + runner.enqueue("Hello".getBytes(), attributes); + } + } + + if(setQueryProperty) { + runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, query); } runner.run();