NIFI-5322: Addressed bug that caused QueryRecord to fail (rollback session instead of routing to 'failure') whenever a record in the incoming FlowFile did not adhere to its schema. This happened because the InputStream for the FlowFile was not properly closed. Also updated the text of the Exception to include information from its 'cause' so user is better able to understand the underlying issue.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2803
This commit is contained in:
Mark Payne 2018-06-19 14:46:01 -04:00 committed by Matthew Burgess
parent 8996b7f6d6
commit 2760b07770
3 changed files with 43 additions and 3 deletions

View File

@ -467,7 +467,13 @@ public class QueryRecord extends AbstractProcessor {
final FlowFileTable<?, ?> table = cachedStatement.getTable();
table.setFlowFile(session, flowFile);
final ResultSet rs = stmt.executeQuery();
final ResultSet rs;
try {
rs = stmt.executeQuery();
} catch (final Throwable t) {
table.close();
throw t;
}
return new QueryResult() {
@Override
@ -516,11 +522,18 @@ public class QueryRecord extends AbstractProcessor {
rootSchema.setCacheEnabled(false);
statement = connection.createStatement();
resultSet = statement.executeQuery(sql);
try {
resultSet = statement.executeQuery(sql);
} catch (final Throwable t) {
flowFileTable.close();
throw t;
}
final ResultSet rs = resultSet;
final Statement stmt = statement;
final Connection conn = connection;
return new QueryResult() {
@Override
public void close() throws IOException {

View File

@ -62,7 +62,7 @@ public class FlowFileEnumerator<InternalType> implements Enumerator<Object> {
currentRow = filterColumns(recordParser.nextRecord());
break;
} catch (final Exception e) {
throw new ProcessException("Failed to read next record in stream for " + flowFile, e);
throw new ProcessException("Failed to read next record in stream for " + flowFile + " due to " + e.getMessage(), e);
}
}

View File

@ -70,6 +70,33 @@ public class TestQueryRecord {
return runner;
}
@Test
public void testStreamClosedWhenBadData() throws InitializationException {
final MockRecordParser parser = new MockRecordParser();
parser.failAfter(0);
parser.addSchemaField("name", RecordFieldType.STRING);
parser.addSchemaField("age", RecordFieldType.INT);
parser.addRecord("Tom", 49);
final MockRecordWriter writer = new MockRecordWriter("\"name\",\"points\"");
TestRunner runner = getRunner();
runner.addControllerService("parser", parser);
runner.enableControllerService(parser);
runner.addControllerService("writer", writer);
runner.enableControllerService(writer);
runner.setProperty(REL_NAME, "select name, age from FLOWFILE WHERE name <> ''");
runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "parser");
runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(QueryRecord.REL_FAILURE, 1);
}
@Test
public void testSimple() throws InitializationException, IOException, SQLException {
final MockRecordParser parser = new MockRecordParser();