diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java index 57933b33d1..6b166d949f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java @@ -19,6 +19,7 @@ package org.apache.nifi.processors.standard; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateManager; @@ -65,6 +66,27 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr public static final String RESULT_TABLENAME = "tablename"; public static final String RESULT_ROW_COUNT = "querydbtable.row.count"; + private static AllowableValue TRANSACTION_READ_COMMITTED = new AllowableValue( + String.valueOf(Connection.TRANSACTION_READ_COMMITTED), + "TRANSACTION_READ_COMMITTED" + ); + private static AllowableValue TRANSACTION_READ_UNCOMMITTED = new AllowableValue( + String.valueOf(Connection.TRANSACTION_READ_UNCOMMITTED), + "TRANSACTION_READ_UNCOMMITTED" + ); + private static AllowableValue TRANSACTION_REPEATABLE_READ = new AllowableValue( + String.valueOf(Connection.TRANSACTION_REPEATABLE_READ), + "TRANSACTION_REPEATABLE_READ" + ); + private static AllowableValue TRANSACTION_NONE = new AllowableValue( + String.valueOf(Connection.TRANSACTION_NONE), + "TRANSACTION_NONE" + ); + private static AllowableValue TRANSACTION_SERIALIZABLE = new AllowableValue( + String.valueOf(Connection.TRANSACTION_SERIALIZABLE), + "TRANSACTION_SERIALIZABLE" + ); + public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder() .name("Fetch Size") .description("The number of result rows to be fetched from the result set at a time. This is a hint to the database driver and may not be " @@ -112,6 +134,14 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); + public static final PropertyDescriptor TRANS_ISOLATION_LEVEL = new PropertyDescriptor.Builder() + .name("transaction-isolation-level") + .displayName("Transaction Isolation Level") + .description("This setting will set the transaction isolation level for the database connection for drivers that support this setting") + .required(false) + .allowableValues(TRANSACTION_NONE,TRANSACTION_READ_COMMITTED, TRANSACTION_READ_UNCOMMITTED, TRANSACTION_REPEATABLE_READ, TRANSACTION_SERIALIZABLE) + .build(); + @Override public Set getRelationships() { return relationships; @@ -170,7 +200,9 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet() ? context.getProperty(MAX_FRAGMENTS).evaluateAttributeExpressions().asInteger() : 0; - + final Integer transIsolationLevel = context.getProperty(TRANS_ISOLATION_LEVEL).isSet() + ? context.getProperty(TRANS_ISOLATION_LEVEL).asInteger() + : null; SqlWriter sqlWriter = configureSqlWriter(session, context); @@ -227,6 +259,10 @@ public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchPr } } + if (transIsolationLevel != null) { + con.setTransactionIsolation(transIsolationLevel); + } + String jdbcURL = "DBCPService"; try { DatabaseMetaData databaseMetaData = con.getMetaData(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java index 0c7407fdc3..108937025e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java @@ -109,6 +109,7 @@ public class QueryDatabaseTable extends AbstractQueryDatabaseTable { pds.add(OUTPUT_BATCH_SIZE); pds.add(MAX_FRAGMENTS); pds.add(NORMALIZE_NAMES_FOR_AVRO); + pds.add(TRANS_ISOLATION_LEVEL); pds.add(USE_AVRO_LOGICAL_TYPES); pds.add(DEFAULT_PRECISION); pds.add(DEFAULT_SCALE);