diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java index d7f4b24af3..cfe68b5bfa 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java @@ -241,15 +241,22 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { final AtomicLong nrOfRows = new AtomicLong(0L); FlowFile fileToProcess = session.create(); - fileToProcess = session.write(fileToProcess, out -> { - // Max values will be updated in the state property map by the callback - final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(statePropertyMap, dbAdapter); - try { - nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, maxValCollector, maxRowsPerFlowFile)); - } catch (SQLException e) { - throw new ProcessException("Error during database query or conversion of records to Avro.", e); - } - }); + + try { + fileToProcess = session.write(fileToProcess, out -> { + // Max values will be updated in the state property map by the callback + final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(statePropertyMap, dbAdapter); + try { + nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, maxValCollector, maxRowsPerFlowFile)); + } catch (SQLException | RuntimeException e) { + throw new ProcessException("Error during database query or conversion of records to Avro.", e); + } + }); + } catch (ProcessException e) { + // Add flowfile to results before rethrowing so it will be removed from session in outer catch + resultSetFlowFiles.add(fileToProcess); + throw e; + } if (nrOfRows.get() > 0) { // set attribute how many rows were selected @@ -269,7 +276,6 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { new Object[]{fileToProcess, nrOfRows.get()}); session.getProvenanceReporter().receive(fileToProcess, jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); - resultSetFlowFiles.add(fileToProcess); } else { // If there were no rows returned, don't send the flowfile @@ -395,7 +401,6 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { } catch (ParseException | SQLException e) { throw new IOException(e); } - } } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java index e97e52926e..979dd387cc 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java @@ -17,6 +17,7 @@ package org.apache.nifi.processors.standard; import org.apache.avro.file.DataFileStream; +import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; @@ -70,6 +71,7 @@ public class QueryDatabaseTableTest { private TestRunner runner; private final static String DB_LOCATION = "target/db_qdt"; private DatabaseAdapter dbAdapter; + private HashMap origDbAdapters; @BeforeClass @@ -106,8 +108,9 @@ public class QueryDatabaseTableTest { public void setup() throws InitializationException, IOException { final DBCPService dbcp = new DBCPServiceSimpleImpl(); final Map dbcpProperties = new HashMap<>(); + origDbAdapters = new HashMap<>(QueryDatabaseTable.dbAdapters); dbAdapter = new GenericDatabaseAdapter(); - + QueryDatabaseTable.dbAdapters.put(dbAdapter.getName(), dbAdapter); processor = new MockQueryDatabaseTable(); runner = TestRunners.newTestRunner(processor); runner.addControllerService("dbcp", dbcp, dbcpProperties); @@ -120,6 +123,8 @@ public class QueryDatabaseTableTest { @After public void teardown() { runner = null; + QueryDatabaseTable.dbAdapters.clear(); + QueryDatabaseTable.dbAdapters.putAll(origDbAdapters); } @Test @@ -365,6 +370,37 @@ public class QueryDatabaseTableTest { runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).assertAttributeEquals(QueryDatabaseTable.RESULT_ROW_COUNT, "2"); } + @Test + public void testWithRuntimeException() throws SQLException { + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_NULL_INT"); + } catch (final SQLException sqle) { + // Ignore, usually due to Derby not having DROP TABLE IF EXISTS + } + + stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))"); + + stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)"); + stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, 1, 1)"); + + runner.setIncomingConnection(false); + runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_NULL_INT"); + + QueryDatabaseTable.dbAdapters.put(dbAdapter.getName(), new GenericDatabaseAdapter() { + @Override + public String getName() { + throw new DataFileWriter.AppendWriteException(null); + } + }); + runner.run(); + + assertTrue(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).isEmpty()); + } + @Test public void testWithSqlException() throws SQLException { // load test data to database