NIFI-1973 Allow ExecuteSQL to use flow file content as SQL query

This closes #498.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Matt Burgess 2016-06-06 11:19:32 -04:00 committed by Bryan Bende
parent 6de738fd04
commit 3b99e12352
2 changed files with 70 additions and 9 deletions

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.sql.Connection; import java.sql.Connection;
import java.sql.ResultSet; import java.sql.ResultSet;
@ -29,11 +30,14 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService; import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
@ -43,6 +47,7 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.JdbcCommon; import org.apache.nifi.processors.standard.util.JdbcCommon;
@ -57,6 +62,7 @@ import org.apache.nifi.util.StopWatch;
+ "a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. " + "a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. "
+ "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the " + "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the "
+ "select query. FlowFile attribute 'executesql.row.count' indicates how many rows were selected.") + "select query. FlowFile attribute 'executesql.row.count' indicates how many rows were selected.")
@WritesAttribute(attribute="executesql.row.count", description = "Contains the number of rows returned in the select query")
public class ExecuteSQL extends AbstractProcessor { public class ExecuteSQL extends AbstractProcessor {
public static final String RESULT_ROW_COUNT = "executesql.row.count"; public static final String RESULT_ROW_COUNT = "executesql.row.count";
@ -81,8 +87,12 @@ public class ExecuteSQL extends AbstractProcessor {
public static final PropertyDescriptor SQL_SELECT_QUERY = new PropertyDescriptor.Builder() public static final PropertyDescriptor SQL_SELECT_QUERY = new PropertyDescriptor.Builder()
.name("SQL select query") .name("SQL select query")
.description("SQL select query") .description("The SQL select query to execute. The query can be empty, a constant value, or built from attributes "
.required(true) + "using Expression Language. If this property is specified, it will be used regardless of the content of "
+ "incoming flowfiles. If this property is empty, the content of the incoming flow file is expected "
+ "to contain a valid SQL select query, to be issued by the processor to the database. Note that Expression "
+ "Language is not evaluated for flow file contents.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
.build(); .build();
@ -122,6 +132,17 @@ public class ExecuteSQL extends AbstractProcessor {
return propDescriptors; return propDescriptors;
} }
/**
 * Checks, when the processor is scheduled, that a SQL statement can be obtained from somewhere.
 * <p>
 * The statement comes either from the {@code SQL_SELECT_QUERY} property or from the content of
 * an incoming flow file. If the property is not set and the processor has no incoming connection,
 * the problem is logged and scheduling is aborted.
 *
 * @param context the process context used to inspect the query property and incoming connections
 * @throws ProcessException if the query property is not set and there is no incoming connection
 */
@OnScheduled
public void setup(ProcessContext context) {
    // A configured query property is sufficient on its own.
    if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
        return;
    }
    // Without the property, an upstream connection can still deliver the query as flow file content.
    if (context.hasIncomingConnection()) {
        return;
    }

    final String message = "Either the Select Query must be specified or there must be an incoming connection "
            + "providing flowfile(s) containing a SQL select query";
    getLogger().error(message);
    throw new ProcessException(message);
}
@Override @Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile fileToProcess = null; FlowFile fileToProcess = null;
@ -138,9 +159,23 @@ public class ExecuteSQL extends AbstractProcessor {
final ComponentLog logger = getLogger(); final ComponentLog logger = getLogger();
final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
final String selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue(); final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
final StopWatch stopWatch = new StopWatch(true); final StopWatch stopWatch = new StopWatch(true);
final String selectQuery;
if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
} else {
// If the query is not set, then an incoming flow file is required, and expected to contain a valid SQL select query.
// If there is no incoming connection, onTrigger will not be called as the processor will fail when scheduled.
final StringBuilder queryContents = new StringBuilder();
session.read(fileToProcess, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
queryContents.append(IOUtils.toString(in));
}
});
selectQuery = queryContents.toString();
}
try (final Connection con = dbcpService.getConnection(); try (final Connection con = dbcpService.getConnection();
final Statement st = con.createStatement()) { final Statement st = con.createStatement()) {

View File

@ -107,21 +107,40 @@ public class TestExecuteSQL {
runner.assertTransferCount(ExecuteSQL.REL_FAILURE, 0); runner.assertTransferCount(ExecuteSQL.REL_FAILURE, 0);
} }
/**
 * Runs the processor with an incoming connection declared on the test runner.
 * This method itself enqueues no flow file and sets no query property
 * (fixture setup for {@code runner} is outside this view).
 * Asserts that nothing is transferred to the success or the failure relationship.
 */
@Test
public void testIncomingConnectionWithNoFlowFileAndNoQuery() throws InitializationException {
runner.setIncomingConnection(true);
runner.run();
// Neither relationship should have received a flow file.
runner.assertTransferCount(ExecuteSQL.REL_SUCCESS, 0);
runner.assertTransferCount(ExecuteSQL.REL_FAILURE, 0);
}
/**
 * Runs the processor with no incoming connection; this method sets no query property.
 * The run is expected to end in an {@link AssertionError}.
 * NOTE(review): presumably the test runner surfaces the ProcessException thrown by
 * ExecuteSQL's scheduling check as an AssertionError; confirm against the runner's behavior.
 */
@Test(expected = AssertionError.class)
public void testNoIncomingConnectionAndNoQuery() throws InitializationException {
runner.setIncomingConnection(false);
runner.run();
}
@Test @Test
public void testNoIncomingConnection() throws ClassNotFoundException, SQLException, InitializationException, IOException { public void testNoIncomingConnection() throws ClassNotFoundException, SQLException, InitializationException, IOException {
runner.setIncomingConnection(false); runner.setIncomingConnection(false);
invokeOnTrigger(null, QUERY_WITHOUT_EL, false); invokeOnTrigger(null, QUERY_WITHOUT_EL, false, true);
} }
@Test @Test
public void testNoTimeLimit() throws InitializationException, ClassNotFoundException, SQLException, IOException { public void testNoTimeLimit() throws InitializationException, ClassNotFoundException, SQLException, IOException {
invokeOnTrigger(null, QUERY_WITH_EL, true); invokeOnTrigger(null, QUERY_WITH_EL, true, true);
}
@Test
public void testSelectQueryInFlowFile() throws InitializationException, ClassNotFoundException, SQLException, IOException {
invokeOnTrigger(null, QUERY_WITHOUT_EL, true, false);
} }
@Test @Test
public void testQueryTimeout() throws InitializationException, ClassNotFoundException, SQLException, IOException { public void testQueryTimeout() throws InitializationException, ClassNotFoundException, SQLException, IOException {
// Does not seem to have any effect when using embedded Derby // Does not seem to have any effect when using embedded Derby
invokeOnTrigger(1, QUERY_WITH_EL, true); // 1 second max time invokeOnTrigger(1, QUERY_WITH_EL, true, true); // 1 second max time
} }
@Test @Test
@ -177,7 +196,7 @@ public class TestExecuteSQL {
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1); runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1);
} }
public void invokeOnTrigger(final Integer queryTimeout, final String query, final boolean incomingFlowFile) public void invokeOnTrigger(final Integer queryTimeout, final String query, final boolean incomingFlowFile, final boolean setQueryProperty)
throws InitializationException, ClassNotFoundException, SQLException, IOException { throws InitializationException, ClassNotFoundException, SQLException, IOException {
if (queryTimeout != null) { if (queryTimeout != null) {
@ -196,14 +215,21 @@ public class TestExecuteSQL {
// ResultSet size will be 1x200x100 = 20 000 rows // ResultSet size will be 1x200x100 = 20 000 rows
// because of where PER.ID = ${person.id} // because of where PER.ID = ${person.id}
final int nrOfRows = 20000; final int nrOfRows = 20000;
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, query);
if (incomingFlowFile) { if (incomingFlowFile) {
// incoming FlowFile content is not used, but attributes are used // incoming FlowFile content is not used, but attributes are used
final Map<String, String> attributes = new HashMap<>(); final Map<String, String> attributes = new HashMap<>();
attributes.put("person.id", "10"); attributes.put("person.id", "10");
if (!setQueryProperty) {
runner.enqueue(query.getBytes(), attributes);
} else {
runner.enqueue("Hello".getBytes(), attributes); runner.enqueue("Hello".getBytes(), attributes);
} }
}
if(setQueryProperty) {
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, query);
}
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1); runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);