NIFI-12010: Handle auto-commit and commit based on driver capabilities in SQL components

Signed-off-by: Arpad Boda <aboda@apache.org>

This closes #7663
This commit is contained in:
Matt Burgess 2023-08-30 23:30:22 -04:00 committed by Arpad Boda
parent 6cffc78ad2
commit 932cfe22a3
No known key found for this signature in database
GPG Key ID: 065668F2A58F097F
8 changed files with 60 additions and 9 deletions

View File

@ -27,6 +27,7 @@ import java.io.File;
import java.sql.Connection; import java.sql.Connection;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement; import java.sql.Statement;
/** /**
@ -146,7 +147,14 @@ public class AuditDataSourceFactoryBean implements FactoryBean {
try { try {
// get a connection // get a connection
connection = connectionPool.getConnection(); connection = connectionPool.getConnection();
connection.setAutoCommit(false); final boolean isAutoCommit = connection.getAutoCommit();
if (isAutoCommit) {
try {
connection.setAutoCommit(false);
} catch (SQLFeatureNotSupportedException sfnse) {
logger.debug("setAutoCommit(false) not supported by this driver");
}
}
// create a statement for initializing the database // create a statement for initializing the database
statement = connection.createStatement(); statement = connection.createStatement();

View File

@ -18,6 +18,7 @@ package org.apache.nifi.admin.service.transaction.impl;
import java.sql.Connection; import java.sql.Connection;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import javax.sql.DataSource; import javax.sql.DataSource;
import org.apache.nifi.admin.service.transaction.Transaction; import org.apache.nifi.admin.service.transaction.Transaction;
import org.apache.nifi.admin.service.transaction.TransactionBuilder; import org.apache.nifi.admin.service.transaction.TransactionBuilder;
@ -35,7 +36,14 @@ public class StandardTransactionBuilder implements TransactionBuilder {
try { try {
// get a new connection // get a new connection
Connection connection = dataSource.getConnection(); Connection connection = dataSource.getConnection();
connection.setAutoCommit(false); final boolean isAutoCommit = connection.getAutoCommit();
if (isAutoCommit) {
try {
connection.setAutoCommit(false);
} catch (SQLFeatureNotSupportedException sfnse) {
throw new TransactionException("setAutoCommit(false) not supported by this driver");
}
}
// create a new transaction // create a new transaction
return new StandardTransaction(connection); return new StandardTransaction(connection);

View File

@ -56,6 +56,7 @@ import org.codehaus.groovy.runtime.StackTraceUtils;
import java.io.File; import java.io.File;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -353,7 +354,11 @@ public class ExecuteGroovyScript extends AbstractProcessor {
//try to set autocommit to false //try to set autocommit to false
try { try {
if (sql.getConnection().getAutoCommit()) { if (sql.getConnection().getAutoCommit()) {
sql.getConnection().setAutoCommit(false); try {
sql.getConnection().setAutoCommit(false);
} catch (SQLFeatureNotSupportedException sfnse) {
getLogger().debug("setAutoCommit(false) not supported by this driver");
}
} }
} catch (Throwable ei) { } catch (Throwable ei) {
getLogger().warn("Failed to set autocommit=false for `" + e.getKey() + "`", ei); getLogger().warn("Failed to set autocommit=false for `" + e.getKey() + "`", ei);
@ -382,7 +387,11 @@ public class ExecuteGroovyScript extends AbstractProcessor {
OSql sql = (OSql) e.getValue(); OSql sql = (OSql) e.getValue();
try { try {
if (!sql.getConnection().getAutoCommit()) { if (!sql.getConnection().getAutoCommit()) {
sql.getConnection().setAutoCommit(true); //default autocommit value in nifi try {
sql.getConnection().setAutoCommit(true); //default autocommit value in nifi
} catch (SQLFeatureNotSupportedException sfnse) {
getLogger().debug("setAutoCommit(true) not supported by this driver");
}
} }
} catch (Throwable ei) { } catch (Throwable ei) {
getLogger().warn("Failed to set autocommit=true for `" + e.getKey() + "`", ei); getLogger().warn("Failed to set autocommit=true for `" + e.getKey() + "`", ei);

View File

@ -33,7 +33,10 @@ public class ScriptRunner {
public ScriptRunner(Connection connection) throws SQLException { public ScriptRunner(Connection connection) throws SQLException {
this.connection = connection; this.connection = connection;
this.connection.setAutoCommit(true); if (!this.connection.getAutoCommit()) {
// May throw SQLFeatureNotSupportedException which is a subclass of SQLException
this.connection.setAutoCommit(true);
}
} }
public void runScript(Reader reader) throws IOException, SQLException { public void runScript(Reader reader) throws IOException, SQLException {

View File

@ -42,6 +42,7 @@ import java.sql.Connection;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement; import java.sql.Statement;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -255,7 +256,16 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor {
int resultCount = 0; int resultCount = 0;
try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes())) { try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes())) {
con.setAutoCommit(context.getProperty(AUTO_COMMIT).asBoolean()); final boolean isAutoCommit = con.getAutoCommit();
final boolean setAutoCommitValue = context.getProperty(AUTO_COMMIT).asBoolean();
// Only set auto-commit if necessary, log any "feature not supported" exceptions
if (isAutoCommit != setAutoCommitValue) {
try {
con.setAutoCommit(setAutoCommitValue);
} catch (SQLFeatureNotSupportedException sfnse) {
logger.debug("setAutoCommit({}) not supported by this driver", setAutoCommitValue);
}
}
try (final PreparedStatement st = con.prepareStatement(selectQuery)) { try (final PreparedStatement st = con.prepareStatement(selectQuery)) {
if (fetchSize != null && fetchSize > 0) { if (fetchSize != null && fetchSize > 0) {
try { try {

View File

@ -60,6 +60,7 @@ import java.sql.Connection;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLNonTransientException; import java.sql.SQLNonTransientException;
import java.sql.Statement; import java.sql.Statement;
import java.util.ArrayList; import java.util.ArrayList;
@ -291,7 +292,11 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
fc.originalAutoCommit = connection.getAutoCommit(); fc.originalAutoCommit = connection.getAutoCommit();
final boolean autocommit = c.getProperty(AUTO_COMMIT).asBoolean(); final boolean autocommit = c.getProperty(AUTO_COMMIT).asBoolean();
if(fc.originalAutoCommit != autocommit) { if(fc.originalAutoCommit != autocommit) {
connection.setAutoCommit(autocommit); try {
connection.setAutoCommit(autocommit);
} catch (SQLFeatureNotSupportedException sfnse) {
getLogger().debug("setAutoCommit({}) not supported by this driver", autocommit);
}
} }
} catch (SQLException e) { } catch (SQLException e) {
throw new ProcessException("Failed to disable auto commit due to " + e, e); throw new ProcessException("Failed to disable auto commit due to " + e, e);

View File

@ -37,6 +37,7 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -786,7 +787,7 @@ public class TestExecuteSQL {
try { try {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true"); final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
return con; return Mockito.spy(con);
} catch (final Exception e) { } catch (final Exception e) {
throw new ProcessException("getConnection failed: " + e); throw new ProcessException("getConnection failed: " + e);
} }

View File

@ -46,6 +46,7 @@ import java.sql.ResultSet;
import java.sql.ResultSetMetaData; import java.sql.ResultSetMetaData;
import java.sql.SQLDataException; import java.sql.SQLDataException;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -202,7 +203,13 @@ public class DatabaseRecordSink extends AbstractControllerService implements Rec
try { try {
connection = dbcpService.getConnection(attributes); connection = dbcpService.getConnection(attributes);
originalAutoCommit = connection.getAutoCommit(); originalAutoCommit = connection.getAutoCommit();
connection.setAutoCommit(false); if (originalAutoCommit) {
try {
connection.setAutoCommit(false);
} catch (SQLFeatureNotSupportedException sfnse) {
getLogger().debug("setAutoCommit(false) not supported by this driver");
}
}
final DMLSettings settings = new DMLSettings(context); final DMLSettings settings = new DMLSettings(context);
final String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions().getValue(); final String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions().getValue();
final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions().getValue(); final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions().getValue();