From 63c72bd7e2e27229039ef28f695b19cefa6fdba1 Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Wed, 2 Aug 2023 12:10:10 -0400 Subject: [PATCH] NIFI-11898 Handle commit based on driver capabilities in PutDatabaseRecord This closes #7561 Signed-off-by: David Handermann --- .../standard/PutDatabaseRecord.java | 20 ++++- .../standard/PutDatabaseRecordTest.java | 80 +++++++++++++++++-- 2 files changed, 89 insertions(+), 11 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java index f90f907fc4..73ec4e83ee 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java @@ -70,6 +70,7 @@ import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; import java.sql.SQLDataException; import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; import java.sql.SQLIntegrityConstraintViolationException; import java.sql.SQLTransientException; import java.sql.Statement; @@ -471,10 +472,19 @@ public class PutDatabaseRecord extends AbstractProcessor { connectionHolder = Optional.of(connection); originalAutoCommit = connection.getAutoCommit(); - connection.setAutoCommit(false); + if (originalAutoCommit) { + try { + connection.setAutoCommit(false); + } catch (SQLFeatureNotSupportedException sfnse) { + getLogger().debug("setAutoCommit(false) not supported by this driver"); + } + } putToDatabase(context, session, flowFile, connection); - connection.commit(); + // Only commit the connection if auto-commit is false + if (!originalAutoCommit) { + connection.commit(); + } session.transfer(flowFile, REL_SUCCESS); session.getProvenanceReporter().send(flowFile, getJdbcUrl(connection)); @@ -496,13 +506,15 @@ public class PutDatabaseRecord extends AbstractProcessor { if (rollbackOnFailure) { session.rollback(); } else { - flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, e.getMessage()); + flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, (e.getMessage() == null ? "Unknown": e.getMessage())); session.transfer(flowFile, relationship); } connectionHolder.ifPresent(connection -> { try { - connection.rollback(); + if (!connection.getAutoCommit()) { + connection.rollback(); + } } catch (final Exception rollbackException) { getLogger().error("Failed to rollback JDBC transaction", rollbackException); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java index e3c6600dc6..e9fd13b005 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java @@ -17,6 +17,8 @@ package org.apache.nifi.processors.standard; import org.apache.commons.dbcp2.DelegatingConnection; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.dbcp.DBCPService; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.pattern.RollbackOnFailure; import org.apache.nifi.processors.standard.db.ColumnDescription; @@ -53,6 +55,7 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLDataException; import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; import java.sql.Statement; import java.time.LocalDate; import java.time.ZoneOffset; @@ -74,6 +77,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -81,6 +85,8 @@ import static org.mockito.Mockito.when; public class PutDatabaseRecordTest { + private static String DBCP_SERVICE_ID = "dbcp"; + private static final String CONNECTION_FAILED = "Connection Failed"; private static final String PARSER_ID = MockRecordParser.class.getSimpleName(); @@ -102,7 +108,7 @@ public class PutDatabaseRecordTest { TestRunner runner; PutDatabaseRecord processor; - DBCPServiceSimpleImpl dbcp; + DBCPService dbcp; @BeforeAll public static void setDatabaseLocation() { @@ -143,9 +149,9 @@ public class PutDatabaseRecordTest { final Map dbcpProperties = new HashMap<>(); runner = TestRunners.newTestRunner(processor); - runner.addControllerService("dbcp", dbcp, dbcpProperties); + runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties); runner.enableControllerService(dbcp); - runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, "dbcp"); + runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID); } @Test @@ -166,6 +172,42 @@ public class PutDatabaseRecordTest { runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_FAILURE); } + @Test + public void testSetAutoCommitFalseFailure() throws InitializationException, SQLException { + dbcp = new DBCPServiceAutoCommitTest(DB_LOCATION); + final Map dbcpProperties = new HashMap<>(); + runner = TestRunners.newTestRunner(processor); + runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties); + runner.enableControllerService(dbcp); + runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID); + + recreateTable(createPersons); + final MockRecordParser parser = new MockRecordParser(); + runner.addControllerService("parser", parser); + runner.enableControllerService(parser); + + parser.addSchemaField("id", RecordFieldType.INT); + parser.addSchemaField("name", RecordFieldType.STRING); + parser.addSchemaField("code", RecordFieldType.INT); + parser.addSchemaField("dt", RecordFieldType.DATE); + + LocalDate testDate1 = LocalDate.of(2021, 1, 26); + Date jdbcDate1 = Date.valueOf(testDate1); // in local TZ + LocalDate testDate2 = LocalDate.of(2021, 7, 26); + Date jdbcDate2 = Date.valueOf(testDate2); // in local TZ + + parser.addRecord(1, "rec1", 101, jdbcDate1); + + runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser"); + runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE); + runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS"); + + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_SUCCESS); + } + @Test public void testInsertNonRequiredColumnsUnmatchedField() throws InitializationException, ProcessException { // Need to override the @Before method with a new processor that behaves badly @@ -176,9 +218,9 @@ public class PutDatabaseRecordTest { final Map dbcpProperties = new HashMap<>(); runner = TestRunners.newTestRunner(processor); - runner.addControllerService("dbcp", dbcp, dbcpProperties); + runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties); runner.enableControllerService(dbcp); - runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, "dbcp"); + runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID); recreateTable(); final MockRecordParser parser = new MockRecordParser(); @@ -1766,9 +1808,9 @@ public class PutDatabaseRecordTest { void testInsertEnum() throws InitializationException, ProcessException, SQLException, IOException { dbcp = spy(new DBCPServiceSimpleImpl(DB_LOCATION, false)); // Use H2 runner = TestRunners.newTestRunner(processor); - runner.addControllerService("dbcp", dbcp, new HashMap<>()); + runner.addControllerService(DBCP_SERVICE_ID, dbcp, new HashMap<>()); runner.enableControllerService(dbcp); - runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, "dbcp"); + runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID); try (Connection conn = dbcp.getConnection()) { conn.createStatement().executeUpdate("DROP TABLE IF EXISTS ENUM_TEST"); } @@ -1949,4 +1991,28 @@ public class PutDatabaseRecordTest { return new SqlAndIncludedColumns("INSERT INTO PERSONS VALUES (?,?,?,?)", Arrays.asList(0, 1, 2, 3)); } } + + static class DBCPServiceAutoCommitTest extends AbstractControllerService implements DBCPService { + private final String databaseLocation; + + public DBCPServiceAutoCommitTest(final String databaseLocation) { + this.databaseLocation = databaseLocation; + } + + @Override + public String getIdentifier() { + return DBCP_SERVICE_ID; + } + + @Override + public Connection getConnection() throws ProcessException { + try { + Connection spyConnection = spy(DriverManager.getConnection("jdbc:derby:" + databaseLocation + ";create=true")); + doThrow(SQLFeatureNotSupportedException.class).when(spyConnection).setAutoCommit(false); + return spyConnection; + } catch (final Exception e) { + throw new ProcessException("getConnection failed: " + e); + } + } + } } \ No newline at end of file