diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java index 9aa9d598c7..f24159204a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java @@ -51,14 +51,13 @@ import org.apache.nifi.util.StopWatch; @EventDriven @InputRequirement(Requirement.INPUT_ALLOWED) -@Tags({ "sql", "select", "jdbc", "query", "database" }) +@Tags({"sql", "select", "jdbc", "query", "database"}) @CapabilityDescription("Execute provided SQL select query. Query result will be converted to Avro format." + " Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on " + - "a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. " + - "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the " + - "select query. " + - "FlowFile attribute 'executesql.row.count' indicates how many rows were selected." - ) + "a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. " + + "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the " + + "select query. " + + "FlowFile attribute 'executesql.row.count' indicates how many rows were selected.") public class ExecuteSQL extends AbstractProcessor { public static final String RESULT_ROW_COUNT = "executesql.row.count"; @@ -155,7 +154,7 @@ public class ExecuteSQL extends AbstractProcessor { @Override public void process(final OutputStream out) throws IOException { try { - logger.debug("Executing query {}", new Object[] { selectQuery }); + logger.debug("Executing query {}", new Object[] {selectQuery}); final ResultSet resultSet = st.executeQuery(selectQuery); nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out)); } catch (final SQLException e) { @@ -167,8 +166,7 @@ public class ExecuteSQL extends AbstractProcessor { // set attribute how many rows were selected outgoing = session.putAttribute(outgoing, RESULT_ROW_COUNT, nrOfRows.get().toString()); - logger.info("{} contains {} Avro records", new Object[] { nrOfRows.get() }); - logger.info("Transferred {} to 'success'", new Object[] { outgoing }); + logger.info("{} contains {} Avro records; transferring to 'success'", new Object[] {outgoing, nrOfRows.get()}); session.getProvenanceReporter().modifyContent(outgoing, "Retrieved " + nrOfRows.get() + " rows", stopWatch.getElapsed(TimeUnit.MILLISECONDS)); session.transfer(outgoing, REL_SUCCESS); } catch (final ProcessException | SQLException e) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java index 95a521037c..89ed47fb69 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java @@ -24,6 +24,7 @@ import java.io.InputStream; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; +import java.sql.Statement; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -63,20 +64,20 @@ public class TestExecuteSQL { final static String DB_LOCATION = "target/db"; final static String QUERY_WITH_EL = "select " - + " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode" - + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode" - + ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode" - + ", ROW_NUMBER() OVER () as rownr " - + " from persons PER, products PRD, relationships REL" - + " where PER.ID = ${person.id}"; + + " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode" + + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode" + + ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode" + + ", ROW_NUMBER() OVER () as rownr " + + " from persons PER, products PRD, relationships REL" + + " where PER.ID = ${person.id}"; final static String QUERY_WITHOUT_EL = "select " - + " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode" - + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode" - + ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode" - + ", ROW_NUMBER() OVER () as rownr " - + " from persons PER, products PRD, relationships REL" - + " where PER.ID = 10"; + + " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode" + + ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode" + + ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode" + + ", ROW_NUMBER() OVER () as rownr " + + " from persons PER, products PRD, relationships REL" + + " where PER.ID = 10"; @BeforeClass @@ -123,8 +124,36 @@ public class TestExecuteSQL { invokeOnTrigger(1, QUERY_WITH_EL, true); // 1 second max time } + @Test + public void testWithNullIntColumn() throws SQLException { + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_NULL_INT"); + } catch (final SQLException sqle) { + } + + stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))"); + + stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)"); + stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, 1, 1)"); + + runner.setIncomingConnection(false); + runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM TEST_NULL_INT"); + runner.run(); + + runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1); + runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0).assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "2"); + } + public void invokeOnTrigger(final Integer queryTimeout, final String query, final boolean incomingFlowFile) - throws InitializationException, ClassNotFoundException, SQLException, IOException { + throws InitializationException, ClassNotFoundException, SQLException, IOException { if (queryTimeout != null) { runner.setProperty(ExecuteSQL.QUERY_TIMEOUT, queryTimeout.toString() + " secs"); @@ -135,7 +164,7 @@ public class TestExecuteSQL { dbLocation.delete(); // load test data to database - final Connection con = ((DBCPService)runner.getControllerService("dbcp")).getConnection(); + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); TestJdbcHugeStream.loadTestData2Database(con, 100, 2000, 1000); LOGGER.info("test data loaded");