From 0390c0f1967d1a57a333d15e1ec41b06ceb88590 Mon Sep 17 00:00:00 2001 From: patricker Date: Wed, 1 Nov 2017 10:25:26 +0800 Subject: [PATCH] NIFI-4561 ExecuteSQL returns no FlowFile for some queries This closes #2243 Signed-off-by: Mike Thomsen --- .../nifi/processors/standard/ExecuteSQL.java | 105 +++++++++++------- .../processors/standard/TestExecuteSQL.java | 26 +++++ 2 files changed, 90 insertions(+), 41 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java index 203d02a53a..13be8d5457 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java @@ -21,6 +21,7 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Statement; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -91,12 +92,15 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC }) @WritesAttributes({ @WritesAttribute(attribute="executesql.row.count", description = "Contains the number of rows returned in the select query"), - @WritesAttribute(attribute="executesql.query.duration", description = "Duration of the query in milliseconds") + @WritesAttribute(attribute="executesql.query.duration", description = "Duration of the query in milliseconds"), + @WritesAttribute(attribute="executesql.resultset.index", description = "Assuming multiple result sets are returned, " + + "the zero based index of this result set.") }) public class ExecuteSQL extends AbstractProcessor { public static final String RESULT_ROW_COUNT = "executesql.row.count"; public static final String RESULT_QUERY_DURATION = "executesql.query.duration"; + public static final String RESULTSET_INDEX = "executesql.resultset.index"; // Relationships public static final Relationship REL_SUCCESS = new Relationship.Builder() @@ -220,53 +224,60 @@ public class ExecuteSQL extends AbstractProcessor { JdbcCommon.setParameters(st, fileToProcess.getAttributes()); } logger.debug("Executing query {}", new Object[]{selectQuery}); - boolean results = st.execute(); + boolean hasResults = st.execute(); + boolean hasUpdateCount = st.getUpdateCount() != -1; + while(hasResults || hasUpdateCount) { + //getMoreResults() and execute() return false to indicate that the result of the statement is just a number and not a ResultSet + if (hasResults) { + FlowFile resultSetFF; + if (fileToProcess == null) { + resultSetFF = session.create(); + } else { + resultSetFF = session.create(fileToProcess); + resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes()); + } - while(results){ - FlowFile resultSetFF; - if(fileToProcess == null){ - resultSetFF = session.create(); - } else { - resultSetFF = session.create(fileToProcess); - resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes()); + final AtomicLong nrOfRows = new AtomicLong(0L); + resultSetFF = session.write(resultSetFF, out -> { + try { + + final ResultSet resultSet = st.getResultSet(); + final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder() + .convertNames(convertNamesForAvro) + .useLogicalTypes(useAvroLogicalTypes) + .defaultPrecision(defaultPrecision) + .defaultScale(defaultScale) + .build(); + nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null)); + } catch (final SQLException e) { + throw new ProcessException(e); + } + }); + + long duration = stopWatch.getElapsed(TimeUnit.MILLISECONDS); + + // set attribute how many rows were selected + resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get())); + resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(duration)); + resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY); + resultSetFF = session.putAttribute(resultSetFF, RESULTSET_INDEX, String.valueOf(resultCount)); + + logger.info("{} contains {} Avro records; transferring to 'success'", + new Object[]{resultSetFF, nrOfRows.get()}); + session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", duration); + session.transfer(resultSetFF, REL_SUCCESS); + + resultCount++; } - final AtomicLong nrOfRows = new AtomicLong(0L); - resultSetFF = session.write(resultSetFF, out -> { - try { - - final ResultSet resultSet = st.getResultSet(); - final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder() - .convertNames(convertNamesForAvro) - .useLogicalTypes(useAvroLogicalTypes) - .defaultPrecision(defaultPrecision) - .defaultScale(defaultScale) - .build(); - nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null)); - } catch (final SQLException e) { - throw new ProcessException(e); - } - }); - - long duration = stopWatch.getElapsed(TimeUnit.MILLISECONDS); - - // set attribute how many rows were selected - resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get())); - resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(duration)); - resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY); - - logger.info("{} contains {} Avro records; transferring to 'success'", - new Object[]{resultSetFF, nrOfRows.get()}); - session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", duration); - session.transfer(resultSetFF, REL_SUCCESS); - - resultCount++; // are there anymore result sets? try{ - results = st.getMoreResults(); + hasResults = st.getMoreResults(Statement.CLOSE_CURRENT_RESULT); + hasUpdateCount = st.getUpdateCount() != -1; } catch(SQLException ex){ - results = false; + hasResults = false; + hasUpdateCount = false; } } @@ -278,8 +289,20 @@ public class ExecuteSQL extends AbstractProcessor { } else { fileToProcess = session.write(fileToProcess, JdbcCommon::createEmptyAvroStream); + fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, "0"); + fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY); session.transfer(fileToProcess, REL_SUCCESS); } + } else if(resultCount == 0){ + //If we had no inbound FlowFile, no exceptions, and the SQL generated no result sets (Insert/Update/Delete statements only) + // Then generate an empty Output FlowFile + FlowFile resultSetFF = session.create(); + + resultSetFF = session.write(resultSetFF, out -> JdbcCommon.createEmptyAvroStream(out)); + + resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, "0"); + resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY); + session.transfer(resultSetFF, REL_SUCCESS); } } catch (final ProcessException | SQLException e) { //If we had at least one result then it's OK to drop the original file, but if we had no results then diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java index 3a0b773912..b4a7c69edc 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java @@ -189,6 +189,32 @@ public class TestExecuteSQL { runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1); runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0).assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "2"); + runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0).assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0"); + } + + @Test + public void testInsertStatementCreatesFlowFile() throws SQLException { + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_NULL_INT"); + } catch (final SQLException sqle) { + } + + stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))"); + + runner.setIncomingConnection(false); + runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "insert into TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)"); + runner.run(); + + runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1); + runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0).assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "0"); } @Test