diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
index fa2a86e9d4..21459291e7 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
@@ -50,6 +50,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.ServiceLoader;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;

 import static java.sql.Types.ARRAY;
 import static java.sql.Types.BIGINT;
@@ -176,6 +177,10 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
     // pre-fetched when the processor is scheduled, rather than having to populate them on-the-fly.
     protected volatile boolean isDynamicMaxValues = false;

+    // This value is cleared when the processor is scheduled, and set to true after setup() is called and completes successfully. This enables
+    // the setup logic to be performed in onTrigger() versus OnScheduled to avoid any issues with DB connection when first scheduled to run.
+    protected final AtomicBoolean setupComplete = new AtomicBoolean(false);
+
     private static SimpleDateFormat TIME_TYPE_FORMAT = new SimpleDateFormat("HH:mm:ss.SSS");

     // A Map (name to value) of initial maximum-value properties, filled at schedule-time and used at trigger-time
@@ -222,49 +227,61 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
         return super.customValidate(validationContext);
     }

+    @Override
+    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
+        // If the max-value columns have changed, we need to re-fetch the column info from the DB
+        if (MAX_VALUE_COLUMN_NAMES.equals(descriptor) && newValue != null && !newValue.equals(oldValue)) {
+            setupComplete.set(false);
+        }
+    }
+
     public void setup(final ProcessContext context) {
         setup(context,true,null);
     }

     public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFile flowFile) {
-        final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(flowFile).getValue();
+        synchronized (setupComplete) {
+            setupComplete.set(false);
+            final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(flowFile).getValue();

-        // If there are no max-value column names specified, we don't need to perform this processing
-        if (StringUtils.isEmpty(maxValueColumnNames)) {
-            return;
-        }
-
-        // Try to fill the columnTypeMap with the types of the desired max-value columns
-        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
-        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
-
-        final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
-        try (final Connection con = dbcpService.getConnection();
-             final Statement st = con.createStatement()) {
-
-            // Try a query that returns no rows, for the purposes of getting metadata about the columns. It is possible
-            // to use DatabaseMetaData.getColumns(), but not all drivers support this, notably the schema-on-read
-            // approach as in Apache Drill
-            String query = dbAdapter.getSelectStatement(tableName, maxValueColumnNames, "1 = 0", null, null, null);
-            ResultSet resultSet = st.executeQuery(query);
-            ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
-            int numCols = resultSetMetaData.getColumnCount();
-            if (numCols > 0) {
-                if (shouldCleanCache){
-                    columnTypeMap.clear();
-                }
-                for (int i = 1; i <= numCols; i++) {
-                    String colName = resultSetMetaData.getColumnName(i).toLowerCase();
-                    String colKey = getStateKey(tableName, colName);
-                    int colType = resultSetMetaData.getColumnType(i);
-                    columnTypeMap.putIfAbsent(colKey, colType);
-                }
-            } else {
-                throw new ProcessException("No columns found in table from those specified: " + maxValueColumnNames);
+            // If there are no max-value column names specified, we don't need to perform this processing
+            if (StringUtils.isEmpty(maxValueColumnNames)) {
+                return;
             }

-        } catch (SQLException e) {
-            throw new ProcessException("Unable to communicate with database in order to determine column types", e);
+            // Try to fill the columnTypeMap with the types of the desired max-value columns
+            final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+            final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
+            final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
+            try (final Connection con = dbcpService.getConnection();
+                 final Statement st = con.createStatement()) {
+
+                // Try a query that returns no rows, for the purposes of getting metadata about the columns. It is possible
+                // to use DatabaseMetaData.getColumns(), but not all drivers support this, notably the schema-on-read
+                // approach as in Apache Drill
+                String query = dbAdapter.getSelectStatement(tableName, maxValueColumnNames, "1 = 0", null, null, null);
+                ResultSet resultSet = st.executeQuery(query);
+                ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+                int numCols = resultSetMetaData.getColumnCount();
+                if (numCols > 0) {
+                    if (shouldCleanCache) {
+                        columnTypeMap.clear();
+                    }
+                    for (int i = 1; i <= numCols; i++) {
+                        String colName = resultSetMetaData.getColumnName(i).toLowerCase();
+                        String colKey = getStateKey(tableName, colName);
+                        int colType = resultSetMetaData.getColumnType(i);
+                        columnTypeMap.putIfAbsent(colKey, colType);
+                    }
+                } else {
+                    throw new ProcessException("No columns found in table from those specified: " + maxValueColumnNames);
+                }
+
+            } catch (SQLException e) {
+                throw new ProcessException("Unable to communicate with database in order to determine column types", e);
+            }
+            setupComplete.set(true);
         }
     }

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
index 1bae371515..8f535b3838 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -155,9 +155,6 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
     @OnScheduled
     public void setup(final ProcessContext context) {
         maxValueProperties = getDefaultMaxValueProperties(context.getProperties());
-        if (!isDynamicTableName && !isDynamicMaxValues) {
-            super.setup(context);
-        }
         if (context.hasIncomingConnection() && !context.hasNonLoopConnection()) {
             getLogger().error("The failure relationship can be used only if there is another incoming connection to this processor.");
         }
@@ -165,6 +162,10 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {

     @Override
     public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
+        // Fetch the column/table info once (if the table name and max value columns are not dynamic). Otherwise do the setup later
+        if (!isDynamicTableName && !isDynamicMaxValues && !setupComplete.get()) {
+            super.setup(context);
+        }
         ProcessSession session = sessionFactory.createSession();
         FlowFile fileToProcess = null;

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 0532b7990d..ffe58455f2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -18,10 +18,10 @@ package org.apache.nifi.processors.standard;

 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
-import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -73,7 +73,7 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
 import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;


-@EventDriven
+@TriggerSerially
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @Tags({"sql", "select", "jdbc", "query", "database"})
 @SeeAlso({GenerateTableFetch.class, ExecuteSQL.class})
@@ -178,11 +178,14 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {

     @OnScheduled
     public void setup(final ProcessContext context) {
         maxValueProperties = getDefaultMaxValueProperties(context.getProperties());
-        super.setup(context);
     }

     @Override
     public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
+        // Fetch the column/table info once
+        if (!setupComplete.get()) {
+            super.setup(context);
+        }
         ProcessSession session = sessionFactory.createSession();
         final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
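
Note for reviewers: the core of this patch is a lazy, thread-safe setup pattern. The column-type cache used to be populated in the @OnScheduled callback, so a database that was unreachable at schedule time could fail the processor before it ever ran; the patch moves that work into onTrigger(), guarded by an AtomicBoolean that is cleared whenever the relevant properties change. The following is a minimal, framework-agnostic sketch of that pattern; the names LazySetupWorker, onConfigChanged, doWork, and populateColumnTypeCache are hypothetical stand-ins for illustration and are not part of the patch.

import java.util.concurrent.atomic.AtomicBoolean;

class LazySetupWorker {

    // Cleared on (re)schedule and on relevant property changes; set to
    // true only after setup() completes without throwing.
    private final AtomicBoolean setupComplete = new AtomicBoolean(false);

    // Analogue of onPropertyModified(): any change to the watched
    // configuration invalidates the cached setup state.
    void onConfigChanged(String oldValue, String newValue) {
        if (newValue != null && !newValue.equals(oldValue)) {
            setupComplete.set(false);
        }
    }

    // Analogue of onTrigger(): perform setup lazily, so a database that
    // is down at schedule time only fails this invocation instead of
    // leaving the component permanently half-initialized.
    void doWork() {
        if (!setupComplete.get()) {
            setup();
        }
        // ... normal per-invocation work would follow here ...
    }

    // Synchronizing on the flag object itself mirrors the patch:
    // concurrent callers cannot interleave the clear/populate/set-true
    // sequence.
    private void setup() {
        synchronized (setupComplete) {
            setupComplete.set(false);
            // Hypothetical stand-in for the real work (querying
            // ResultSetMetaData to cache column types); if it throws,
            // the flag stays false and the next invocation retries.
            populateColumnTypeCache();
            setupComplete.set(true);
        }
    }

    private void populateColumnTypeCache() {
        // hypothetical placeholder for the DB metadata query
    }
}

A failed setup leaves the flag false, so the processor simply retries on the next trigger rather than staying broken until rescheduled. Swapping @EventDriven for @TriggerSerially on QueryDatabaseTable complements this: with a single onTrigger() thread, the setup check and the stateful max-value logic never race against a second invocation.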