NIFI-3432: Handle Multiple Result Sets in ExecuteSQL

NIFI-3432

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #1471
patricker 2017-10-04 11:30:51 +08:00 committed by Matthew Burgess
parent d4168f5ff1
commit 0876cf12b1
3 changed files with 86 additions and 31 deletions
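For background, the change replaces ExecuteSQL's single executeQuery() call with the standard JDBC pattern for statements that can produce several result sets: execute(), then getResultSet()/getMoreResults() in a loop. A minimal illustrative sketch of that pattern (not code from this commit; dataSource and sql stand in for the processor's DBCP service and query) looks like:

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    import javax.sql.DataSource;

    // Illustrative only: drain every ResultSet a single statement produces.
    void runAll(DataSource dataSource, String sql) throws SQLException {
        try (Connection con = dataSource.getConnection();
             Statement st = con.createStatement()) {
            boolean hasResultSet = st.execute(sql);   // true if the first result is a ResultSet
            while (hasResultSet) {
                try (ResultSet rs = st.getResultSet()) {
                    while (rs.next()) {
                        // consume the current result set here
                    }
                }
                hasResultSet = st.getMoreResults();   // false when the next result is not a ResultSet
            }
        }
    }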


@@ -196,44 +196,84 @@ public class ExecuteSQL extends AbstractProcessor {
             selectQuery = queryContents.toString();
         }

+        int resultCount=0;
         try (final Connection con = dbcpService.getConnection();
             final Statement st = con.createStatement()) {
             st.setQueryTimeout(queryTimeout); // timeout in seconds

-            final AtomicLong nrOfRows = new AtomicLong(0L);
-            if (fileToProcess == null) {
-                fileToProcess = session.create();
-            }
-            fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream out) throws IOException {
-                    try {
-                        logger.debug("Executing query {}", new Object[]{selectQuery});
-                        final ResultSet resultSet = st.executeQuery(selectQuery);
-                        final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
-                                .convertNames(convertNamesForAvro)
-                                .useLogicalTypes(useAvroLogicalTypes)
-                                .defaultPrecision(defaultPrecision)
-                                .defaultScale(defaultScale)
-                                .build();
-                        nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
-                    } catch (final SQLException e) {
-                        throw new ProcessException(e);
-                    }
-                }
-            });
-
-            long duration = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
-
-            // set attribute how many rows were selected
-            fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
-            fileToProcess = session.putAttribute(fileToProcess, RESULT_QUERY_DURATION, String.valueOf(duration));
-            fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
-
-            logger.info("{} contains {} Avro records; transferring to 'success'",
-                    new Object[]{fileToProcess, nrOfRows.get()});
-            session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows", duration);
-            session.transfer(fileToProcess, REL_SUCCESS);
+            logger.debug("Executing query {}", new Object[]{selectQuery});
+            boolean results = st.execute(selectQuery);
+
+            while(results){
+                FlowFile resultSetFF;
+                if(fileToProcess == null){
+                    resultSetFF = session.create();
+                } else {
+                    resultSetFF = session.create(fileToProcess);
+                    resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
+                }
+
+                final AtomicLong nrOfRows = new AtomicLong(0L);
+                resultSetFF = session.write(resultSetFF, new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream out) throws IOException {
+                        try {
+                            final ResultSet resultSet = st.getResultSet();
+                            final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
+                                    .convertNames(convertNamesForAvro)
+                                    .useLogicalTypes(useAvroLogicalTypes)
+                                    .defaultPrecision(defaultPrecision)
+                                    .defaultScale(defaultScale)
+                                    .build();
+                            nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
+                        } catch (final SQLException e) {
+                            throw new ProcessException(e);
+                        }
+                    }
+                });
+
+                long duration = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
+
+                // set attribute how many rows were selected
+                resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+                resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(duration));
+                resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
+
+                logger.info("{} contains {} Avro records; transferring to 'success'",
+                        new Object[]{resultSetFF, nrOfRows.get()});
+                session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", duration);
+                session.transfer(resultSetFF, REL_SUCCESS);
+                resultCount++;
+
+                // are there anymore result sets?
+                try{
+                    results = st.getMoreResults();
+                } catch(SQLException ex){
+                    results = false;
+                }
+            }
+
+            //If we had at least one result then it's OK to drop the original file, but if we had no results then
+            // pass the original flow file down the line to trigger downstream processors
+            if(fileToProcess != null){
+                if(resultCount > 0){
+                    session.remove(fileToProcess);
+                } else {
+                    fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
+                        @Override
+                        public void process(OutputStream out) throws IOException {
+                            JdbcCommon.createEmptyAvroStream(out);
+                        }
+                    });
+
+                    session.transfer(fileToProcess, REL_SUCCESS);
+                }
+            }
         } catch (final ProcessException | SQLException e) {
+            //If we had at least one result then it's OK to drop the original file, but if we had no results then
+            // pass the original flow file down the line to trigger downstream processors
             if (fileToProcess == null) {
                 // This can happen if any exceptions occur while setting up the connection, statement, etc.
                 logger.error("Unable to execute SQL select query {} due to {}. No FlowFile to route to failure",


@@ -173,6 +173,16 @@ public class JdbcCommon {
         return convertToAvroStream(rs, outStream, options, callback);
     }

+    public static void createEmptyAvroStream(final OutputStream outStream) throws IOException {
+        final FieldAssembler<Schema> builder = SchemaBuilder.record("NiFi_ExecuteSQL_Record").namespace("any.data").fields();
+        final Schema schema = builder.endRecord();
+
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
+            dataFileWriter.create(schema, outStream);
+        }
+    }
+
     public static class AvroConversionOptions {
         private final String recordName;
         private final int maxRows;
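The new createEmptyAvroStream(...) writes a valid Avro container whose embedded record schema has no fields and which holds zero records. A quick way to sanity-check that output (a sketch, not part of the commit; the ByteArrayOutputStream plumbing is only for illustration) is to read the stream back with Avro's generic reader:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import org.apache.avro.file.DataFileStream;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    JdbcCommon.createEmptyAvroStream(out);

    try (DataFileStream<GenericRecord> reader = new DataFileStream<>(
            new ByteArrayInputStream(out.toByteArray()), new GenericDatumReader<GenericRecord>())) {
        // The embedded schema is the field-less "NiFi_ExecuteSQL_Record" record
        System.out.println(reader.getSchema().getFields().isEmpty());   // true
        // ...and the container holds no records
        System.out.println(reader.hasNext());                           // false
    }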


@@ -222,7 +222,10 @@ public class TestExecuteSQL {
         runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT val1 FROM TEST_NO_ROWS");
         runner.run();

-        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1);
+        //No incoming flow file containing a query, and an exception causes no outbound flowfile.
+        // There should be no flow files on either relationship
+        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 0);
+        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 0);
     }

     public void invokeOnTrigger(final Integer queryTimeout, final String query, final boolean incomingFlowFile, final boolean setQueryProperty)
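A complementary check one could add here (a hypothetical sketch, not part of this commit, assuming the TEST_NO_ROWS table and DBCP service are configured as elsewhere in this class): with an incoming FlowFile, an empty result set still counts as one result, so the original FlowFile is dropped and exactly one FlowFile with executesql.row.count of 0 goes to 'success':

    // Hypothetical sketch only; relies on the existing TestExecuteSQL setup.
    runner.setIncomingConnection(true);
    runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT val1 FROM TEST_NO_ROWS");
    runner.enqueue("trigger".getBytes());
    runner.run();

    runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
    runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0)
            .assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "0");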
@@ -284,6 +287,8 @@ public class TestExecuteSQL {
         }
     }

     /**
      * Simple implementation only for ExecuteSQL processor testing.
      *