NIFI-8249: Fixed issue with error during multiple FlowFile results in ExecuteSQL processors

This commit is contained in:
Matthew Burgess 2021-02-23 18:18:42 -05:00 committed by markap14
parent 2322b2cddf
commit 34fc94454f
2 changed files with 45 additions and 1 deletions

View File

@ -357,8 +357,9 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
fragmentIndex++;
} catch (Exception e) {
// Remove the result set flow file and propagate the exception
// Remove any result set flow file(s) and propagate the exception
session.remove(resultSetFF);
session.remove(resultSetFlowFiles);
if (e instanceof ProcessException) {
throw (ProcessException) e;
} else {

View File

@ -276,6 +276,49 @@ public class TestExecuteSQLRecord {
lastFlowFile.assertAttributeEquals(AbstractExecuteSQL.INPUT_FLOWFILE_UUID, inputFlowFile.getAttribute(CoreAttributes.UUID.key()));
}
@Test
public void testWithOutputBatchingLastBatchFails() throws InitializationException, SQLException {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_NULL_INT");
} catch (final SQLException sqle) {
}
stmt.execute("create table TEST_NULL_INT (id integer not null, val1 varchar(50), constraint my_pk primary key (id))");
// Insert some valid numeric values (for TO_NUMBER call later)
for (int i = 0; i < 11; i++) {
stmt.execute("insert into TEST_NULL_INT (id, val1) VALUES (" + i + ", '" + i + "')");
}
// Insert invalid numeric value
stmt.execute("insert into TEST_NULL_INT (id, val1) VALUES (100, 'abc')");
Map<String, String> attrMap = new HashMap<>();
String testAttrName = "attr1";
String testAttrValue = "value1";
attrMap.put(testAttrName, testAttrValue);
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
runner.addControllerService("writer", recordWriter);
runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
runner.enableControllerService(recordWriter);
runner.setIncomingConnection(true);
runner.setProperty(ExecuteSQLRecord.MAX_ROWS_PER_FLOW_FILE, "5");
runner.enqueue("SELECT ID, CAST(VAL1 AS INTEGER) AS TN FROM TEST_NULL_INT", attrMap);
runner.run();
runner.assertAllFlowFilesTransferred(ExecuteSQLRecord.REL_FAILURE, 1);
runner.assertTransferCount(ExecuteSQLRecord.REL_SUCCESS, 0);
}
@Test
public void testMaxRowsPerFlowFile() throws Exception {
// remove previous test database, if any