diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java index bd6962c0f9..55a4326908 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java @@ -169,6 +169,22 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor { .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); + public static final PropertyDescriptor AUTO_COMMIT = new PropertyDescriptor.Builder() + .name("esql-auto-commit") + .displayName("Set Auto Commit") + .description("Enables or disables the auto commit functionality of the DB connection. Default value is 'true'. " + + "The default value can be used with most of the JDBC drivers and this functionality doesn't have any impact in most of the cases " + + "since this processor is used to read data. " + + "However, for some JDBC drivers such as PostgreSQL driver, it is required to disable the auto committing functionality " + + "to limit the number of result rows fetching at a time. " + + "When auto commit is enabled, postgreSQL driver loads whole result set to memory at once. " + + "This could lead for a large amount of memory usage when executing queries which fetch large data sets. " + + "More Details of this behaviour in PostgreSQL driver can be found in https://jdbc.postgresql.org//documentation/head/query.html. ") + .allowableValues("true", "false") + .defaultValue("true") + .required(true) + .build(); + protected List propDescriptors; protected DBCPService dbcpService; @@ -236,195 +252,202 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor { } int resultCount = 0; - try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes()); - final PreparedStatement st = con.prepareStatement(selectQuery)) { - if (fetchSize != null && fetchSize > 0) { - try { - st.setFetchSize(fetchSize); - } catch (SQLException se) { - // Not all drivers support this, just log the error (at debug level) and move on - logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, se); - } - } - st.setQueryTimeout(queryTimeout); // timeout in seconds - - // Execute pre-query, throw exception and cleanup Flow Files if fail - Pair failure = executeConfigStatements(con, preQueries); - if (failure != null) { - // In case of failure, assigning config query to "selectQuery" to follow current error handling - selectQuery = failure.getLeft(); - throw failure.getRight(); - } - - if (fileToProcess != null) { - JdbcCommon.setParameters(st, fileToProcess.getAttributes()); - } - logger.debug("Executing query {}", new Object[]{selectQuery}); - - int fragmentIndex = 0; - final String fragmentId = UUID.randomUUID().toString(); - - final StopWatch executionTime = new StopWatch(true); - - boolean hasResults = st.execute(); - - long executionTimeElapsed = executionTime.getElapsed(TimeUnit.MILLISECONDS); - - boolean hasUpdateCount = st.getUpdateCount() != -1; - - Map inputFileAttrMap = fileToProcess == null ? null : fileToProcess.getAttributes(); - String inputFileUUID = fileToProcess == null ? null : fileToProcess.getAttribute(CoreAttributes.UUID.key()); - while (hasResults || hasUpdateCount) { - //getMoreResults() and execute() return false to indicate that the result of the statement is just a number and not a ResultSet - if (hasResults) { - final AtomicLong nrOfRows = new AtomicLong(0L); - + try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes())) { + con.setAutoCommit(context.getProperty(AUTO_COMMIT).asBoolean()); + try (final PreparedStatement st = con.prepareStatement(selectQuery)) { + if (fetchSize != null && fetchSize > 0) { try { - final ResultSet resultSet = st.getResultSet(); - do { - final StopWatch fetchTime = new StopWatch(true); + st.setFetchSize(fetchSize); + } catch (SQLException se) { + // Not all drivers support this, just log the error (at debug level) and move on + logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, se); + } + } + st.setQueryTimeout(queryTimeout); // timeout in seconds - FlowFile resultSetFF; - if (fileToProcess == null) { - resultSetFF = session.create(); - } else { - resultSetFF = session.create(fileToProcess); - } + // Execute pre-query, throw exception and cleanup Flow Files if fail + Pair failure = executeConfigStatements(con, preQueries); + if (failure != null) { + // In case of failure, assigning config query to "selectQuery" to follow current error handling + selectQuery = failure.getLeft(); + throw failure.getRight(); + } - if (inputFileAttrMap != null) { - resultSetFF = session.putAllAttributes(resultSetFF, inputFileAttrMap); - } + if (fileToProcess != null) { + JdbcCommon.setParameters(st, fileToProcess.getAttributes()); + } + logger.debug("Executing query {}", new Object[]{selectQuery}); + int fragmentIndex = 0; + final String fragmentId = UUID.randomUUID().toString(); - try { - resultSetFF = session.write(resultSetFF, out -> { - try { - nrOfRows.set(sqlWriter.writeResultSet(resultSet, out, getLogger(), null)); - } catch (Exception e) { - throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e); - } - }); + final StopWatch executionTime = new StopWatch(true); - long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS); + boolean hasResults = st.execute(); - // set attributes - final Map attributesToAdd = new HashMap<>(); - attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get())); - attributesToAdd.put(RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed)); - attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed)); - attributesToAdd.put(RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed)); - attributesToAdd.put(RESULTSET_INDEX, String.valueOf(resultCount)); - if (inputFileUUID != null) { - attributesToAdd.put(INPUT_FLOWFILE_UUID, inputFileUUID); - } - attributesToAdd.putAll(sqlWriter.getAttributesToAdd()); - resultSetFF = session.putAllAttributes(resultSetFF, attributesToAdd); - sqlWriter.updateCounters(session); + long executionTimeElapsed = executionTime.getElapsed(TimeUnit.MILLISECONDS); - // if fragmented ResultSet, determine if we should keep this fragment; set fragment attributes - if (maxRowsPerFlowFile > 0) { - // if row count is zero and this is not the first fragment, drop it instead of committing it. - if (nrOfRows.get() == 0 && fragmentIndex > 0) { - session.remove(resultSetFF); - break; - } + boolean hasUpdateCount = st.getUpdateCount() != -1; - resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_ID, fragmentId); - resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_INDEX, String.valueOf(fragmentIndex)); - } + Map inputFileAttrMap = fileToProcess == null ? null : fileToProcess.getAttributes(); + String inputFileUUID = fileToProcess == null ? null : fileToProcess.getAttribute(CoreAttributes.UUID.key()); + while (hasResults || hasUpdateCount) { + //getMoreResults() and execute() return false to indicate that the result of the statement is just a number and not a ResultSet + if (hasResults) { + final AtomicLong nrOfRows = new AtomicLong(0L); - logger.info("{} contains {} records; transferring to 'success'", new Object[]{resultSetFF, nrOfRows.get()}); + try { + final ResultSet resultSet = st.getResultSet(); + do { + final StopWatch fetchTime = new StopWatch(true); - // Report a FETCH event if there was an incoming flow file, or a RECEIVE event otherwise - if(context.hasIncomingConnection()) { - session.getProvenanceReporter().fetch(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed); + FlowFile resultSetFF; + if (fileToProcess == null) { + resultSetFF = session.create(); } else { - session.getProvenanceReporter().receive(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed); + resultSetFF = session.create(fileToProcess); } - resultSetFlowFiles.add(resultSetFF); - // If we've reached the batch size, send out the flow files - if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) { - session.transfer(resultSetFlowFiles, REL_SUCCESS); - // Need to remove the original input file if it exists - if (fileToProcess != null) { - session.remove(fileToProcess); - fileToProcess = null; + if (inputFileAttrMap != null) { + resultSetFF = session.putAllAttributes(resultSetFF, inputFileAttrMap); + } + + + try { + resultSetFF = session.write(resultSetFF, out -> { + try { + nrOfRows.set(sqlWriter.writeResultSet(resultSet, out, getLogger(), null)); + } catch (Exception e) { + throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e); + } + }); + + long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS); + + // set attributes + final Map attributesToAdd = new HashMap<>(); + attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get())); + attributesToAdd.put(RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed)); + attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed)); + attributesToAdd.put(RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed)); + attributesToAdd.put(RESULTSET_INDEX, String.valueOf(resultCount)); + if (inputFileUUID != null) { + attributesToAdd.put(INPUT_FLOWFILE_UUID, inputFileUUID); + } + attributesToAdd.putAll(sqlWriter.getAttributesToAdd()); + resultSetFF = session.putAllAttributes(resultSetFF, attributesToAdd); + sqlWriter.updateCounters(session); + + // if fragmented ResultSet, determine if we should keep this fragment; set fragment attributes + if (maxRowsPerFlowFile > 0) { + // if row count is zero and this is not the first fragment, drop it instead of committing it. + if (nrOfRows.get() == 0 && fragmentIndex > 0) { + session.remove(resultSetFF); + break; + } + + resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_ID, fragmentId); + resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_INDEX, String.valueOf(fragmentIndex)); } - session.commitAsync(); - resultSetFlowFiles.clear(); - } + logger.info("{} contains {} records; transferring to 'success'", new Object[]{resultSetFF, nrOfRows.get()}); - fragmentIndex++; - } catch (Exception e) { - // Remove any result set flow file(s) and propagate the exception - session.remove(resultSetFF); - session.remove(resultSetFlowFiles); - if (e instanceof ProcessException) { - throw (ProcessException) e; - } else { - throw new ProcessException(e); + // Report a FETCH event if there was an incoming flow file, or a RECEIVE event otherwise + if (context.hasIncomingConnection()) { + session.getProvenanceReporter().fetch(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed); + } else { + session.getProvenanceReporter().receive(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed); + } + resultSetFlowFiles.add(resultSetFF); + + // If we've reached the batch size, send out the flow files + if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) { + session.transfer(resultSetFlowFiles, REL_SUCCESS); + // Need to remove the original input file if it exists + if (fileToProcess != null) { + session.remove(fileToProcess); + fileToProcess = null; + } + + session.commitAsync(); + resultSetFlowFiles.clear(); + } + + fragmentIndex++; + } catch (Exception e) { + // Remove any result set flow file(s) and propagate the exception + session.remove(resultSetFF); + session.remove(resultSetFlowFiles); + if (e instanceof ProcessException) { + throw (ProcessException) e; + } else { + throw new ProcessException(e); + } + } + } while (maxRowsPerFlowFile > 0 && nrOfRows.get() == maxRowsPerFlowFile); + + // If we are splitting results but not outputting batches, set count on all FlowFiles + if (outputBatchSize == 0 && maxRowsPerFlowFile > 0) { + for (int i = 0; i < resultSetFlowFiles.size(); i++) { + resultSetFlowFiles.set(i, + session.putAttribute(resultSetFlowFiles.get(i), FRAGMENT_COUNT, Integer.toString(fragmentIndex))); } } - } while (maxRowsPerFlowFile > 0 && nrOfRows.get() == maxRowsPerFlowFile); - - // If we are splitting results but not outputting batches, set count on all FlowFiles - if (outputBatchSize == 0 && maxRowsPerFlowFile > 0) { - for (int i = 0; i < resultSetFlowFiles.size(); i++) { - resultSetFlowFiles.set(i, - session.putAttribute(resultSetFlowFiles.get(i), FRAGMENT_COUNT, Integer.toString(fragmentIndex))); - } + } catch (final SQLException e) { + throw new ProcessException(e); } - } catch (final SQLException e) { - throw new ProcessException(e); + + resultCount++; } - resultCount++; + // are there anymore result sets? + try { + hasResults = st.getMoreResults(Statement.CLOSE_CURRENT_RESULT); + hasUpdateCount = st.getUpdateCount() != -1; + } catch (SQLException ex) { + hasResults = false; + hasUpdateCount = false; + } } - // are there anymore result sets? - try { - hasResults = st.getMoreResults(Statement.CLOSE_CURRENT_RESULT); - hasUpdateCount = st.getUpdateCount() != -1; - } catch (SQLException ex) { - hasResults = false; - hasUpdateCount = false; + // Execute post-query, throw exception and cleanup Flow Files if fail + failure = executeConfigStatements(con, postQueries); + if (failure != null) { + selectQuery = failure.getLeft(); + resultSetFlowFiles.forEach(ff -> session.remove(ff)); + throw failure.getRight(); } - } - // Execute post-query, throw exception and cleanup Flow Files if fail - failure = executeConfigStatements(con, postQueries); - if (failure != null) { - selectQuery = failure.getLeft(); - resultSetFlowFiles.forEach(ff -> session.remove(ff)); - throw failure.getRight(); - } - - // Transfer any remaining files to SUCCESS - session.transfer(resultSetFlowFiles, REL_SUCCESS); - resultSetFlowFiles.clear(); - - //If we had at least one result then it's OK to drop the original file, but if we had no results then - // pass the original flow file down the line to trigger downstream processors - if (fileToProcess != null) { - if (resultCount > 0) { - session.remove(fileToProcess); - } else { - fileToProcess = session.write(fileToProcess, out -> sqlWriter.writeEmptyResultSet(out, getLogger())); - fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, "0"); - fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType()); - session.transfer(fileToProcess, REL_SUCCESS); + // If the auto commit is set to false, commit() is called for consistency + if (!con.getAutoCommit()) { + con.commit(); } - } else if (resultCount == 0) { - //If we had no inbound FlowFile, no exceptions, and the SQL generated no result sets (Insert/Update/Delete statements only) - // Then generate an empty Output FlowFile - FlowFile resultSetFF = session.create(); - resultSetFF = session.write(resultSetFF, out -> sqlWriter.writeEmptyResultSet(out, getLogger())); - resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, "0"); - resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType()); - session.transfer(resultSetFF, REL_SUCCESS); + // Transfer any remaining files to SUCCESS + session.transfer(resultSetFlowFiles, REL_SUCCESS); + resultSetFlowFiles.clear(); + + //If we had at least one result then it's OK to drop the original file, but if we had no results then + // pass the original flow file down the line to trigger downstream processors + if (fileToProcess != null) { + if (resultCount > 0) { + session.remove(fileToProcess); + } else { + fileToProcess = session.write(fileToProcess, out -> sqlWriter.writeEmptyResultSet(out, getLogger())); + fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, "0"); + fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType()); + session.transfer(fileToProcess, REL_SUCCESS); + } + } else if (resultCount == 0) { + //If we had no inbound FlowFile, no exceptions, and the SQL generated no result sets (Insert/Update/Delete statements only) + // Then generate an empty Output FlowFile + FlowFile resultSetFF = session.create(); + + resultSetFF = session.write(resultSetFF, out -> sqlWriter.writeEmptyResultSet(out, getLogger())); + resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, "0"); + resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType()); + session.transfer(resultSetFF, REL_SUCCESS); + } } } catch (final ProcessException | SQLException e) { //If we had at least one result then it's OK to drop the original file, but if we had no results then diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java index b903e46107..cc819db405 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java @@ -134,6 +134,7 @@ public class ExecuteSQL extends AbstractExecuteSQL { pds.add(MAX_ROWS_PER_FLOW_FILE); pds.add(OUTPUT_BATCH_SIZE); pds.add(FETCH_SIZE); + pds.add(AUTO_COMMIT); propDescriptors = Collections.unmodifiableList(pds); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java index 76eeaae0a3..2ebddacef0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java @@ -140,6 +140,7 @@ public class ExecuteSQLRecord extends AbstractExecuteSQL { pds.add(MAX_ROWS_PER_FLOW_FILE); pds.add(OUTPUT_BATCH_SIZE); pds.add(FETCH_SIZE); + pds.add(AUTO_COMMIT); propDescriptors = Collections.unmodifiableList(pds); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java index 16cd1d5929..7227ce275f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java @@ -175,6 +175,18 @@ public class TestExecuteSQL { invokeOnTrigger(1, QUERY_WITH_EL, true, null, true); // 1 second max time } + @Test + public void testAutoCommitFalse() throws InitializationException, ClassNotFoundException, SQLException, IOException { + runner.setProperty(ExecuteSQL.AUTO_COMMIT, "false"); + invokeOnTrigger(null, QUERY_WITHOUT_EL, true, null, false); + } + + @Test + public void testAutoCommitTrue() throws InitializationException, ClassNotFoundException, SQLException, IOException { + runner.setProperty(ExecuteSQL.AUTO_COMMIT, "true"); + invokeOnTrigger(null, QUERY_WITHOUT_EL, true, null, false); + } + @Test public void testWithNullIntColumn() throws SQLException { // remove previous test database, if any @@ -556,6 +568,11 @@ public class TestExecuteSQL { SimpleCommerceDataSet.loadTestData2Database(con, 100, 200, 100); LOGGER.info("test data loaded"); + //commit loaded data if auto-commit is dissabled + if (!con.getAutoCommit()){ + con.commit(); + } + // ResultSet size will be 1x200x100 = 20 000 rows // because of where PER.ID = ${person.id} final int nrOfRows = 20000; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java index fe17e9464c..edf013f904 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java @@ -169,6 +169,18 @@ public class TestExecuteSQLRecord { assertEquals(ProvenanceEventType.FETCH, runner.getProvenanceEvents().get(1).getEventType()); } + @Test + public void testAutoCommitFalse() throws InitializationException, ClassNotFoundException, SQLException, IOException { + runner.setProperty(ExecuteSQL.AUTO_COMMIT, "false"); + invokeOnTriggerRecords(null, QUERY_WITHOUT_EL, true, null, false); + } + + @Test + public void testAutoCommitTrue() throws InitializationException, ClassNotFoundException, SQLException, IOException { + runner.setProperty(ExecuteSQL.AUTO_COMMIT, "true"); + invokeOnTriggerRecords(null, QUERY_WITHOUT_EL, true, null, false); + } + @Test public void testWithOutputBatching() throws InitializationException, SQLException { // remove previous test database, if any @@ -545,6 +557,11 @@ public class TestExecuteSQLRecord { SimpleCommerceDataSet.loadTestData2Database(con, 100, 200, 100); LOGGER.info("test data loaded"); + //commit loaded data if auto-commit is dissabled + if (!con.getAutoCommit()){ + con.commit(); + } + // ResultSet size will be 1x200x100 = 20 000 rows // because of where PER.ID = ${person.id} final int nrOfRows = 20000;