diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java index f24159204a..02d9f886a7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java @@ -53,50 +53,49 @@ import org.apache.nifi.util.StopWatch; @InputRequirement(Requirement.INPUT_ALLOWED) @Tags({"sql", "select", "jdbc", "query", "database"}) @CapabilityDescription("Execute provided SQL select query. Query result will be converted to Avro format." - + " Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on " + - "a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. " + - "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the " + - "select query. " + - "FlowFile attribute 'executesql.row.count' indicates how many rows were selected.") + + " Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on " + + "a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. " + + "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the " + + "select query. FlowFile attribute 'executesql.row.count' indicates how many rows were selected.") public class ExecuteSQL extends AbstractProcessor { public static final String RESULT_ROW_COUNT = "executesql.row.count"; // Relationships public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("Successfully created FlowFile from SQL query result set.") - .build(); + .name("success") + .description("Successfully created FlowFile from SQL query result set.") + .build(); public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("SQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship") - .build(); + .name("failure") + .description("SQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship") + .build(); private final Set relationships; public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder() - .name("Database Connection Pooling Service") - .description("The Controller Service that is used to obtain connection to database") - .required(true) - .identifiesControllerService(DBCPService.class) - .build(); + .name("Database Connection Pooling Service") + .description("The Controller Service that is used to obtain connection to database") + .required(true) + .identifiesControllerService(DBCPService.class) + .build(); public static final PropertyDescriptor SQL_SELECT_QUERY = new PropertyDescriptor.Builder() - .name("SQL select query") - .description("SQL select query") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); + .name("SQL select query") + .description("SQL select query") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder() - .name("Max Wait Time") - .description("The maximum amount of time allowed for a running SQL select query " - + " , zero means there is no limit. Max time less than 1 second will be equal to zero.") - .defaultValue("0 seconds") - .required(true) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .sensitive(false) - .build(); + .name("Max Wait Time") + .description("The maximum amount of time allowed for a running SQL select query " + + " , zero means there is no limit. Max time less than 1 second will be equal to zero.") + .defaultValue("0 seconds") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .sensitive(false) + .build(); private final List propDescriptors; @@ -125,36 +124,36 @@ public class ExecuteSQL extends AbstractProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - FlowFile incoming = null; + FlowFile fileToProcess = null; if (context.hasIncomingConnection()) { - incoming = session.get(); + fileToProcess = session.get(); // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. // However, if we have no FlowFile and we have connections coming from other Processors, then // we know that we should run only if we have a FlowFile. - if (incoming == null && context.hasNonLoopConnection()) { + if (fileToProcess == null && context.hasNonLoopConnection()) { return; } } final ProcessorLog logger = getLogger(); - final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); - final String selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(incoming).getValue(); + final String selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue(); final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue(); - final StopWatch stopWatch = new StopWatch(true); try (final Connection con = dbcpService.getConnection(); final Statement st = con.createStatement()) { st.setQueryTimeout(queryTimeout); // timeout in seconds final LongHolder nrOfRows = new LongHolder(0L); - FlowFile outgoing = (incoming == null ? session.create() : incoming); - outgoing = session.write(outgoing, new OutputStreamCallback() { + if (fileToProcess == null) { + fileToProcess = session.create(); + } + fileToProcess = session.write(fileToProcess, new OutputStreamCallback() { @Override public void process(final OutputStream out) throws IOException { try { - logger.debug("Executing query {}", new Object[] {selectQuery}); + logger.debug("Executing query {}", new Object[]{selectQuery}); final ResultSet resultSet = st.executeQuery(selectQuery); nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out)); } catch (final SQLException e) { @@ -164,17 +163,30 @@ public class ExecuteSQL extends AbstractProcessor { }); // set attribute how many rows were selected - outgoing = session.putAttribute(outgoing, RESULT_ROW_COUNT, nrOfRows.get().toString()); + fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, nrOfRows.get().toString()); - logger.info("{} contains {} Avro records; transferring to 'success'", new Object[] {outgoing, nrOfRows.get()}); - session.getProvenanceReporter().modifyContent(outgoing, "Retrieved " + nrOfRows.get() + " rows", stopWatch.getElapsed(TimeUnit.MILLISECONDS)); - session.transfer(outgoing, REL_SUCCESS); + logger.info("{} contains {} Avro records; transferring to 'success'", + new Object[]{fileToProcess, nrOfRows.get()}); + session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows", + stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.transfer(fileToProcess, REL_SUCCESS); } catch (final ProcessException | SQLException e) { - if (incoming == null) { - logger.error("Unable to execute SQL select query {} due to {}. No incoming flow file to route to failure", new Object[] {selectQuery, e}); + if (fileToProcess == null) { + // This can happen if any exceptions occur while setting up the connection, statement, etc. + logger.error("Unable to execute SQL select query {} due to {}. No FlowFile to route to failure", + new Object[]{selectQuery, e}); + context.yield(); } else { - logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure", new Object[] {selectQuery, incoming, e}); - session.transfer(incoming, REL_FAILURE); + if (context.hasIncomingConnection()) { + logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure", + new Object[]{selectQuery, fileToProcess, e}); + fileToProcess = session.penalize(fileToProcess); + } else { + logger.error("Unable to execute SQL select query {} due to {}; routing to failure", + new Object[]{selectQuery, e}); + context.yield(); + } + session.transfer(fileToProcess, REL_FAILURE); } } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java index e381706e9e..4da9b1f93c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java @@ -152,6 +152,31 @@ public class TestExecuteSQL { runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0).assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "2"); } + @Test + public void testWithSqlException() throws SQLException { + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_NO_ROWS"); + } catch (final SQLException sqle) { + } + + stmt.execute("create table TEST_NO_ROWS (id integer)"); + + runner.setIncomingConnection(false); + // Try a valid SQL statment that will generate an error (val1 does not exist, e.g.) + runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT val1 FROM TEST_NO_ROWS"); + runner.run(); + + runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1); + } + public void invokeOnTrigger(final Integer queryTimeout, final String query, final boolean incomingFlowFile) throws InitializationException, ClassNotFoundException, SQLException, IOException {