NIFI-2674 - Always adding flow files to result set flowfiles, catching runtime exceptions while converting to avro

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #953
This commit is contained in:
Bryan Rosander 2016-08-25 19:52:55 -04:00 committed by Matt Burgess
parent 7879a99206
commit 102a9a2b74
2 changed files with 53 additions and 12 deletions

View File

@ -241,15 +241,22 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
final AtomicLong nrOfRows = new AtomicLong(0L); final AtomicLong nrOfRows = new AtomicLong(0L);
FlowFile fileToProcess = session.create(); FlowFile fileToProcess = session.create();
try {
fileToProcess = session.write(fileToProcess, out -> { fileToProcess = session.write(fileToProcess, out -> {
// Max values will be updated in the state property map by the callback // Max values will be updated in the state property map by the callback
final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(statePropertyMap, dbAdapter); final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(statePropertyMap, dbAdapter);
try { try {
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, maxValCollector, maxRowsPerFlowFile)); nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, maxValCollector, maxRowsPerFlowFile));
} catch (SQLException e) { } catch (SQLException | RuntimeException e) {
throw new ProcessException("Error during database query or conversion of records to Avro.", e); throw new ProcessException("Error during database query or conversion of records to Avro.", e);
} }
}); });
} catch (ProcessException e) {
// Add flowfile to results before rethrowing so it will be removed from session in outer catch
resultSetFlowFiles.add(fileToProcess);
throw e;
}
if (nrOfRows.get() > 0) { if (nrOfRows.get() > 0) {
// set attribute how many rows were selected // set attribute how many rows were selected
@ -269,7 +276,6 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
new Object[]{fileToProcess, nrOfRows.get()}); new Object[]{fileToProcess, nrOfRows.get()});
session.getProvenanceReporter().receive(fileToProcess, jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); session.getProvenanceReporter().receive(fileToProcess, jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
resultSetFlowFiles.add(fileToProcess); resultSetFlowFiles.add(fileToProcess);
} else { } else {
// If there were no rows returned, don't send the flowfile // If there were no rows returned, don't send the flowfile
@ -395,7 +401,6 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
} catch (ParseException | SQLException e) { } catch (ParseException | SQLException e) {
throw new IOException(e); throw new IOException(e);
} }
} }
} }
} }

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import org.apache.avro.file.DataFileStream; import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader; import org.apache.avro.io.DatumReader;
@ -70,6 +71,7 @@ public class QueryDatabaseTableTest {
private TestRunner runner; private TestRunner runner;
private final static String DB_LOCATION = "target/db_qdt"; private final static String DB_LOCATION = "target/db_qdt";
private DatabaseAdapter dbAdapter; private DatabaseAdapter dbAdapter;
private HashMap<String, DatabaseAdapter> origDbAdapters;
@BeforeClass @BeforeClass
@ -106,8 +108,9 @@ public class QueryDatabaseTableTest {
public void setup() throws InitializationException, IOException { public void setup() throws InitializationException, IOException {
final DBCPService dbcp = new DBCPServiceSimpleImpl(); final DBCPService dbcp = new DBCPServiceSimpleImpl();
final Map<String, String> dbcpProperties = new HashMap<>(); final Map<String, String> dbcpProperties = new HashMap<>();
origDbAdapters = new HashMap<>(QueryDatabaseTable.dbAdapters);
dbAdapter = new GenericDatabaseAdapter(); dbAdapter = new GenericDatabaseAdapter();
QueryDatabaseTable.dbAdapters.put(dbAdapter.getName(), dbAdapter);
processor = new MockQueryDatabaseTable(); processor = new MockQueryDatabaseTable();
runner = TestRunners.newTestRunner(processor); runner = TestRunners.newTestRunner(processor);
runner.addControllerService("dbcp", dbcp, dbcpProperties); runner.addControllerService("dbcp", dbcp, dbcpProperties);
@ -120,6 +123,8 @@ public class QueryDatabaseTableTest {
@After @After
public void teardown() { public void teardown() {
runner = null; runner = null;
QueryDatabaseTable.dbAdapters.clear();
QueryDatabaseTable.dbAdapters.putAll(origDbAdapters);
} }
@Test @Test
@ -365,6 +370,37 @@ public class QueryDatabaseTableTest {
runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).assertAttributeEquals(QueryDatabaseTable.RESULT_ROW_COUNT, "2"); runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).assertAttributeEquals(QueryDatabaseTable.RESULT_ROW_COUNT, "2");
} }
@Test
public void testWithRuntimeException() throws SQLException {
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_NULL_INT");
} catch (final SQLException sqle) {
// Ignore, usually due to Derby not having DROP TABLE IF EXISTS
}
stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)");
stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, 1, 1)");
runner.setIncomingConnection(false);
runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_NULL_INT");
QueryDatabaseTable.dbAdapters.put(dbAdapter.getName(), new GenericDatabaseAdapter() {
@Override
public String getName() {
throw new DataFileWriter.AppendWriteException(null);
}
});
runner.run();
assertTrue(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).isEmpty());
}
@Test @Test
public void testWithSqlException() throws SQLException { public void testWithSqlException() throws SQLException {
// load test data to database // load test data to database