From 0876cf12b17ca247703f30c87773d7efac947bdd Mon Sep 17 00:00:00 2001 From: patricker Date: Wed, 4 Oct 2017 11:30:51 +0800 Subject: [PATCH] NIFI-3432: Handle Multiple Result Sets in ExecuteSQL NIFI-3432 Signed-off-by: Matthew Burgess This closes #1471 --- .../nifi/processors/standard/ExecuteSQL.java | 100 ++++++++++++------ .../processors/standard/util/JdbcCommon.java | 10 ++ .../processors/standard/TestExecuteSQL.java | 7 +- 3 files changed, 86 insertions(+), 31 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java index ad795953c4..210412634f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java @@ -196,44 +196,84 @@ public class ExecuteSQL extends AbstractProcessor { selectQuery = queryContents.toString(); } + int resultCount=0; try (final Connection con = dbcpService.getConnection(); final Statement st = con.createStatement()) { st.setQueryTimeout(queryTimeout); // timeout in seconds - final AtomicLong nrOfRows = new AtomicLong(0L); - if (fileToProcess == null) { - fileToProcess = session.create(); - } - fileToProcess = session.write(fileToProcess, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - try { - logger.debug("Executing query {}", new Object[]{selectQuery}); - final ResultSet resultSet = st.executeQuery(selectQuery); - final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder() - .convertNames(convertNamesForAvro) - .useLogicalTypes(useAvroLogicalTypes) - .defaultPrecision(defaultPrecision) - .defaultScale(defaultScale) - .build(); - nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null)); - } catch (final SQLException e) { - throw new ProcessException(e); - } + + logger.debug("Executing query {}", new Object[]{selectQuery}); + boolean results = st.execute(selectQuery); + + + while(results){ + FlowFile resultSetFF; + if(fileToProcess == null){ + resultSetFF = session.create(); + } else { + resultSetFF = session.create(fileToProcess); + resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes()); } - }); - long duration = stopWatch.getElapsed(TimeUnit.MILLISECONDS); + final AtomicLong nrOfRows = new AtomicLong(0L); + resultSetFF = session.write(resultSetFF, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + try { - // set attribute how many rows were selected - fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get())); - fileToProcess = session.putAttribute(fileToProcess, RESULT_QUERY_DURATION, String.valueOf(duration)); - fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY); + final ResultSet resultSet = st.getResultSet(); + final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder() + .convertNames(convertNamesForAvro) + .useLogicalTypes(useAvroLogicalTypes) + .defaultPrecision(defaultPrecision) + .defaultScale(defaultScale) + .build(); + nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null)); + } catch (final SQLException e) { + throw new ProcessException(e); + } + } + }); - logger.info("{} contains {} Avro records; transferring to 'success'", - new Object[]{fileToProcess, nrOfRows.get()}); - session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows", duration); - session.transfer(fileToProcess, REL_SUCCESS); + long duration = stopWatch.getElapsed(TimeUnit.MILLISECONDS); + + // set attribute how many rows were selected + resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get())); + resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(duration)); + resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY); + + logger.info("{} contains {} Avro records; transferring to 'success'", + new Object[]{resultSetFF, nrOfRows.get()}); + session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", duration); + session.transfer(resultSetFF, REL_SUCCESS); + + resultCount++; + // are there anymore result sets? + try{ + results = st.getMoreResults(); + } catch(SQLException ex){ + results = false; + } + } + + //If we had at least one result then it's OK to drop the original file, but if we had no results then + // pass the original flow file down the line to trigger downstream processors + if(fileToProcess != null){ + if(resultCount > 0){ + session.remove(fileToProcess); + } else { + fileToProcess = session.write(fileToProcess, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + JdbcCommon.createEmptyAvroStream(out); + } + }); + + session.transfer(fileToProcess, REL_SUCCESS); + } + } } catch (final ProcessException | SQLException e) { + //If we had at least one result then it's OK to drop the original file, but if we had no results then + // pass the original flow file down the line to trigger downstream processors if (fileToProcess == null) { // This can happen if any exceptions occur while setting up the connection, statement, etc. logger.error("Unable to execute SQL select query {} due to {}. No FlowFile to route to failure", diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java index bd9a74c280..fd8f71e482 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java @@ -173,6 +173,16 @@ public class JdbcCommon { return convertToAvroStream(rs, outStream, options, callback); } + public static void createEmptyAvroStream(final OutputStream outStream) throws IOException { + final FieldAssembler builder = SchemaBuilder.record("NiFi_ExecuteSQL_Record").namespace("any.data").fields(); + final Schema schema = builder.endRecord(); + + final DatumWriter datumWriter = new GenericDatumWriter<>(schema); + try (final DataFileWriter dataFileWriter = new DataFileWriter<>(datumWriter)) { + dataFileWriter.create(schema, outStream); + } + } + public static class AvroConversionOptions { private final String recordName; private final int maxRows; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java index 5fd1af8200..69b7ae5d91 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java @@ -222,7 +222,10 @@ public class TestExecuteSQL { runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT val1 FROM TEST_NO_ROWS"); runner.run(); - runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 1); + //No incoming flow file containing a query, and an exception causes no outbound flowfile. + // There should be no flow files on either relationship + runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_FAILURE, 0); + runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 0); } public void invokeOnTrigger(final Integer queryTimeout, final String query, final boolean incomingFlowFile, final boolean setQueryProperty) @@ -284,6 +287,8 @@ public class TestExecuteSQL { } } + + /** * Simple implementation only for ExecuteSQL processor testing. *