NIFI-11898 Handle commit based on driver capabilities in PutDatabaseRecord

This closes #7561

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Matt Burgess 2023-08-02 12:10:10 -04:00 committed by exceptionfactory
parent 3a6f86f599
commit 63c72bd7e2
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
2 changed files with 89 additions and 11 deletions

View File

@ -70,6 +70,7 @@ import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.SQLDataException;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.SQLTransientException;
import java.sql.Statement;
@ -471,10 +472,19 @@ public class PutDatabaseRecord extends AbstractProcessor {
connectionHolder = Optional.of(connection);
originalAutoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
if (originalAutoCommit) {
try {
connection.setAutoCommit(false);
} catch (SQLFeatureNotSupportedException sfnse) {
getLogger().debug("setAutoCommit(false) not supported by this driver");
}
}
putToDatabase(context, session, flowFile, connection);
connection.commit();
// Only commit the connection if auto-commit is false
if (!originalAutoCommit) {
connection.commit();
}
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().send(flowFile, getJdbcUrl(connection));
@ -496,13 +506,15 @@ public class PutDatabaseRecord extends AbstractProcessor {
if (rollbackOnFailure) {
session.rollback();
} else {
flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, e.getMessage());
flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, (e.getMessage() == null ? "Unknown": e.getMessage()));
session.transfer(flowFile, relationship);
}
connectionHolder.ifPresent(connection -> {
try {
connection.rollback();
if (!connection.getAutoCommit()) {
connection.rollback();
}
} catch (final Exception rollbackException) {
getLogger().error("Failed to rollback JDBC transaction", rollbackException);
}

View File

@ -17,6 +17,8 @@
package org.apache.nifi.processors.standard;
import org.apache.commons.dbcp2.DelegatingConnection;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
import org.apache.nifi.processors.standard.db.ColumnDescription;
@ -53,6 +55,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLDataException;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement;
import java.time.LocalDate;
import java.time.ZoneOffset;
@ -74,6 +77,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -81,6 +85,8 @@ import static org.mockito.Mockito.when;
public class PutDatabaseRecordTest {
private static String DBCP_SERVICE_ID = "dbcp";
private static final String CONNECTION_FAILED = "Connection Failed";
private static final String PARSER_ID = MockRecordParser.class.getSimpleName();
@ -102,7 +108,7 @@ public class PutDatabaseRecordTest {
TestRunner runner;
PutDatabaseRecord processor;
DBCPServiceSimpleImpl dbcp;
DBCPService dbcp;
@BeforeAll
public static void setDatabaseLocation() {
@ -143,9 +149,9 @@ public class PutDatabaseRecordTest {
final Map<String, String> dbcpProperties = new HashMap<>();
runner = TestRunners.newTestRunner(processor);
runner.addControllerService("dbcp", dbcp, dbcpProperties);
runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties);
runner.enableControllerService(dbcp);
runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, "dbcp");
runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID);
}
@Test
@ -166,6 +172,42 @@ public class PutDatabaseRecordTest {
runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_FAILURE);
}
@Test
public void testSetAutoCommitFalseFailure() throws InitializationException, SQLException {
dbcp = new DBCPServiceAutoCommitTest(DB_LOCATION);
final Map<String, String> dbcpProperties = new HashMap<>();
runner = TestRunners.newTestRunner(processor);
runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties);
runner.enableControllerService(dbcp);
runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID);
recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
runner.enableControllerService(parser);
parser.addSchemaField("id", RecordFieldType.INT);
parser.addSchemaField("name", RecordFieldType.STRING);
parser.addSchemaField("code", RecordFieldType.INT);
parser.addSchemaField("dt", RecordFieldType.DATE);
LocalDate testDate1 = LocalDate.of(2021, 1, 26);
Date jdbcDate1 = Date.valueOf(testDate1); // in local TZ
LocalDate testDate2 = LocalDate.of(2021, 7, 26);
Date jdbcDate2 = Date.valueOf(testDate2); // in local TZ
parser.addRecord(1, "rec1", 101, jdbcDate1);
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");
runner.enqueue(new byte[0]);
runner.run();
runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_SUCCESS);
}
@Test
public void testInsertNonRequiredColumnsUnmatchedField() throws InitializationException, ProcessException {
// Need to override the @Before method with a new processor that behaves badly
@ -176,9 +218,9 @@ public class PutDatabaseRecordTest {
final Map<String, String> dbcpProperties = new HashMap<>();
runner = TestRunners.newTestRunner(processor);
runner.addControllerService("dbcp", dbcp, dbcpProperties);
runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties);
runner.enableControllerService(dbcp);
runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, "dbcp");
runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID);
recreateTable();
final MockRecordParser parser = new MockRecordParser();
@ -1766,9 +1808,9 @@ public class PutDatabaseRecordTest {
void testInsertEnum() throws InitializationException, ProcessException, SQLException, IOException {
dbcp = spy(new DBCPServiceSimpleImpl(DB_LOCATION, false)); // Use H2
runner = TestRunners.newTestRunner(processor);
runner.addControllerService("dbcp", dbcp, new HashMap<>());
runner.addControllerService(DBCP_SERVICE_ID, dbcp, new HashMap<>());
runner.enableControllerService(dbcp);
runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, "dbcp");
runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID);
try (Connection conn = dbcp.getConnection()) {
conn.createStatement().executeUpdate("DROP TABLE IF EXISTS ENUM_TEST");
}
@ -1949,4 +1991,28 @@ public class PutDatabaseRecordTest {
return new SqlAndIncludedColumns("INSERT INTO PERSONS VALUES (?,?,?,?)", Arrays.asList(0, 1, 2, 3));
}
}
static class DBCPServiceAutoCommitTest extends AbstractControllerService implements DBCPService {
private final String databaseLocation;
public DBCPServiceAutoCommitTest(final String databaseLocation) {
this.databaseLocation = databaseLocation;
}
@Override
public String getIdentifier() {
return DBCP_SERVICE_ID;
}
@Override
public Connection getConnection() throws ProcessException {
try {
Connection spyConnection = spy(DriverManager.getConnection("jdbc:derby:" + databaseLocation + ";create=true"));
doThrow(SQLFeatureNotSupportedException.class).when(spyConnection).setAutoCommit(false);
return spyConnection;
} catch (final Exception e) {
throw new ProcessException("getConnection failed: " + e);
}
}
}
}