NIFI-3611: Added ability to set Transaction Isolation Level on Database connection for the QueryDatabaseTable processor

NIFI-3611: Make TRANS_ISOLATION_LEVEL property optional

This closes #3248.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
erichanson5 2019-01-07 15:53:58 -05:00 committed by Koji Kawamura
parent 05de73d6a0
commit 4ef2251d74
2 changed files with 38 additions and 1 deletions

View File

@ -19,6 +19,7 @@ package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateManager;
@ -65,6 +66,27 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
public static final String RESULT_TABLENAME = "tablename"; public static final String RESULT_TABLENAME = "tablename";
public static final String RESULT_ROW_COUNT = "querydbtable.row.count"; public static final String RESULT_ROW_COUNT = "querydbtable.row.count";
private static AllowableValue TRANSACTION_READ_COMMITTED = new AllowableValue(
String.valueOf(Connection.TRANSACTION_READ_COMMITTED),
"TRANSACTION_READ_COMMITTED"
);
private static AllowableValue TRANSACTION_READ_UNCOMMITTED = new AllowableValue(
String.valueOf(Connection.TRANSACTION_READ_UNCOMMITTED),
"TRANSACTION_READ_UNCOMMITTED"
);
private static AllowableValue TRANSACTION_REPEATABLE_READ = new AllowableValue(
String.valueOf(Connection.TRANSACTION_REPEATABLE_READ),
"TRANSACTION_REPEATABLE_READ"
);
private static AllowableValue TRANSACTION_NONE = new AllowableValue(
String.valueOf(Connection.TRANSACTION_NONE),
"TRANSACTION_NONE"
);
private static AllowableValue TRANSACTION_SERIALIZABLE = new AllowableValue(
String.valueOf(Connection.TRANSACTION_SERIALIZABLE),
"TRANSACTION_SERIALIZABLE"
);
public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder() public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
.name("Fetch Size") .name("Fetch Size")
.description("The number of result rows to be fetched from the result set at a time. This is a hint to the database driver and may not be " .description("The number of result rows to be fetched from the result set at a time. This is a hint to the database driver and may not be "
@ -112,6 +134,14 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build(); .build();
public static final PropertyDescriptor TRANS_ISOLATION_LEVEL = new PropertyDescriptor.Builder()
.name("transaction-isolation-level")
.displayName("Transaction Isolation Level")
.description("This setting will set the transaction isolation level for the database connection for drivers that support this setting")
.required(false)
.allowableValues(TRANSACTION_NONE,TRANSACTION_READ_COMMITTED, TRANSACTION_READ_UNCOMMITTED, TRANSACTION_REPEATABLE_READ, TRANSACTION_SERIALIZABLE)
.build();
@Override @Override
public Set<Relationship> getRelationships() { public Set<Relationship> getRelationships() {
return relationships; return relationships;
@ -170,7 +200,9 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet() final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet()
? context.getProperty(MAX_FRAGMENTS).evaluateAttributeExpressions().asInteger() ? context.getProperty(MAX_FRAGMENTS).evaluateAttributeExpressions().asInteger()
: 0; : 0;
final Integer transIsolationLevel = context.getProperty(TRANS_ISOLATION_LEVEL).isSet()
? context.getProperty(TRANS_ISOLATION_LEVEL).asInteger()
: null;
SqlWriter sqlWriter = configureSqlWriter(session, context); SqlWriter sqlWriter = configureSqlWriter(session, context);
@ -227,6 +259,10 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr
} }
} }
if (transIsolationLevel != null) {
con.setTransactionIsolation(transIsolationLevel);
}
String jdbcURL = "DBCPService"; String jdbcURL = "DBCPService";
try { try {
DatabaseMetaData databaseMetaData = con.getMetaData(); DatabaseMetaData databaseMetaData = con.getMetaData();

View File

@ -109,6 +109,7 @@ public class QueryDatabaseTable extends AbstractQueryDatabaseTable {
pds.add(OUTPUT_BATCH_SIZE); pds.add(OUTPUT_BATCH_SIZE);
pds.add(MAX_FRAGMENTS); pds.add(MAX_FRAGMENTS);
pds.add(NORMALIZE_NAMES_FOR_AVRO); pds.add(NORMALIZE_NAMES_FOR_AVRO);
pds.add(TRANS_ISOLATION_LEVEL);
pds.add(USE_AVRO_LOGICAL_TYPES); pds.add(USE_AVRO_LOGICAL_TYPES);
pds.add(DEFAULT_PRECISION); pds.add(DEFAULT_PRECISION);
pds.add(DEFAULT_SCALE); pds.add(DEFAULT_SCALE);