NIFI-5724 make database connection autocommit configurable

making the database session autocommit value a configurable property

adding custom validation to PutSQL processor so as to disallow 'supports transaction' and 'rollback on failure' to be true when the autocommit value has been set to true

fixing some style issues to conform to standards

This closes #3113.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Vish Uma 2018-10-26 15:32:46 -04:00 committed by Koji Kawamura
parent d5bce91970
commit 63f55d05b4
1 changed files with 49 additions and 4 deletions

View File

@ -28,6 +28,8 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
@ -64,6 +66,7 @@ import java.sql.SQLNonTransientException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@ -74,6 +77,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import static java.lang.String.format;
import static org.apache.nifi.processor.util.pattern.ExceptionHandler.createOnError;
@SupportsBatching
@ -134,6 +138,14 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor AUTO_COMMIT = new PropertyDescriptor.Builder()
.name("database-session-autocommit")
.displayName("Database Session AutoCommit")
.description("The autocommit mode to set on the database connection being used.")
.allowableValues("true", "false")
.defaultValue("false")
.build();
static final PropertyDescriptor SUPPORT_TRANSACTIONS = new PropertyDescriptor.Builder()
.name("Support Fragmented Transactions")
.description("If true, when a FlowFile is consumed by this Processor, the Processor will first check the fragment.identifier and fragment.count attributes of that FlowFile. "
@ -189,6 +201,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
properties.add(CONNECTION_POOL);
properties.add(SQL_STATEMENT);
properties.add(SUPPORT_TRANSACTIONS);
properties.add(AUTO_COMMIT);
properties.add(TRANSACTION_TIMEOUT);
properties.add(BATCH_SIZE);
properties.add(OBTAIN_GENERATED_KEYS);
@ -196,6 +209,34 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
return properties;
}
@Override
protected final Collection<ValidationResult> customValidate(ValidationContext context) {
final Collection<ValidationResult> results = new ArrayList<>();
final String support_transactions = context.getProperty(SUPPORT_TRANSACTIONS).getValue();
final String rollback_on_failure = context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).getValue();
final String auto_commit = context.getProperty(AUTO_COMMIT).getValue();
if(auto_commit.equalsIgnoreCase("true")) {
if(support_transactions.equalsIgnoreCase("true")) {
results.add(new ValidationResult.Builder()
.subject(SUPPORT_TRANSACTIONS.getDisplayName())
.explanation(format("'%s' cannot be set to 'true' when '%s' is also set to 'true'."
+ "Transactions for batch updates cannot be supported when auto commit is set to 'true'",
SUPPORT_TRANSACTIONS.getDisplayName(), AUTO_COMMIT.getDisplayName()))
.build());
}
if(rollback_on_failure.equalsIgnoreCase("true")) {
results.add(new ValidationResult.Builder()
.subject(RollbackOnFailure.ROLLBACK_ON_FAILURE.getDisplayName())
.explanation(format("'%s' cannot be set to 'true' when '%s' is also set to 'true'."
+ "Transaction rollbacks for batch updates cannot be supported when auto commit is set to 'true'",
RollbackOnFailure.ROLLBACK_ON_FAILURE.getDisplayName(), AUTO_COMMIT.getDisplayName()))
.build());
}
}
return results;
}
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> rels = new HashSet<>();
@ -239,7 +280,10 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
.getConnection(ff == null ? Collections.emptyMap() : ff.getAttributes());
try {
fc.originalAutoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
final boolean autocommit = c.getProperty(AUTO_COMMIT).asBoolean();
if(fc.originalAutoCommit != autocommit) {
connection.setAutoCommit(autocommit);
}
} catch (SQLException e) {
throw new ProcessException("Failed to disable auto commit due to " + e, e);
}
@ -521,9 +565,10 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
process.cleanup((c, s, fc, conn) -> {
// make sure that we try to set the auto commit back to whatever it was.
if (fc.originalAutoCommit) {
final boolean autocommit = c.getProperty(AUTO_COMMIT).asBoolean();
if (fc.originalAutoCommit != autocommit) {
try {
conn.setAutoCommit(true);
conn.setAutoCommit(fc.originalAutoCommit);
} catch (final SQLException se) {
getLogger().warn("Failed to reset autocommit due to {}", new Object[]{se});
}
@ -670,7 +715,7 @@ public class PutSQL extends AbstractSessionFactoryProcessor {
int selectedNumFragments = 0;
final BitSet bitSet = new BitSet();
BiFunction<String, Object[], IllegalArgumentException> illegal = (s, objects) -> new IllegalArgumentException(String.format(s, objects));
BiFunction<String, Object[], IllegalArgumentException> illegal = (s, objects) -> new IllegalArgumentException(format(s, objects));
for (final FlowFile flowFile : flowFiles) {
final String fragmentCount = flowFile.getAttribute(FRAGMENT_COUNT_ATTR);