NIFI-1575: Add QueryDatabaseTable processor

This commit is contained in:
Matt Burgess 2016-03-15 22:55:34 -04:00 committed by Mark Payne
parent 148b4497b4
commit 98395de74f
4 changed files with 998 additions and 5 deletions

View File

@ -0,0 +1,601 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.JdbcCommon;
import org.apache.nifi.util.LongHolder;
import org.apache.nifi.util.StopWatch;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.Date;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.text.DecimalFormat;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static java.sql.Types.ARRAY;
import static java.sql.Types.BIGINT;
import static java.sql.Types.BINARY;
import static java.sql.Types.BIT;
import static java.sql.Types.BLOB;
import static java.sql.Types.BOOLEAN;
import static java.sql.Types.CHAR;
import static java.sql.Types.CLOB;
import static java.sql.Types.DATE;
import static java.sql.Types.DECIMAL;
import static java.sql.Types.DOUBLE;
import static java.sql.Types.FLOAT;
import static java.sql.Types.INTEGER;
import static java.sql.Types.LONGNVARCHAR;
import static java.sql.Types.LONGVARBINARY;
import static java.sql.Types.LONGVARCHAR;
import static java.sql.Types.NCHAR;
import static java.sql.Types.NUMERIC;
import static java.sql.Types.NVARCHAR;
import static java.sql.Types.REAL;
import static java.sql.Types.ROWID;
import static java.sql.Types.SMALLINT;
import static java.sql.Types.TIME;
import static java.sql.Types.TIMESTAMP;
import static java.sql.Types.TINYINT;
import static java.sql.Types.VARBINARY;
import static java.sql.Types.VARCHAR;
@Tags({"sql", "select", "jdbc", "query", "database"})
@CapabilityDescription("Execute provided SQL select query. Query result will be converted to Avro format."
+ " Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on "
+ "a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. "
+ "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the "
+ "select query. FlowFile attribute 'querydbtable.row.count' indicates how many rows were selected.")
@Stateful(scopes = Scope.CLUSTER, description = "After performing a query on the specified table, the maximum values for "
+ "the specified column(s) will be retained for use in future executions of the query. This allows the Processor "
+ "to fetch only those records that have max values greater than the retained values. This can be used for "
+ "incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor "
+ "per the State Management documentation")
@WritesAttribute(attribute = "querydbtable.row.count")
public class QueryDatabaseTable extends AbstractSessionFactoryProcessor {
public static final String RESULT_ROW_COUNT = "querydbtable.row.count";
public static final String SQL_PREPROCESS_STRATEGY_NONE = "None";
public static final String SQL_PREPROCESS_STRATEGY_ORACLE = "Oracle";
// Relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.description("Successfully created FlowFile from SQL query result set.")
private final Set<Relationship> relationships;
public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
.name("Database Connection Pooling Service")
.description("The Controller Service that is used to obtain a connection to the database.")
public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
.name("Table Name")
.description("The name of the database table to be queried.")
public static final PropertyDescriptor COLUMN_NAMES = new PropertyDescriptor.Builder()
.name("Columns to Return")
.description("A comma-separated list of column names to be used in the query. If your database requires "
+ "special treatment of the names (quoting, e.g.), each name should include such treatment. If no "
+ "column names are supplied, all columns in the specified table will be returned.")
public static final PropertyDescriptor MAX_VALUE_COLUMN_NAMES = new PropertyDescriptor.Builder()
.name("Maximum-value Columns")
.description("A comma-separated list of column names. The processor will keep track of the maximum value "
+ "for each column that has been returned since the processor started running. This can be used to "
+ "retrieve only those rows that have been added/updated since the last retrieval. Note that some "
+ "JDBC types such as bit/boolean are not conducive to maintaining maximum value, so columns of these "
+ "types should not be listed in this property, and will result in error(s) during processing.")
public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder()
.name("Max Wait Time")
.description("The maximum amount of time allowed for a running SQL select query "
+ ", zero means there is no limit. Max time less than 1 second will be equal to zero.")
.defaultValue("0 seconds")
public static final PropertyDescriptor SQL_PREPROCESS_STRATEGY = new PropertyDescriptor.Builder()
.name("SQL Pre-processing Strategy")
.description("The strategy to employ when generating the SQL for querying the table. A strategy may include "
+ "custom or database-specific code, such as the treatment of time/date formats.")
private final List<PropertyDescriptor> propDescriptors;
protected final Map<String, Integer> columnTypeMap = new HashMap<>();
public QueryDatabaseTable() {
final Set<Relationship> r = new HashSet<>();
relationships = Collections.unmodifiableSet(r);
final List<PropertyDescriptor> pds = new ArrayList<>();
propDescriptors = Collections.unmodifiableList(pds);
public Set<Relationship> getRelationships() {
return relationships;
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propDescriptors;
public void setup(final ProcessContext context) {
// Try to fill the columnTypeMap with the types of the desired max-value columns
final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
final String tableName = context.getProperty(TABLE_NAME).getValue();
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
try (final Connection con = dbcpService.getConnection();
final Statement st = con.createStatement()) {
// Try a query that returns no rows, for the purposes of getting metadata about the columns. It is possible
// to use DatabaseMetaData.getColumns(), but not all drivers support this, notably the schema-on-read
// approach as in Apache Drill
String query = getSelectFromClause(tableName, maxValueColumnNames).append(" WHERE 1 = 0").toString();
ResultSet resultSet = st.executeQuery(query);
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
int numCols = resultSetMetaData.getColumnCount();
if (numCols > 0) {
for (int i = 1; i <= numCols; i++) {
String colName = resultSetMetaData.getColumnName(i).toLowerCase();
int colType = resultSetMetaData.getColumnType(i);
columnTypeMap.put(colName, colType);
} else {
throw new ProcessException("No columns found in table from those specified: " + maxValueColumnNames);
} catch (SQLException e) {
throw new ProcessException("Unable to communicate with database in order to determine column types", e);
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
ProcessSession session = sessionFactory.createSession();
FlowFile fileToProcess = null;
final ProcessorLog logger = getLogger();
final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
final String tableName = context.getProperty(TABLE_NAME).getValue();
final String columnNames = context.getProperty(COLUMN_NAMES).getValue();
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
final String preProcessStrategy = context.getProperty(SQL_PREPROCESS_STRATEGY).getValue();
final StateManager stateManager = context.getStateManager();
final StateMap stateMap;
try {
stateMap = stateManager.getState(Scope.CLUSTER);
} catch (final IOException ioe) {
getLogger().error("Failed to retrieve observed maximum values from the State Manager. Will not perform "
+ "query until this is accomplished.", ioe);
// Make a mutable copy of the current state property map. This will be updated by the result row callback, and eventually
// set as the current state map (after the session has been committed)
final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap());
final String selectQuery = getQuery(tableName, columnNames, getColumns(maxValueColumnNames), stateMap, preProcessStrategy);
final StopWatch stopWatch = new StopWatch(true);
try (final Connection con = dbcpService.getConnection();
final Statement st = con.createStatement()) {
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
st.setQueryTimeout(queryTimeout); // timeout in seconds
final LongHolder nrOfRows = new LongHolder(0L);
fileToProcess = session.create();
fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
public void process(final OutputStream out) throws IOException {
try {
logger.debug("Executing query {}", new Object[]{selectQuery});
final ResultSet resultSet = st.executeQuery(selectQuery);
// Max values will be updated in the state property map by the callback
final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(statePropertyMap, preProcessStrategy);
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, maxValCollector));
} catch (final SQLException e) {
throw new ProcessException("Error during database query or conversion of records to Avro", e);
if (nrOfRows.get() > 0) {
// set attribute how many rows were selected
fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, nrOfRows.get().toString());"{} contains {} Avro records; transferring to 'success'",
new Object[]{fileToProcess, nrOfRows.get()});
String jdbcURL = "DBCPService";
try {
DatabaseMetaData databaseMetaData = con.getMetaData();
if (databaseMetaData != null) {
jdbcURL = databaseMetaData.getURL();
} catch (SQLException se) {
// Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly
session.getProvenanceReporter().receive(fileToProcess, jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(fileToProcess, REL_SUCCESS);
} else {
// If there were no rows returned, don't send the flowfile
} catch (final ProcessException | SQLException e) {
logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e});
if (fileToProcess != null) {
} finally {
try {
// Update the state
stateManager.setState(statePropertyMap, Scope.CLUSTER);
} catch (IOException ioe) {
getLogger().error("{} failed to update State Manager, maximum observed values will not be recorded", new Object[]{this, ioe});
protected List<String> getColumns(String commaSeparatedColumnList) {
if (StringUtils.isEmpty(commaSeparatedColumnList)) {
return Collections.emptyList();
final String[] columns = commaSeparatedColumnList.split(",");
final List<String> columnList = new ArrayList<>(columns.length);
for (String column : columns) {
if (column != null) {
String trimmedColumn = column.trim();
if (!StringUtils.isEmpty(trimmedColumn)) {
return columnList;
protected String getQuery(String tableName, String columnNames, List<String> maxValColumnNames,
StateMap stateMap, String preProcessStrategy) {
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Table name must be specified");
final StringBuilder query = new StringBuilder(getSelectFromClause(tableName, columnNames));
// Check state map for last max values
if (stateMap != null && stateMap.getVersion() != -1 && maxValColumnNames != null) {
Map<String, String> stateProperties = stateMap.toMap();
List<String> whereClauses = new ArrayList<>(maxValColumnNames.size());
for (String colName : maxValColumnNames) {
String maxValue = stateProperties.get(colName.toLowerCase());
if (!StringUtils.isEmpty(maxValue)) {
Integer type = columnTypeMap.get(colName.toLowerCase());
if (type == null) {
// This shouldn't happen as we are populating columnTypeMap when the processor is scheduled.
throw new IllegalArgumentException("No column type found for: " + colName);
// Add a condition for the WHERE clause
whereClauses.add(colName + " > " + getLiteralByType(type, maxValue, preProcessStrategy));
if (!whereClauses.isEmpty()) {
query.append(" WHERE ");
query.append(StringUtils.join(whereClauses, " AND "));
return query.toString();
* Returns a basic SELECT ... FROM clause with the given column names and table name. If no column names are found,
* the wildcard (*) is used to select all columns.
* @param tableName The name of the table to select from
* @param columnNames A comma-separated list of column names to select from the table
* @return A SQL select statement representing a query of the given column names from the given table
protected StringBuilder getSelectFromClause(String tableName, String columnNames) {
final StringBuilder query = new StringBuilder("SELECT ");
if (StringUtils.isEmpty(columnNames) || columnNames.trim().equals("*")) {
} else {
query.append(" FROM ");
return query;
* Returns a SQL literal for the given value based on its type. For example, values of character type need to be enclosed
* in single quotes, whereas values of numeric type should not be.
* @param type The JDBC type for the desired literal
* @param value The value to be converted to a SQL literal
* @return A String representing the given value as a literal of the given type
protected String getLiteralByType(int type, String value, String preProcessStrategy) {
// Format value based on column type. For example, strings and timestamps need to be quoted
switch (type) {
// For string-represented values, put in single quotes
case CHAR:
case NCHAR:
case ROWID:
case DATE:
case TIME:
return "'" + value + "'";
// Timestamp literals in Oracle need to be cast with TO_DATE
if (SQL_PREPROCESS_STRATEGY_ORACLE.equals(preProcessStrategy)) {
return "to_date('" + value + "', 'yyyy-mm-dd HH24:MI:SS')";
} else {
return "'" + value + "'";
// Else leave as is (numeric types, e.g.)
return value;
protected class MaxValueResultSetRowCollector implements JdbcCommon.ResultSetRowCallback {
String preProcessStrategy;
Map<String, String> newColMap;
public MaxValueResultSetRowCollector(Map<String, String> stateMap, String preProcessStrategy) {
this.preProcessStrategy = preProcessStrategy;
newColMap = stateMap;
public void processRow(ResultSet resultSet) throws IOException {
if (resultSet == null) {
try {
// Iterate over the row, check-and-set max values
final ResultSetMetaData meta = resultSet.getMetaData();
final int nrOfColumns = meta.getColumnCount();
if (nrOfColumns > 0) {
for (int i = 1; i <= nrOfColumns; i++) {
String colName = meta.getColumnName(i).toLowerCase();
Integer type = columnTypeMap.get(colName);
// Skip any columns we're not keeping track of or whose value is null
if (type == null || resultSet.getObject(i) == null) {
String maxValueString = newColMap.get(colName);
switch (type) {
case CHAR:
case NCHAR:
case ROWID:
String colStringValue = resultSet.getString(i);
if (maxValueString == null || colStringValue.compareTo(maxValueString) > 0) {
newColMap.put(colName, colStringValue);
Integer colIntValue = resultSet.getInt(i);
Integer maxIntValue = null;
if (maxValueString != null) {
maxIntValue = Integer.valueOf(maxValueString);
if (maxIntValue == null || colIntValue > maxIntValue) {
newColMap.put(colName, colIntValue.toString());
case BIGINT:
Long colLongValue = resultSet.getLong(i);
Long maxLongValue = null;
if (maxValueString != null) {
maxLongValue = Long.valueOf(maxValueString);
if (maxLongValue == null || colLongValue > maxLongValue) {
newColMap.put(colName, colLongValue.toString());
case FLOAT:
case REAL:
case DOUBLE:
Double colDoubleValue = resultSet.getDouble(i);
Double maxDoubleValue = null;
if (maxValueString != null) {
maxDoubleValue = Double.valueOf(maxValueString);
if (maxDoubleValue == null || colDoubleValue > maxDoubleValue) {
newColMap.put(colName, colDoubleValue.toString());
BigDecimal colBigDecimalValue = resultSet.getBigDecimal(i);
BigDecimal maxBigDecimalValue = null;
if (maxValueString != null) {
DecimalFormat df = new DecimalFormat();
maxBigDecimalValue = (BigDecimal) df.parse(maxValueString);
if (maxBigDecimalValue == null || colBigDecimalValue.compareTo(maxBigDecimalValue) > 0) {
newColMap.put(colName, colBigDecimalValue.toString());
case DATE:
Date rawColDateValue = resultSet.getDate(i);
java.sql.Date colDateValue = new java.sql.Date(rawColDateValue.getTime());
java.sql.Date maxDateValue = null;
if (maxValueString != null) {
maxDateValue = java.sql.Date.valueOf(maxValueString);
if (maxDateValue == null || colDateValue.after(maxDateValue)) {
newColMap.put(colName, colDateValue.toString());
case TIME:
Date rawColTimeValue = resultSet.getDate(i);
java.sql.Time colTimeValue = new java.sql.Time(rawColTimeValue.getTime());
java.sql.Time maxTimeValue = null;
if (maxValueString != null) {
maxTimeValue = java.sql.Time.valueOf(maxValueString);
if (maxTimeValue == null || colTimeValue.after(maxTimeValue)) {
newColMap.put(colName, colTimeValue.toString());
// Oracle timestamp queries must use literals in java.sql.Date format
if (SQL_PREPROCESS_STRATEGY_ORACLE.equals(preProcessStrategy)) {
Date rawColOracleTimestampValue = resultSet.getDate(i);
java.sql.Date oracleTimestampValue = new java.sql.Date(rawColOracleTimestampValue.getTime());
java.sql.Date maxOracleTimestampValue = null;
if (maxValueString != null) {
maxOracleTimestampValue = java.sql.Date.valueOf(maxValueString);
if (maxOracleTimestampValue == null || oracleTimestampValue.after(maxOracleTimestampValue)) {
newColMap.put(colName, oracleTimestampValue.toString());
} else {
Timestamp rawColTimestampValue = resultSet.getTimestamp(i);
java.sql.Timestamp colTimestampValue = new java.sql.Timestamp(rawColTimestampValue.getTime());
java.sql.Timestamp maxTimestampValue = null;
if (maxValueString != null) {
maxTimestampValue = java.sql.Timestamp.valueOf(maxValueString);
if (maxTimestampValue == null || colTimestampValue.after(maxTimestampValue)) {
newColMap.put(colName, colTimestampValue.toString());
case BIT:
case BINARY:
case ARRAY:
case BLOB:
case CLOB:
throw new IOException("Type " + meta.getColumnTypeName(i) + " is not valid for maintaining maximum value");
} catch (ParseException | SQLException e) {
throw new IOException(e);

View File

@ -69,17 +69,30 @@ import org.apache.commons.lang3.StringUtils;
public class JdbcCommon {
public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream) throws SQLException, IOException {
final Schema schema = createSchema(rs);
return convertToAvroStream(rs, outStream, null, null);
public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName)
throws SQLException, IOException {
return convertToAvroStream(rs, outStream, recordName, null);
public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback)
throws SQLException, IOException {
final Schema schema = createSchema(rs, recordName);
final GenericRecord rec = new GenericData.Record(schema);
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter)) {
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
dataFileWriter.create(schema, outStream);
final ResultSetMetaData meta = rs.getMetaData();
final int nrOfColumns = meta.getColumnCount();
long nrOfRows = 0;
while ( {
if (callback != null) {
for (int i = 1; i <= nrOfColumns; i++) {
final int javaSqlType = meta.getColumnType(i);
final Object value = rs.getObject(i);
@ -125,10 +138,23 @@ public class JdbcCommon {
public static Schema createSchema(final ResultSet rs) throws SQLException {
return createSchema(rs, null);
* Creates an Avro schema from a result set. If the table/record name is known a priori and provided, use that as a
* fallback for the record name if it cannot be retrieved from the result set, and finally fall back to a default value.
* @param rs The result set to convert to Avro
* @param recordName The a priori record name to use if it cannot be determined from the result set.
* @return A Schema object representing the result set converted to an Avro record
* @throws SQLException if any error occurs during conversion
public static Schema createSchema(final ResultSet rs, String recordName) throws SQLException {
final ResultSetMetaData meta = rs.getMetaData();
final int nrOfColumns = meta.getColumnCount();
String tableName = "NiFi_ExecuteSQL_Record";
if(nrOfColumns > 0) {
String tableName = StringUtils.isEmpty(recordName) ? "NiFi_ExecuteSQL_Record" : recordName;
if (nrOfColumns > 0) {
String tableNameFromMeta = meta.getTableName(1);
if (!StringUtils.isBlank(tableNameFromMeta)) {
tableName = tableNameFromMeta;
@ -218,4 +244,13 @@ public class JdbcCommon {
return builder.endRecord();
* An interface for callback methods which allows processing of a row during the convertToAvroStream() processing.
* <b>IMPORTANT:</b> This method should only work on the row pointed at by the current ResultSet reference.
* Advancing the cursor (e.g.) can cause rows to be skipped during Avro transformation.
public interface ResultSetRowCallback {
void processRow(ResultSet resultSet) throws IOException;

View File

@ -65,6 +65,7 @@ org.apache.nifi.processors.standard.PutJMS

View File

@ -0,0 +1,356 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.processors.standard;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.fusesource.hawtbuf.ByteArrayInputStream;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
* Unit tests for the QueryDatabaseTable processor
public class QueryDatabaseTableTest {
MockQueryDatabaseTable processor;
private TestRunner runner;
final static String DB_LOCATION = "target/db";
public static void setupClass() {
System.setProperty("", "target/derby.log");
public void setup() throws InitializationException, IOException {
final DBCPService dbcp = new DBCPServiceSimpleImpl();
final Map<String, String> dbcpProperties = new HashMap<>();
processor = new MockQueryDatabaseTable();
runner = TestRunners.newTestRunner(processor);
runner.addControllerService("dbcp", dbcp, dbcpProperties);
runner.setProperty(QueryDatabaseTable.DBCP_SERVICE, "dbcp");
public void teardown() {
runner = null;
public void testGetColumns() throws Exception {
assertEquals(2, processor.getColumns("col1,col2").size());
public void testGetQuery() throws Exception {
String query = processor.getQuery("myTable", null, null, null, "None");
assertEquals("SELECT * FROM myTable", query);
query = processor.getQuery("myTable", "col1,col2", null, null, "None");
assertEquals("SELECT col1,col2 FROM myTable", query);
query = processor.getQuery("myTable", null, Collections.singletonList("id"), null, "None");
assertEquals("SELECT * FROM myTable", query);
Map<String, String> maxValues = new HashMap<>();
maxValues.put("id", "509");
StateManager stateManager = runner.getStateManager();
stateManager.setState(maxValues, Scope.CLUSTER);
processor.putColumnType("id", Types.INTEGER);
query = processor.getQuery("myTable", null, Collections.singletonList("id"), stateManager.getState(Scope.CLUSTER), "None");
assertEquals("SELECT * FROM myTable WHERE id > 509", query);
maxValues.put("date_created", "2016-03-07 12:34:56");
stateManager.setState(maxValues, Scope.CLUSTER);
processor.putColumnType("date_created", Types.TIMESTAMP);
query = processor.getQuery("myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER), "None");
assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED > '2016-03-07 12:34:56'", query);
// Test Oracle strategy
query = processor.getQuery("myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER), "Oracle");
assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED > to_date('2016-03-07 12:34:56', 'yyyy-mm-dd HH24:MI:SS')", query);
@Test(expected = IllegalArgumentException.class)
public void testGetQueryNoTable() throws Exception {
processor.getQuery(null, null, null, null, "None");
public void testAddedRows() throws ClassNotFoundException, SQLException, InitializationException, IOException {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE");
runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID");;
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
InputStream in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
assertEquals(3, getNumberOfRecordsFromStream(in));
// Run again, this time no flowfiles/rows should be transferred;
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
// Add a new row with a higher ID and run, one flowfile with one new row should be transferred
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.234')");;
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
assertEquals(1, getNumberOfRecordsFromStream(in));
// Sanity check - run again, this time no flowfiles/rows should be transferred
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
// Add timestamp as a max value column name
runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "id, created_on");
// Add a new row with a higher ID and run, one flow file will be transferred because no max value for the timestamp has been stored
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (4, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");;
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
assertEquals(1, getNumberOfRecordsFromStream(in));
// Add a new row with a higher ID but lower timestamp and run, no flow file will be transferred
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (5, 'NO NAME', 15.0, '2001-01-01 03:23:34.234')");;
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0);
// Add a new row with a higher ID and run, one flow file will be transferred
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (6, 'Mr. NiFi', 1.0, '2012-01-01 03:23:34.234')");;
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
assertEquals(1, getNumberOfRecordsFromStream(in));
// Set name as the max value column name (and clear the state), all rows should be returned since the max value for name has not been set
runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "name");;
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
assertEquals(7, getNumberOfRecordsFromStream(in));
// Add a new row with a "higher" name than the max but lower than "NULL" (to test that null values are skipped), one flow file will be transferred
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (7, 'NULK', 1.0, '2012-01-01 03:23:34.234')");;
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
assertEquals(1, getNumberOfRecordsFromStream(in));
// Set scale as the max value column name (and clear the state), all rows should be returned since the max value for name has not been set
runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "scale");;
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
assertEquals(8, getNumberOfRecordsFromStream(in));
// Add a new row with a higher value for scale than the max, one flow file will be transferred
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (8, 'NULK', 100.0, '2012-01-01 03:23:34.234')");;
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
assertEquals(1, getNumberOfRecordsFromStream(in));
// Set scale as the max value column name (and clear the state), all rows should be returned since the max value for name has not been set
runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "bignum");;
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
assertEquals(9, getNumberOfRecordsFromStream(in));
// Add a new row with a higher value for scale than the max, one flow file will be transferred
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on, bignum) VALUES (9, 'Alice Bob', 100.0, '2012-01-01 03:23:34.234', 1)");;
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray());
assertEquals(1, getNumberOfRecordsFromStream(in));
public void testWithNullIntColumn() throws SQLException {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_NULL_INT");
} catch (final SQLException sqle) {
// Ignore, usually due to Derby not having DROP TABLE IF EXISTS
stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)");
stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, 1, 1)");
runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_NULL_INT");;
runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).assertAttributeEquals(QueryDatabaseTable.RESULT_ROW_COUNT, "2");
public void testWithSqlException() throws SQLException {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_NO_ROWS");
} catch (final SQLException sqle) {
// Ignore, usually due to Derby not having DROP TABLE IF EXISTS
stmt.execute("create table TEST_NO_ROWS (id integer)");
// Try a valid SQL statement that will generate an error (val1 does not exist, e.g.)
runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_NO_ROWS");
runner.setProperty(QueryDatabaseTable.COLUMN_NAMES, "val1");;
private long getNumberOfRecordsFromStream(InputStream in) throws IOException {
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {
GenericRecord record = null;
long recordsFromStream = 0;
while (dataFileReader.hasNext()) {
// Reuse record object by passing it to next(). This saves us from
// allocating and garbage collecting many objects for files with
// many items.
record =;
recordsFromStream += 1;
return recordsFromStream;
* Simple implementation only for QueryDatabaseTable processor testing.
class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
public String getIdentifier() {
return "dbcp";
public Connection getConnection() throws ProcessException {
try {
return DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
} catch (final Exception e) {
throw new ProcessException("getConnection failed: " + e);
@Stateful(scopes = Scope.CLUSTER, description = "Mock for QueryDatabaseTable processor")
private static class MockQueryDatabaseTable extends QueryDatabaseTable {
public void putColumnType(String colName, Integer colType) {
columnTypeMap.put(colName, colType);