From 932cfe22a379ea83e11fe283a3f789115cce1607 Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Wed, 30 Aug 2023 23:30:22 -0400 Subject: [PATCH] NIFI-12010: Handle auto-commit and commit based on driver capabilities in SQL components Signed-off-by: Arpad Boda This closes #7663 --- .../nifi/admin/AuditDataSourceFactoryBean.java | 10 +++++++++- .../impl/StandardTransactionBuilder.java | 10 +++++++++- .../processors/groovyx/ExecuteGroovyScript.java | 13 +++++++++++-- .../apache/nifi/hive/metastore/ScriptRunner.java | 5 ++++- .../processors/standard/AbstractExecuteSQL.java | 12 +++++++++++- .../org/apache/nifi/processors/standard/PutSQL.java | 7 ++++++- .../nifi/processors/standard/TestExecuteSQL.java | 3 ++- .../nifi/record/sink/db/DatabaseRecordSink.java | 9 ++++++++- 8 files changed, 60 insertions(+), 9 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/AuditDataSourceFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/AuditDataSourceFactoryBean.java index 22165d2b68..040a207dcb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/AuditDataSourceFactoryBean.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/AuditDataSourceFactoryBean.java @@ -27,6 +27,7 @@ import java.io.File; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; import java.sql.Statement; /** @@ -146,7 +147,14 @@ public class AuditDataSourceFactoryBean implements FactoryBean { try { // get a connection connection = connectionPool.getConnection(); - connection.setAutoCommit(false); + final boolean isAutoCommit = connection.getAutoCommit(); + if (isAutoCommit) { + try { + connection.setAutoCommit(false); + } catch (SQLFeatureNotSupportedException sfnse) { + logger.debug("setAutoCommit(false) not supported by this driver"); + } + } // create a statement for initializing the database statement = connection.createStatement(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/service/transaction/impl/StandardTransactionBuilder.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/service/transaction/impl/StandardTransactionBuilder.java index 7d4a1fcc44..e4b12180c2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/service/transaction/impl/StandardTransactionBuilder.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-administration/src/main/java/org/apache/nifi/admin/service/transaction/impl/StandardTransactionBuilder.java @@ -18,6 +18,7 @@ package org.apache.nifi.admin.service.transaction.impl; import java.sql.Connection; import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; import javax.sql.DataSource; import org.apache.nifi.admin.service.transaction.Transaction; import org.apache.nifi.admin.service.transaction.TransactionBuilder; @@ -35,7 +36,14 @@ public class StandardTransactionBuilder implements TransactionBuilder { try { // get a new connection Connection connection = dataSource.getConnection(); - connection.setAutoCommit(false); + final boolean isAutoCommit = connection.getAutoCommit(); + if (isAutoCommit) { + try { + connection.setAutoCommit(false); + } catch (SQLFeatureNotSupportedException sfnse) { + throw new TransactionException("setAutoCommit(false) not supported by this driver"); + } + } // create a new transaction return new StandardTransaction(connection); diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java index c788e2c4dc..eca8b431a4 100644 --- a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java +++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java @@ -56,6 +56,7 @@ import org.codehaus.groovy.runtime.StackTraceUtils; import java.io.File; import java.lang.reflect.Method; import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -353,7 +354,11 @@ public class ExecuteGroovyScript extends AbstractProcessor { //try to set autocommit to false try { if (sql.getConnection().getAutoCommit()) { - sql.getConnection().setAutoCommit(false); + try { + sql.getConnection().setAutoCommit(false); + } catch (SQLFeatureNotSupportedException sfnse) { + getLogger().debug("setAutoCommit(false) not supported by this driver"); + } } } catch (Throwable ei) { getLogger().warn("Failed to set autocommit=false for `" + e.getKey() + "`", ei); @@ -382,7 +387,11 @@ public class ExecuteGroovyScript extends AbstractProcessor { OSql sql = (OSql) e.getValue(); try { if (!sql.getConnection().getAutoCommit()) { - sql.getConnection().setAutoCommit(true); //default autocommit value in nifi + try { + sql.getConnection().setAutoCommit(true); //default autocommit value in nifi + } catch (SQLFeatureNotSupportedException sfnse) { + getLogger().debug("setAutoCommit(true) not supported by this driver"); + } } } catch (Throwable ei) { getLogger().warn("Failed to set autocommit=true for `" + e.getKey() + "`", ei); diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/ScriptRunner.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/ScriptRunner.java index 4e4c65d043..d3666fdbe4 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/ScriptRunner.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/ScriptRunner.java @@ -33,7 +33,10 @@ public class ScriptRunner { public ScriptRunner(Connection connection) throws SQLException { this.connection = connection; - this.connection.setAutoCommit(true); + if (!this.connection.getAutoCommit()) { + // May throw SQLFeatureNotSupportedException which is a subclass of SQLException + this.connection.setAutoCommit(true); + } } public void runScript(Reader reader) throws IOException, SQLException { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java index cb6bc761f8..a76b2386be 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java @@ -42,6 +42,7 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; import java.sql.Statement; import java.util.ArrayList; import java.util.Collections; @@ -255,7 +256,16 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor { int resultCount = 0; try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes())) { - con.setAutoCommit(context.getProperty(AUTO_COMMIT).asBoolean()); + final boolean isAutoCommit = con.getAutoCommit(); + final boolean setAutoCommitValue = context.getProperty(AUTO_COMMIT).asBoolean(); + // Only set auto-commit if necessary, log any "feature not supported" exceptions + if (isAutoCommit != setAutoCommitValue) { + try { + con.setAutoCommit(setAutoCommitValue); + } catch (SQLFeatureNotSupportedException sfnse) { + logger.debug("setAutoCommit({}) not supported by this driver", setAutoCommitValue); + } + } try (final PreparedStatement st = con.prepareStatement(selectQuery)) { if (fetchSize != null && fetchSize > 0) { try { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java index 4c365c91f2..3068f07587 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java @@ -60,6 +60,7 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; import java.sql.SQLNonTransientException; import java.sql.Statement; import java.util.ArrayList; @@ -291,7 +292,11 @@ public class PutSQL extends AbstractSessionFactoryProcessor { fc.originalAutoCommit = connection.getAutoCommit(); final boolean autocommit = c.getProperty(AUTO_COMMIT).asBoolean(); if(fc.originalAutoCommit != autocommit) { - connection.setAutoCommit(autocommit); + try { + connection.setAutoCommit(autocommit); + } catch (SQLFeatureNotSupportedException sfnse) { + getLogger().debug("setAutoCommit({}) not supported by this driver", autocommit); + } } } catch (SQLException e) { throw new ProcessException("Failed to disable auto commit due to " + e, e); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java index b993587377..69c6dab739 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java @@ -37,6 +37,7 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -786,7 +787,7 @@ public class TestExecuteSQL { try { Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true"); - return con; + return Mockito.spy(con); } catch (final Exception e) { throw new ProcessException("getConnection failed: " + e); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/record/sink/db/DatabaseRecordSink.java b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/record/sink/db/DatabaseRecordSink.java index 27d9b79119..0a4a1fbfb7 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/record/sink/db/DatabaseRecordSink.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/record/sink/db/DatabaseRecordSink.java @@ -46,6 +46,7 @@ import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLDataException; import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -202,7 +203,13 @@ public class DatabaseRecordSink extends AbstractControllerService implements Rec try { connection = dbcpService.getConnection(attributes); originalAutoCommit = connection.getAutoCommit(); - connection.setAutoCommit(false); + if (originalAutoCommit) { + try { + connection.setAutoCommit(false); + } catch (SQLFeatureNotSupportedException sfnse) { + getLogger().debug("setAutoCommit(false) not supported by this driver"); + } + } final DMLSettings settings = new DMLSettings(context); final String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions().getValue(); final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions().getValue();