diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java index 9957c2e0ae..38134c25e1 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java @@ -28,6 +28,8 @@ import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.dbcp.DBCPService; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; @@ -64,6 +66,7 @@ import java.sql.SQLNonTransientException; import java.sql.Statement; import java.util.ArrayList; import java.util.BitSet; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -74,6 +77,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; +import static java.lang.String.format; import static org.apache.nifi.processor.util.pattern.ExceptionHandler.createOnError; @SupportsBatching @@ -134,6 +138,14 @@ public class PutSQL extends AbstractSessionFactoryProcessor { .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); + static final PropertyDescriptor AUTO_COMMIT = new PropertyDescriptor.Builder() + .name("database-session-autocommit") + .displayName("Database Session AutoCommit") + .description("The autocommit mode to set on the database connection being used.") + .allowableValues("true", "false") + .defaultValue("false") + .build(); + static final PropertyDescriptor SUPPORT_TRANSACTIONS = new PropertyDescriptor.Builder() .name("Support Fragmented Transactions") .description("If true, when a FlowFile is consumed by this Processor, the Processor will first check the fragment.identifier and fragment.count attributes of that FlowFile. " @@ -189,6 +201,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor { properties.add(CONNECTION_POOL); properties.add(SQL_STATEMENT); properties.add(SUPPORT_TRANSACTIONS); + properties.add(AUTO_COMMIT); properties.add(TRANSACTION_TIMEOUT); properties.add(BATCH_SIZE); properties.add(OBTAIN_GENERATED_KEYS); @@ -196,6 +209,34 @@ public class PutSQL extends AbstractSessionFactoryProcessor { return properties; } + @Override + protected final Collection customValidate(ValidationContext context) { + final Collection results = new ArrayList<>(); + final String support_transactions = context.getProperty(SUPPORT_TRANSACTIONS).getValue(); + final String rollback_on_failure = context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).getValue(); + final String auto_commit = context.getProperty(AUTO_COMMIT).getValue(); + + if(auto_commit.equalsIgnoreCase("true")) { + if(support_transactions.equalsIgnoreCase("true")) { + results.add(new ValidationResult.Builder() + .subject(SUPPORT_TRANSACTIONS.getDisplayName()) + .explanation(format("'%s' cannot be set to 'true' when '%s' is also set to 'true'." + + "Transactions for batch updates cannot be supported when auto commit is set to 'true'", + SUPPORT_TRANSACTIONS.getDisplayName(), AUTO_COMMIT.getDisplayName())) + .build()); + } + if(rollback_on_failure.equalsIgnoreCase("true")) { + results.add(new ValidationResult.Builder() + .subject(RollbackOnFailure.ROLLBACK_ON_FAILURE.getDisplayName()) + .explanation(format("'%s' cannot be set to 'true' when '%s' is also set to 'true'." + + "Transaction rollbacks for batch updates cannot be supported when auto commit is set to 'true'", + RollbackOnFailure.ROLLBACK_ON_FAILURE.getDisplayName(), AUTO_COMMIT.getDisplayName())) + .build()); + } + } + return results; + } + @Override public Set getRelationships() { final Set rels = new HashSet<>(); @@ -239,7 +280,10 @@ public class PutSQL extends AbstractSessionFactoryProcessor { .getConnection(ff == null ? Collections.emptyMap() : ff.getAttributes()); try { fc.originalAutoCommit = connection.getAutoCommit(); - connection.setAutoCommit(false); + final boolean autocommit = c.getProperty(AUTO_COMMIT).asBoolean(); + if(fc.originalAutoCommit != autocommit) { + connection.setAutoCommit(autocommit); + } } catch (SQLException e) { throw new ProcessException("Failed to disable auto commit due to " + e, e); } @@ -521,9 +565,10 @@ public class PutSQL extends AbstractSessionFactoryProcessor { process.cleanup((c, s, fc, conn) -> { // make sure that we try to set the auto commit back to whatever it was. - if (fc.originalAutoCommit) { + final boolean autocommit = c.getProperty(AUTO_COMMIT).asBoolean(); + if (fc.originalAutoCommit != autocommit) { try { - conn.setAutoCommit(true); + conn.setAutoCommit(fc.originalAutoCommit); } catch (final SQLException se) { getLogger().warn("Failed to reset autocommit due to {}", new Object[]{se}); } @@ -670,7 +715,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor { int selectedNumFragments = 0; final BitSet bitSet = new BitSet(); - BiFunction illegal = (s, objects) -> new IllegalArgumentException(String.format(s, objects)); + BiFunction illegal = (s, objects) -> new IllegalArgumentException(format(s, objects)); for (final FlowFile flowFile : flowFiles) { final String fragmentCount = flowFile.getAttribute(FRAGMENT_COUNT_ATTR);