mirror of
synced 2025-02-16 15:06:00 +00:00
NIFI-2881: Added EL support to DB Fetch processors
- Allow incoming flowfiles to GenerateTableFetch - Incorporated review comments/discussions - Updated documentation, added error attribute to GenerateTableFetch - Corrected notes for column properties in fetch processors This closes #1407. Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
@ -17,6 +17,8 @@
package org.apache.nifi.processors.standard;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
@ -24,6 +26,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.util.StringUtils;
import java.io.IOException;
import java.math.BigDecimal;
@ -36,6 +39,7 @@ import java.sql.Timestamp;
import java.text.DecimalFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@ -97,15 +101,18 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
.description("The name of the database table to be queried.")
public static final PropertyDescriptor COLUMN_NAMES = new PropertyDescriptor.Builder()
.name("Columns to Return")
.description("A comma-separated list of column names to be used in the query. If your database requires "
+ "special treatment of the names (quoting, e.g.), each name should include such treatment. If no "
+ "column names are supplied, all columns in the specified table will be returned.")
+ "column names are supplied, all columns in the specified table will be returned. NOTE: It is important "
+ "to use consistent column names for a given table for incremental fetch to work properly.")
public static final PropertyDescriptor MAX_VALUE_COLUMN_NAMES = new PropertyDescriptor.Builder()
@ -117,9 +124,11 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
+ "can be used to retrieve only those rows that have been added/updated since the last retrieval. Note that some "
+ "JDBC types such as bit/boolean are not conducive to maintaining maximum value, so columns of these "
+ "types should not be listed in this property, and will result in error(s) during processing. If no columns "
+ "are provided, all rows from the table will be considered, which could have a performance impact.")
+ "are provided, all rows from the table will be considered, which could have a performance impact. NOTE: It is important "
+ "to use consistent max-value column names for a given table for incremental fetch to work properly.")
public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder()
@ -129,6 +138,7 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
.defaultValue("0 seconds")
public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder()
@ -143,11 +153,24 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
protected List<PropertyDescriptor> propDescriptors;
// The delimiter to use when referencing qualified names (such as table@!@column in the state map)
protected static final String NAMESPACE_DELIMITER = "@!@";
public static final PropertyDescriptor DB_TYPE;
protected final static Map<String, DatabaseAdapter> dbAdapters = new HashMap<>();
protected final Map<String, Integer> columnTypeMap = new HashMap<>();
// This value is set when the processor is scheduled and indicates whether the Table Name property contains Expression Language.
// It is used for backwards-compatibility purposes; if the value is false and the fully-qualified state key (table + column) is not found,
// the processor will look for a state key with just the column name.
protected volatile boolean isDynamicTableName = false;
// This value is set when the processor is scheduled and indicates whether the Maximum Value Columns property contains Expression Language.
// It is used for backwards-compatibility purposes; if the table name and max-value columns are static, then the column types can be
// pre-fetched when the processor is scheduled, rather than having to populate them on-the-fly.
protected volatile boolean isDynamicMaxValues = false;
private static SimpleDateFormat TIME_TYPE_FORMAT = new SimpleDateFormat("HH:mm:ss.SSS");
static {
@ -166,13 +189,28 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
// A common validation procedure for DB fetch processors, it stores whether the Table Name and/or Max Value Column properties have expression language
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
// For backwards-compatibility, keep track of whether the table name and max-value column properties are dynamic (i.e. has expression language)
isDynamicTableName = validationContext.isExpressionLanguagePresent(validationContext.getProperty(TABLE_NAME).getValue());
isDynamicMaxValues = validationContext.isExpressionLanguagePresent(validationContext.getProperty(MAX_VALUE_COLUMN_NAMES).getValue());
return super.customValidate(validationContext);
public void setup(final ProcessContext context) {
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue();
// If there are no max-value column names specified, we don't need to perform this processing
if (StringUtils.isEmpty(maxValueColumnNames)) {
// Try to fill the columnTypeMap with the types of the desired max-value columns
final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
final String tableName = context.getProperty(TABLE_NAME).getValue();
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
try (final Connection con = dbcpService.getConnection();
final Statement st = con.createStatement()) {
@ -187,10 +225,10 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
for (int i = 1; i <= numCols; i++) {
String colName = resultSetMetaData.getColumnName(i).toLowerCase();
String colKey = getStateKey(tableName, colName);
int colType = resultSetMetaData.getColumnType(i);
columnTypeMap.put(colName, colType);
columnTypeMap.putIfAbsent(colKey, colType);
} else {
throw new ProcessException("No columns found in table from those specified: " + maxValueColumnNames);
@ -378,4 +416,16 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
return value;
protected static String getStateKey(String prefix, String columnName) {
StringBuilder sb = new StringBuilder();
if (prefix != null) {
if (columnName != null) {
return sb.toString();
@ -21,11 +21,15 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
@ -49,6 +53,7 @@ import java.sql.Statement;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -60,18 +65,28 @@ import java.util.stream.IntStream;
@Tags({"sql", "select", "jdbc", "query", "database", "fetch", "generate"})
@SeeAlso({QueryDatabaseTable.class, ExecuteSQL.class})
@SeeAlso({QueryDatabaseTable.class, ExecuteSQL.class, ListDatabaseTables.class})
@CapabilityDescription("Generates SQL select queries that fetch \"pages\" of rows from a table. The partition size property, along with the table's row count, "
+ "determine the size and number of pages and generated FlowFiles. In addition, incremental fetching can be achieved by setting Maximum-Value Columns, "
+ "which causes the processor to track the columns' maximum values, thus only fetching rows whose columns' values exceed the observed maximums. This "
+ "processor is intended to be run on the Primary Node only.")
+ "processor is intended to be run on the Primary Node only.\n\n"
+ "This processor can accept incoming connections; the behavior of the processor is different whether incoming connections are provided:\n"
+ " - If no incoming connection(s) are specified, the processor will generate SQL queries on the specified processor schedule. Expression Language is supported for many "
+ "fields, but no flow file attributes are available. However the properties will be evaluated using the Variable Registry.\n"
+ " - If incoming connection(s) are specified and no flow file is available to a processor task, no work will be performed.\n"
+ " - If incoming connection(s) are specified and a flow file is available to a processor task, the flow file's attributes may be used in Expression Language for such fields "
+ "as Table Name and others. However, the Max-Value Columns and Columns to Return fields must be empty or refer to columns that are available in each specified table.")
@Stateful(scopes = Scope.CLUSTER, description = "After performing a query on the specified table, the maximum values for "
+ "the specified column(s) will be retained for use in future executions of the query. This allows the Processor "
+ "to fetch only those records that have max values greater than the retained values. This can be used for "
+ "incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor "
+ "per the State Management documentation")
@WritesAttribute(attribute = "generatetablefetch.sql.error", description = "If the processor has incoming connections, and processing an incoming flow file causes "
+ "a SQL Exception, the flow file is routed to failure and this attribute is set to the exception message.")
public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
public static final PropertyDescriptor PARTITION_SIZE = new PropertyDescriptor.Builder()
@ -83,13 +98,20 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
+ "in the table.")
public static final Relationship REL_FAILURE = new Relationship.Builder()
.description("This relationship is only used when SQL query execution (using an incoming FlowFile) failed. The incoming FlowFile will be penalized and routed to this relationship. "
+ "If no incoming connection(s) are specified, this relationship is unused.")
public GenerateTableFetch() {
final Set<Relationship> r = new HashSet<>();
relationships = Collections.unmodifiableSet(r);
final List<PropertyDescriptor> pds = new ArrayList<>();
@ -113,22 +135,41 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
return propDescriptors;
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
return super.customValidate(validationContext);
public void setup(final ProcessContext context) {
// Pre-fetch the column types if using a static table name and max-value columns
if (!isDynamicTableName && !isDynamicMaxValues) {
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
ProcessSession session = sessionFactory.createSession();
FlowFile fileToProcess = null;
if (context.hasIncomingConnection()) {
fileToProcess = session.get();
if (fileToProcess == null) {
// Incoming connection with no flow file available, do no work (see capability description)
final ComponentLog logger = getLogger();
final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
final String tableName = context.getProperty(TABLE_NAME).getValue();
final String columnNames = context.getProperty(COLUMN_NAMES).getValue();
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
final int partitionSize = context.getProperty(PARTITION_SIZE).asInteger();
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(fileToProcess).getValue();
final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
final int partitionSize = context.getProperty(PARTITION_SIZE).evaluateAttributeExpressions(fileToProcess).asInteger();
final StateManager stateManager = context.getStateManager();
final StateMap stateMap;
@ -164,11 +205,20 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
IntStream.range(0, maxValueColumnNameList.size()).forEach((index) -> {
String colName = maxValueColumnNameList.get(index);
maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
String maxValue = statePropertyMap.get(colName.toLowerCase());
final String fullyQualifiedStateKey = getStateKey(tableName, colName);
String maxValue = statePropertyMap.get(fullyQualifiedStateKey);
if (StringUtils.isEmpty(maxValue) && !isDynamicTableName) {
// If the table name is static and the fully-qualified key was not found, try just the column name
maxValue = statePropertyMap.get(getStateKey(null, colName));
if (!StringUtils.isEmpty(maxValue)) {
Integer type = columnTypeMap.get(colName.toLowerCase());
Integer type = columnTypeMap.get(fullyQualifiedStateKey);
if (type == null && !isDynamicTableName) {
// If the table name is static and the fully-qualified key was not found, try just the column name
type = columnTypeMap.get(getStateKey(null, colName));
if (type == null) {
// This shouldn't happen as we are populating columnTypeMap when the processor is scheduled.
// This shouldn't happen as we are populating columnTypeMap when the processor is scheduled or when the first maximum is observed
throw new IllegalArgumentException("No column type found for: " + colName);
// Add a condition for the WHERE clause
@ -186,7 +236,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
try (final Connection con = dbcpService.getConnection();
final Statement st = con.createStatement()) {
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.SECONDS).intValue();
st.setQueryTimeout(queryTimeout); // timeout in seconds
logger.debug("Executing {}", new Object[]{selectQuery});
@ -202,40 +252,61 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
ResultSetMetaData rsmd = resultSet.getMetaData();
for (int i = 2; i <= rsmd.getColumnCount(); i++) {
String resultColumnName = rsmd.getColumnName(i).toLowerCase();
String fullyQualifiedStateKey = getStateKey(tableName, resultColumnName);
String resultColumnCurrentMax = statePropertyMap.get(fullyQualifiedStateKey);
if (StringUtils.isEmpty(resultColumnCurrentMax) && !isDynamicTableName) {
// If we can't find the value at the fully-qualified key name and the table name is static, it is possible (under a previous scheme)
// the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new
// maximum value is observed, it will be stored under the fully-qualified key from then on.
resultColumnCurrentMax = statePropertyMap.get(resultColumnName);
int type = rsmd.getColumnType(i);
if (isDynamicTableName) {
// We haven't pre-populated the column type map if the table name is dynamic, so do it here
columnTypeMap.put(fullyQualifiedStateKey, type);
try {
String newMaxValue = getMaxValueFromRow(resultSet, i, type, statePropertyMap.get(resultColumnName.toLowerCase()), dbAdapter.getName());
String newMaxValue = getMaxValueFromRow(resultSet, i, type, resultColumnCurrentMax, dbAdapter.getName());
if (newMaxValue != null) {
statePropertyMap.put(resultColumnName, newMaxValue);
statePropertyMap.put(fullyQualifiedStateKey, newMaxValue);
} catch (ParseException | IOException pie) {
// Fail the whole thing here before we start creating flow files and such
throw new ProcessException(pie);
} else {
// Something is very wrong here, one row (even if count is zero) should be returned
throw new SQLException("No rows returned from metadata query: " + selectQuery);
final int numberOfFetches = (partitionSize == 0) ? rowCount : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1);
// Generate SQL statements to read "pages" of data
for (int i = 0; i < numberOfFetches; i++) {
Integer limit = partitionSize == 0 ? null : partitionSize;
Integer offset = partitionSize == 0 ? null : i * partitionSize;
final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, StringUtils.join(maxValueColumnNameList, ", "), limit, offset);
FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess);
sqlFlowFile = session.write(sqlFlowFile, out -> out.write(query.getBytes()));
session.transfer(sqlFlowFile, REL_SUCCESS);
if (fileToProcess != null) {
} catch (SQLException e) {
logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e});
throw new ProcessException(e);
final int numberOfFetches = (partitionSize == 0) ? rowCount : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1);
if (fileToProcess != null) {
logger.error("Unable to execute SQL select query {} due to {}, routing {} to failure", new Object[]{selectQuery, e, fileToProcess});
fileToProcess = session.putAttribute(fileToProcess, "generatetablefetch.sql.error", e.getMessage());
session.transfer(fileToProcess, REL_FAILURE);
// Generate SQL statements to read "pages" of data
for (int i = 0; i < numberOfFetches; i++) {
FlowFile sqlFlowFile;
Integer limit = partitionSize == 0 ? null : partitionSize;
Integer offset = partitionSize == 0 ? null : i * partitionSize;
final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, StringUtils.join(maxValueColumnNameList, ", "), limit, offset);
sqlFlowFile = session.create();
sqlFlowFile = session.write(sqlFlowFile, out -> {
session.transfer(sqlFlowFile, REL_SUCCESS);
} else {
logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e});
throw new ProcessException(e);
@ -25,6 +25,7 @@ import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
@ -70,9 +71,13 @@ import java.util.stream.IntStream;
@Tags({"sql", "select", "jdbc", "query", "database"})
@CapabilityDescription("Execute provided SQL select query. Query result will be converted to Avro format."
+ " Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on "
+ "a timer or cron expression, using the standard scheduling methods. FlowFile attribute "
@SeeAlso({GenerateTableFetch.class, ExecuteSQL.class})
@CapabilityDescription("Generates and executes a SQL select query to fetch all rows whose values in the specified Maximum Value column(s) are larger than the "
+ "previously-seen maxima. Query result will be converted to Avro format. Expression Language is supported for several properties, but no incoming "
+ "connections are permitted. The Variable Registry may be used to provide values for any property containing Expression Language. If it is desired to "
+ "leverage flow file attributes to perform these queries, the GenerateTableFetch and/or ExecuteSQL processors can be used for this purpose. "
+ "Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on "
+ "a timer or cron expression, using the standard scheduling methods. This processor is intended to be run on the Primary Node only. FlowFile attribute "
+ "'querydbtable.row.count' indicates how many rows were selected.")
@Stateful(scopes = Scope.CLUSTER, description = "After performing a query on the specified table, the maximum values for "
+ "the specified column(s) will be retained for use in future executions of the query. This allows the Processor "
@ -80,7 +85,7 @@ import java.util.stream.IntStream;
+ "incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor "
+ "per the State Management documentation")
@WritesAttribute(attribute = "querydbtable.row.count"),
@WritesAttribute(attribute = "querydbtable.row.count", description="The number of rows selected by the query"),
@WritesAttribute(attribute="fragment.identifier", description="If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
+ "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
@WritesAttribute(attribute="fragment.count", description="If 'Max Rows Per Flow File' is set then this is the total number of "
@ -107,6 +112,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
@ -117,6 +123,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
public static final PropertyDescriptor MAX_FRAGMENTS = new PropertyDescriptor.Builder()
@ -127,6 +134,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
public QueryDatabaseTable() {
@ -184,13 +192,13 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
final String tableName = context.getProperty(TABLE_NAME).getValue();
final String columnNames = context.getProperty(COLUMN_NAMES).getValue();
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
final Integer fetchSize = context.getProperty(FETCH_SIZE).asInteger();
final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).asInteger();
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions().getValue();
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue();
final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet()
? context.getProperty(MAX_FRAGMENTS).asInteger()
? context.getProperty(MAX_FRAGMENTS).evaluateAttributeExpressions().asInteger()
: 0;
final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean();
@ -212,9 +220,21 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap());
//If an initial max value for column(s) has been specified using properties, and this column is not in the state manager, sync them to the state property map
for(final Map.Entry<String,String> maxProp : maxValueProperties.entrySet()){
if (!statePropertyMap.containsKey(maxProp.getKey().toLowerCase())) {
statePropertyMap.put(maxProp.getKey().toLowerCase(), maxProp.getValue());
for (final Map.Entry<String, String> maxProp : maxValueProperties.entrySet()) {
String maxPropKey = maxProp.getKey().toLowerCase();
String fullyQualifiedMaxPropKey = getStateKey(tableName, maxPropKey);
if (!statePropertyMap.containsKey(fullyQualifiedMaxPropKey)) {
String newMaxPropValue;
// If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
// the value has been stored under a key that is only the column name. Fall back to check the column name,
// but store the new initial max value under the fully-qualified key.
if (statePropertyMap.containsKey(maxPropKey)) {
newMaxPropValue = statePropertyMap.get(maxPropKey);
} else {
newMaxPropValue = maxProp.getValue();
statePropertyMap.put(fullyQualifiedMaxPropKey, newMaxPropValue);
@ -247,7 +267,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
// Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
st.setQueryTimeout(queryTimeout); // timeout in seconds
try {
logger.debug("Executing query {}", new Object[]{selectQuery});
@ -304,7 +324,10 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
for (int i = 0; i < resultSetFlowFiles.size(); i++) {
// Add maximum values as attributes
for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) {
resultSetFlowFiles.set(i, session.putAttribute(resultSetFlowFiles.get(i), "maxvalue." + entry.getKey(), entry.getValue()));
// Get just the column name from the key
String key = entry.getKey();
String colName = key.substring(key.lastIndexOf(NAMESPACE_DELIMITER) + NAMESPACE_DELIMITER.length());
resultSetFlowFiles.set(i, session.putAttribute(resultSetFlowFiles.get(i), "maxvalue." + colName, entry.getValue()));
//set count on all FlowFiles
@ -349,9 +372,16 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
List<String> whereClauses = new ArrayList<>(maxValColumnNames.size());
IntStream.range(0, maxValColumnNames.size()).forEach((index) -> {
String colName = maxValColumnNames.get(index);
String maxValue = stateMap.get(colName.toLowerCase());
String maxValueKey = getStateKey(tableName, colName);
String maxValue = stateMap.get(maxValueKey);
if (StringUtils.isEmpty(maxValue)) {
// If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
// the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new
// maximum value is observed, it will be stored under the fully-qualified key from then on.
maxValue = stateMap.get(colName.toLowerCase());
if (!StringUtils.isEmpty(maxValue)) {
Integer type = columnTypeMap.get(colName.toLowerCase());
Integer type = columnTypeMap.get(maxValueKey);
if (type == null) {
// This shouldn't happen as we are populating columnTypeMap when the processor is scheduled.
throw new IllegalArgumentException("No column type found for: " + colName);
@ -371,7 +401,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
protected Map<String,String> getDefaultMaxValueProperties(final Map<PropertyDescriptor, String> properties){
final Map<String,String> defaultMaxValues = new HashMap<String, String>();
final Map<String,String> defaultMaxValues = new HashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
final String key = entry.getKey().getName();
@ -407,15 +437,22 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
if (nrOfColumns > 0) {
for (int i = 1; i <= nrOfColumns; i++) {
String colName = meta.getColumnName(i).toLowerCase();
Integer type = columnTypeMap.get(colName);
String fullyQualifiedMaxValueKey = getStateKey(meta.getTableName(i), colName);
Integer type = columnTypeMap.get(fullyQualifiedMaxValueKey);
// Skip any columns we're not keeping track of or whose value is null
if (type == null || resultSet.getObject(i) == null) {
String maxValueString = newColMap.get(colName);
String maxValueString = newColMap.get(fullyQualifiedMaxValueKey);
// If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
// the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new
// maximum value is observed, it will be stored under the fully-qualified key from then on.
if (StringUtils.isEmpty(maxValueString)) {
maxValueString = newColMap.get(colName);
String newMaxValueString = getMaxValueFromRow(resultSet, i, type, maxValueString, dbAdapter.getName());
if (newMaxValueString != null) {
newColMap.put(colName, newMaxValueString);
newColMap.put(fullyQualifiedMaxValueKey, newMaxValueString);
@ -72,6 +72,8 @@ public class QueryDatabaseTableTest {
private final static String DB_LOCATION = "target/db_qdt";
private DatabaseAdapter dbAdapter;
private HashMap<String, DatabaseAdapter> origDbAdapters;
private final static String TABLE_NAME_KEY = "tableName";
private final static String MAX_ROWS_KEY = "maxRows";
@ -142,13 +144,13 @@ public class QueryDatabaseTableTest {
maxValues.put("id", "509");
StateManager stateManager = runner.getStateManager();
stateManager.setState(maxValues, Scope.CLUSTER);
processor.putColumnType("id", Types.INTEGER);
processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "id", Types.INTEGER);
query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), stateManager.getState(Scope.CLUSTER).toMap());
assertEquals("SELECT * FROM myTable WHERE id > 509", query);
maxValues.put("date_created", "2016-03-07 12:34:56");
stateManager.setState(maxValues, Scope.CLUSTER);
processor.putColumnType("date_created", Types.TIMESTAMP);
processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "date_created", Types.TIMESTAMP);
query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER).toMap());
assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= '2016-03-07 12:34:56'", query);
@ -460,6 +462,7 @@ public class QueryDatabaseTableTest {
runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_NULL_INT");
runner.setProperty(AbstractDatabaseFetchProcessor.MAX_VALUE_COLUMN_NAMES, "id");
QueryDatabaseTable.dbAdapters.put(dbAdapter.getName(), new GenericDatabaseAdapter() {
@ -521,7 +524,8 @@ public class QueryDatabaseTableTest {
runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE");
runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");
runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "9");//Using a non-round number to make sure the last file is ragged
runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "${" + MAX_ROWS_KEY + "}");
runner.setVariable(MAX_ROWS_KEY, "9");
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 12);
@ -675,7 +679,8 @@ public class QueryDatabaseTableTest {
cal.add(Calendar.MINUTE, 1);
runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE");
runner.setProperty(QueryDatabaseTable.TABLE_NAME, "${" + TABLE_NAME_KEY + "}");
runner.setVariable(TABLE_NAME_KEY, "TEST_QUERY_DB_TABLE");
runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "created_on");
@ -687,14 +692,14 @@ public class QueryDatabaseTableTest {
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
assertEquals(4, getNumberOfRecordsFromStream(in));
runner.getStateManager().assertStateEquals("created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
// Run again, this time no flowfiles/rows should be transferred
// Validate Max Value doesn't change also
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
runner.getStateManager().assertStateEquals("created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER);
// Append a new row, expect 1 flowfile one row
@ -707,7 +712,7 @@ public class QueryDatabaseTableTest {
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
assertEquals(1, getNumberOfRecordsFromStream(in));
runner.getStateManager().assertStateEquals("created_on", "1970-01-01 00:10:00.0", Scope.CLUSTER);
runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:10:00.0", Scope.CLUSTER);
@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.exception.ProcessException;
@ -95,7 +96,7 @@ public class TestGenerateTableFetch {
final DBCPService dbcp = new DBCPServiceSimpleImpl();
final Map<String, String> dbcpProperties = new HashMap<>();
runner = TestRunners.newTestRunner(GenerateTableFetch.class);
runner = TestRunners.newTestRunner(processor);
runner.addControllerService("dbcp", dbcp, dbcpProperties);
runner.setProperty(GenerateTableFetch.DBCP_SERVICE, "dbcp");
@ -251,9 +252,271 @@ public class TestGenerateTableFetch {
public void testMultiplePartitionsIncomingFlowFiles() throws ClassNotFoundException, SQLException, InitializationException, IOException {
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE1");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
stmt.execute("create table TEST_QUERY_DB_TABLE1 (id integer not null, bucket integer not null)");
stmt.execute("insert into TEST_QUERY_DB_TABLE1 (id, bucket) VALUES (0, 0)");
stmt.execute("insert into TEST_QUERY_DB_TABLE1 (id, bucket) VALUES (1, 0)");
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE2");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
stmt.execute("create table TEST_QUERY_DB_TABLE2 (id integer not null, bucket integer not null)");
stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, bucket) VALUES (0, 0)");
runner.setProperty(GenerateTableFetch.TABLE_NAME, "${tableName}");
runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "${partSize}");
runner.enqueue("".getBytes(), new HashMap<String, String>() {{
put("tableName", "TEST_QUERY_DB_TABLE1");
put("partSize", "1");
runner.enqueue("".getBytes(), new HashMap<String, String>() {{
put("tableName", "TEST_QUERY_DB_TABLE2");
put("partSize", "2");
// The table does not exist, expect the original flow file to be routed to failure
runner.enqueue("".getBytes(), new HashMap<String, String>() {{
put("tableName", "TEST_QUERY_DB_TABLE3");
put("partSize", "1");
runner.assertTransferCount(AbstractDatabaseFetchProcessor.REL_SUCCESS, 3);
// Two records from table 1
(ff) -> "TEST_QUERY_DB_TABLE1".equals(ff.getAttribute("tableName"))).count(),
// One record from table 2
(ff) -> "TEST_QUERY_DB_TABLE2".equals(ff.getAttribute("tableName"))).count(),
// Table 3 doesn't exist, should be routed to failure
runner.assertTransferCount(GenerateTableFetch.REL_FAILURE, 1);
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE1");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE2");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
public void testBackwardsCompatibilityStateKeyStaticTableDynamicMaxValues() throws Exception {
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, bucket integer not null)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (0, 0)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1, 0)");
runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE");
runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "${maxValueCol}");
runner.enqueue("".getBytes(), new HashMap<String, String>() {{
put("maxValueCol", "id");
// Pre-populate the state with a key for column name (not fully-qualified)
StateManager stateManager = runner.getStateManager();
stateManager.setState(new HashMap<String, String>() {{
put("id", "0");
}}, Scope.CLUSTER);
// Pre-populate the column type map with an entry for id (not fully-qualified)
processor.columnTypeMap.put("id", 4);
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id > 0 ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
public void testBackwardsCompatibilityStateKeyDynamicTableDynamicMaxValues() throws Exception {
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, bucket integer not null)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (0, 0)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1, 0)");
runner.setProperty(GenerateTableFetch.TABLE_NAME, "${tableName}");
runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "${maxValueCol}");
runner.enqueue("".getBytes(), new HashMap<String, String>() {{
put("tableName", "TEST_QUERY_DB_TABLE");
put("maxValueCol", "id");
// Pre-populate the state with a key for column name (not fully-qualified)
StateManager stateManager = runner.getStateManager();
stateManager.setState(new HashMap<String, String>() {{
put("id", "0");
}}, Scope.CLUSTER);
// Pre-populate the column type map with an entry for id (not fully-qualified)
processor.columnTypeMap.put("id", 4);
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
// Note there is no WHERE clause here. Because we are using dynamic tables, the old state key/value is not retrieved
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (2, 0)");
runner.enqueue("".getBytes(), new HashMap<String, String>() {{
put("tableName", "TEST_QUERY_DB_TABLE");
put("maxValueCol", "id");
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id > 1 ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
public void testBackwardsCompatibilityStateKeyDynamicTableStaticMaxValues() throws Exception {
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, bucket integer not null)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (0, 0)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1, 0)");
runner.setProperty(GenerateTableFetch.TABLE_NAME, "${tableName}");
runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "id");
runner.enqueue("".getBytes(), new HashMap<String, String>() {{
put("tableName", "TEST_QUERY_DB_TABLE");
// Pre-populate the state with a key for column name (not fully-qualified)
StateManager stateManager = runner.getStateManager();
stateManager.setState(new HashMap<String, String>() {{
put("id", "0");
}}, Scope.CLUSTER);
// Pre-populate the column type map with an entry for id (not fully-qualified)
processor.columnTypeMap.put("id", 4);
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
// Note there is no WHERE clause here. Because we are using dynamic tables, the old state key/value is not retrieved
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (2, 0)");
runner.enqueue("".getBytes(), new HashMap<String, String>() {{
put("tableName", "TEST_QUERY_DB_TABLE");
put("maxValueCol", "id");
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id > 1 ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
public void testBackwardsCompatibilityStateKeyVariableRegistry() throws Exception {
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, bucket integer not null)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (0, 0)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1, 0)");
runner.setProperty(GenerateTableFetch.TABLE_NAME, "${tableName}");
runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "${maxValueCol}");
runner.setVariable("tableName", "TEST_QUERY_DB_TABLE");
runner.setVariable("maxValueCol", "id");
// Pre-populate the state with a key for column name (not fully-qualified)
StateManager stateManager = runner.getStateManager();
stateManager.setState(new HashMap<String, String>() {{
put("id", "0");
}}, Scope.CLUSTER);
// Pre-populate the column type map with an entry for id (not fully-qualified)
processor.columnTypeMap.put("id", 4);
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
// Note there is no WHERE clause here. Because we are using dynamic tables (i.e. Expression Language,
// even when not referring to flow file attributes), the old state key/value is not retrieved
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
* Simple implementation only for ListDatabaseTables processor testing.
* Simple implementation only for GenerateTableFetch processor testing.
private class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
Reference in New Issue
Block a user