NIFI-13103 Enhance AutoCommit property in PutDatabaseRecord

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #8937
This commit is contained in:
Jim Steinebrey 2024-06-06 18:12:52 -04:00 committed by Matt Burgess
parent dfd1276230
commit 0c06340c5a
2 changed files with 72 additions and 52 deletions

View File

@ -27,7 +27,6 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
@ -72,7 +71,6 @@ import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.SQLDataException;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.SQLTransientException;
import java.sql.Statement;
@ -86,7 +84,6 @@ import java.util.HashSet;
import java.util.HexFormat;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@ -385,7 +382,6 @@ public class PutDatabaseRecord extends AbstractProcessor {
.allowableValues("true", "false")
.defaultValue("false")
.required(false)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
static final PropertyDescriptor DB_TYPE;
@ -479,21 +475,22 @@ public class PutDatabaseRecord extends AbstractProcessor {
);
}
final boolean autoCommit = validationContext.getProperty(AUTO_COMMIT).asBoolean();
final Boolean autoCommit = validationContext.getProperty(AUTO_COMMIT).asBoolean();
final boolean rollbackOnFailure = validationContext.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean();
if (autoCommit && rollbackOnFailure) {
if (autoCommit != null && autoCommit && rollbackOnFailure) {
validationResults.add(new ValidationResult.Builder()
.subject(RollbackOnFailure.ROLLBACK_ON_FAILURE.getDisplayName())
.explanation(format("'%s' cannot be set to 'true' when '%s' is also set to 'true'. "
+ "Transaction rollbacks for batch updates cannot be supported when auto commit is set to 'true'",
+ "Transaction rollbacks for batch updates cannot rollback all the flow file's statements together "
+ "when auto commit is set to 'true' because the database autocommits each batch separately.",
RollbackOnFailure.ROLLBACK_ON_FAILURE.getDisplayName(), AUTO_COMMIT.getDisplayName()))
.build());
}
if (autoCommit && !isMaxBatchSizeHardcodedToZero(validationContext)) {
if (autoCommit != null && autoCommit && !isMaxBatchSizeHardcodedToZero(validationContext)) {
final String explanation = format("'%s' must be hard-coded to zero when '%s' is set to 'true'."
+ " Batch size equal to zero executes all statements in a single transaction"
+ " which allows automatic rollback to revert all statements if an error occurs",
+ " which allows rollback to revert all the flow file's statements together if an error occurs.",
MAX_BATCH_SIZE.getDisplayName(), AUTO_COMMIT.getDisplayName());
validationResults.add(new ValidationResult.Builder()
@ -535,11 +532,6 @@ public class PutDatabaseRecord extends AbstractProcessor {
dataRecordPath = dataRecordPathValue == null ? null : RecordPath.compile(dataRecordPathValue);
}
@OnUnscheduled
public final void onUnscheduled() {
supportsBatchUpdates = Optional.empty();
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
@ -555,18 +547,18 @@ public class PutDatabaseRecord extends AbstractProcessor {
connection = dbcpService.getConnection(flowFile.getAttributes());
originalAutoCommit = connection.getAutoCommit();
final boolean autoCommit = context.getProperty(AUTO_COMMIT).asBoolean();
if (originalAutoCommit != autoCommit) {
final Boolean propertyAutoCommitValue = context.getProperty(AUTO_COMMIT).asBoolean();
if (propertyAutoCommitValue != null && originalAutoCommit != propertyAutoCommitValue) {
try {
connection.setAutoCommit(autoCommit);
} catch (SQLFeatureNotSupportedException sfnse) {
getLogger().debug(String.format("setAutoCommit(%s) not supported by this driver", autoCommit), sfnse);
connection.setAutoCommit(propertyAutoCommitValue);
} catch (Exception ex) {
getLogger().debug("Failed to setAutoCommit({}) due to {}", propertyAutoCommitValue, ex.getClass().getName(), ex);
}
}
putToDatabase(context, session, flowFile, connection);
// Only commit the connection if auto-commit is false
// If the connection's auto-commit setting is false, then manually commit the transaction
if (!connection.getAutoCommit()) {
connection.commit();
}
@ -593,12 +585,13 @@ public class PutDatabaseRecord extends AbstractProcessor {
relationship = REL_FAILURE;
}
getLogger().error("Failed to put Records to database for {}. Routing to {}.", flowFile, relationship, e);
final boolean rollbackOnFailure = context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean();
if (rollbackOnFailure) {
getLogger().error("Failed to put Records to database for {}. Rolling back NiFi session and returning the flow file to its incoming queue.", flowFile, e);
session.rollback();
context.yield();
} else {
getLogger().error("Failed to put Records to database for {}. Routing to {}.", flowFile, relationship, e);
flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, (e.getMessage() == null ? "Unknown" : e.getMessage()));
session.transfer(flowFile, relationship);
}
@ -611,6 +604,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
try {
if (!connection.getAutoCommit()) {
connection.rollback();
getLogger().debug("Manually rolled back JDBC transaction.");
}
} catch (final Exception rollbackException) {
getLogger().error("Failed to rollback JDBC transaction", rollbackException);
@ -668,9 +662,10 @@ public class PutDatabaseRecord extends AbstractProcessor {
final ComponentLog log = getLogger();
final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger();
// Do not use batch if set to batch size of 1 because that is similar to not using batching.
// Batch Size 0 means put all sql statements into one batch update no matter how many statements there are.
// Do not use batch statements if batch size is equal to 1 because that is the same as not using batching.
// Also do not use batches if the connection does not support batching.
boolean useBatch = maxBatchSize != 1 && isSupportBatchUpdates(connection);
boolean useBatch = maxBatchSize != 1 && isSupportsBatchUpdates(connection);
int currentBatchSize = 0;
int batchIndex = 0;
@ -990,13 +985,13 @@ public class PutDatabaseRecord extends AbstractProcessor {
try (InputStream inputStream = new ByteArrayInputStream(byteArray)) {
ps.setBlob(index, inputStream);
} catch (SQLException e) {
throw new IOException("Unable to parse binary data " + value, e.getCause());
throw new IOException("Unable to parse binary data " + value, e);
}
} else {
try (InputStream inputStream = new ByteArrayInputStream(value.toString().getBytes(StandardCharsets.UTF_8))) {
ps.setBlob(index, inputStream);
} catch (IOException | SQLException e) {
throw new IOException("Unable to parse binary data " + value, e.getCause());
throw new IOException("Unable to parse binary data " + value, e);
}
}
} else if (sqlType == Types.CLOB) {
@ -1012,7 +1007,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
clob.setString(1, value.toString());
ps.setClob(index, clob);
} catch (SQLException e) {
throw new IOException("Unable to parse data as CLOB/String " + value, e.getCause());
throw new IOException("Unable to parse data as CLOB/String " + value, e);
}
}
} else if (sqlType == Types.VARBINARY || sqlType == Types.LONGVARBINARY) {
@ -1033,7 +1028,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
try {
ps.setBytes(index, byteArray);
} catch (SQLException e) {
throw new IOException("Unable to parse binary data with size" + byteArray.length, e.getCause());
throw new IOException("Unable to parse binary data with size" + byteArray.length, e);
}
} else {
byte[] byteArray = new byte[0];
@ -1041,7 +1036,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
byteArray = value.toString().getBytes(StandardCharsets.UTF_8);
ps.setBytes(index, byteArray);
} catch (SQLException e) {
throw new IOException("Unable to parse binary data with size" + byteArray.length, e.getCause());
throw new IOException("Unable to parse binary data with size" + byteArray.length, e);
}
}
} else {
@ -1600,28 +1595,16 @@ public class PutDatabaseRecord extends AbstractProcessor {
return normalizedKeyColumnNames;
}
private Optional<Boolean> supportsBatchUpdates = Optional.empty();
private void initializeSupportBatchUpdates(Connection connection) {
if (!supportsBatchUpdates.isPresent()) {
try {
final DatabaseMetaData dmd = connection.getMetaData();
supportsBatchUpdates = Optional.of(dmd.supportsBatchUpdates());
getLogger().debug(String.format("Connection supportsBatchUpdates is %s",
supportsBatchUpdates.orElse(Boolean.FALSE)));
} catch (Exception ex) {
supportsBatchUpdates = Optional.of(Boolean.FALSE);
getLogger().debug(String.format("Exception while testing if connection supportsBatchUpdates due to %s - %s",
ex.getClass().getName(), ex.getMessage()));
}
/**
 * Reports whether the driver behind the given connection supports JDBC batch updates,
 * as advertised by {@link java.sql.DatabaseMetaData#supportsBatchUpdates()}.
 *
 * @param connection the JDBC connection to interrogate
 * @return true if the driver reports batch-update support; false if it does not,
 *         or if the capability could not be determined (logged at debug level)
 */
private boolean isSupportsBatchUpdates(Connection connection) {
    try {
        return connection.getMetaData().supportsBatchUpdates();
    } catch (Exception ex) {
        // Use parameterized logging (consistent with the setAutoCommit failure handling in
        // onTrigger) and pass the exception itself so the stack trace is available at debug level.
        getLogger().debug("Exception while testing if connection supportsBatchUpdates due to {} - {}",
                ex.getClass().getName(), ex.getMessage(), ex);
        // Treat "unknown" as "unsupported" so callers fall back to per-statement execution.
        return false;
    }
}
// Reports whether the connection's driver supports JDBC batch updates, using the
// cached supportsBatchUpdates Optional (populated lazily on first call).
private boolean isSupportBatchUpdates(Connection connection) {
// Populate the cache from DatabaseMetaData if this is the first query.
initializeSupportBatchUpdates(connection);
// If the capability could not be determined, conservatively report no batch support.
return supportsBatchUpdates.orElse(Boolean.FALSE);
}
static class SchemaKey {
private final String catalog;
private final String schemaName;

View File

@ -81,6 +81,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
@ -95,11 +96,12 @@ public class PutDatabaseRecordTest {
// DISABLED test cases are used for single-run tests which are not parameterized
DEFAULT_0(ENABLED, new TestCase(false, false, 0)),
DEFAULT_1(DISABLED, new TestCase(false, false, 1)),
DEFAULT_2(DISABLED, new TestCase(false, false, 2)),
DEFAULT_2(DISABLED, new TestCase(null, false, 2)),
DEFAULT_5(DISABLED, new TestCase(null, false, 5)),
DEFAULT_1000(DISABLED, new TestCase(false, false, 1000)),
ROLLBACK_0(DISABLED, new TestCase(false, true, 0)),
ROLLBACK_1(DISABLED, new TestCase(false, true, 1)),
ROLLBACK_1(ENABLED, new TestCase(null, true, 1)),
ROLLBACK_2(DISABLED, new TestCase(false, true, 2)),
ROLLBACK_1000(ENABLED, new TestCase(false, true, 1000)),
@ -200,7 +202,11 @@ public class PutDatabaseRecordTest {
runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties);
runner.enableControllerService(dbcp);
runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID);
runner.setProperty(PutDatabaseRecord.AUTO_COMMIT, testCase.getAutoCommitAsString());
if (testCase.getAutoCommitAsString() == null) {
runner.removeProperty(PutDatabaseRecord.AUTO_COMMIT);
} else {
runner.setProperty(PutDatabaseRecord.AUTO_COMMIT, testCase.getAutoCommitAsString());
}
runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, testCase.getRollbackOnFailureAsString());
runner.setProperty(PutDatabaseRecord.MAX_BATCH_SIZE, testCase.getBatchSizeAsString());
}
@ -316,7 +322,7 @@ public class PutDatabaseRecordTest {
}
public void testInsertNonRequiredColumnsUnmatchedField() throws InitializationException, ProcessException {
setRunner(TestCaseEnum.DEFAULT_2.getTestCase());
setRunner(TestCaseEnum.DEFAULT_5.getTestCase());
// Need to override the @Before method with a new processor that behaves badly
processor = new PutDatabaseRecordUnmatchedField();
@ -766,11 +772,29 @@ public class PutDatabaseRecordTest {
runner.setProperty(PutDatabaseRecord.FIELD_CONTAINING_SQL, "sql");
runner.setProperty(PutDatabaseRecord.ALLOW_MULTIPLE_STATEMENTS, String.valueOf(allowMultipleStatements));
Supplier<Statement> spyStmt = createStatementSpy();
final Map<String, String> attrs = new HashMap<>();
attrs.put(PutDatabaseRecord.STATEMENT_TYPE_ATTRIBUTE, "sql");
runner.enqueue(new byte[0], attrs);
runner.run();
final int maxBatchSize = runner.getProcessContext().getProperty(PutDatabaseRecord.MAX_BATCH_SIZE).asInteger();
assertNotNull(spyStmt.get());
if (sqlStatements.length <= 1) {
// When there is only 1 sql statement, then never use batching
verify(spyStmt.get(), times(0)).executeBatch();
} else if (maxBatchSize == 0) {
// When maxBatchSize is 0, verify that all statements were executed in a single executeBatch call
verify(spyStmt.get(), times(1)).executeBatch();
} else if (maxBatchSize == 1) {
// When maxBatchSize is 1, verify that executeBatch was never called
verify(spyStmt.get(), times(0)).executeBatch();
} else {
// When maxBatchSize > 1, verify that executeBatch was called at least once
verify(spyStmt.get(), atLeastOnce()).executeBatch();
}
runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1);
final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement();
@ -2380,6 +2404,19 @@ public class PutDatabaseRecordTest {
return () -> spyStmt[0];
}
// Stubs the mocked DBCP service so every Connection it hands out wraps the real
// connection and replaces createStatement() results with Mockito spies, letting
// tests verify Statement interactions (e.g. executeBatch counts).
// Returns a Supplier yielding the most recently created Statement spy.
private Supplier<Statement> createStatementSpy() {
    // One-element array acts as a mutable holder writable from the lambda below.
    final Statement[] capturedStatement = new Statement[1];
    final Answer<DelegatingConnection> wrapConnection = invocation -> {
        final Connection realConnection = (Connection) invocation.callRealMethod();
        return new DelegatingConnection(realConnection) {
            @Override
            public Statement createStatement() throws SQLException {
                capturedStatement[0] = spy(getDelegate().createStatement());
                return capturedStatement[0];
            }
        };
    };
    doAnswer(wrapConnection).when(dbcp).getConnection();
    return () -> capturedStatement[0];
}
static class PutDatabaseRecordUnmatchedField extends PutDatabaseRecord {
@Override
SqlAndIncludedColumns generateInsert(RecordSchema recordSchema, String tableName, TableSchema tableSchema, DMLSettings settings) throws IllegalArgumentException {