mirror of
synced 2025-02-11 12:35:20 +00:00
NIFI-4517: Added ExecuteSQLRecord and QueryDatabaseTableRecord processors
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #2945.
This commit is contained in:
@ -63,6 +63,19 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
return schema;
// Protected methods for subclasses to access private member variables
protected ResultSet getResultSet() {
return rs;
protected boolean hasMoreRows() {
return moreRows;
protected void setMoreRows(boolean moreRows) {
this.moreRows = moreRows;
public Record next() throws IOException {
try {
@ -87,7 +100,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
private Record createRecord(final ResultSet rs) throws SQLException {
protected Record createRecord(final ResultSet rs) throws SQLException {
final Map<String, Object> values = new HashMap<>(schema.getFieldCount());
for (final RecordField field : schema.getFields()) {
@ -0,0 +1,369 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.processors.standard;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.sql.SqlWriter;
import org.apache.nifi.processors.standard.util.JdbcCommon;
import org.apache.nifi.util.StopWatch;
import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public abstract class AbstractExecuteSQL extends AbstractProcessor {
public static final String RESULT_ROW_COUNT = "executesql.row.count";
public static final String RESULT_QUERY_DURATION = "executesql.query.duration";
public static final String RESULT_QUERY_EXECUTION_TIME = "executesql.query.executiontime";
public static final String RESULT_QUERY_FETCH_TIME = "executesql.query.fetchtime";
public static final String RESULTSET_INDEX = "executesql.resultset.index";
public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
// Relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.description("Successfully created FlowFile from SQL query result set.")
public static final Relationship REL_FAILURE = new Relationship.Builder()
.description("SQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship")
protected Set<Relationship> relationships;
public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
.name("Database Connection Pooling Service")
.description("The Controller Service that is used to obtain connection to database")
public static final PropertyDescriptor SQL_SELECT_QUERY = new PropertyDescriptor.Builder()
.name("SQL select query")
.description("The SQL select query to execute. The query can be empty, a constant value, or built from attributes "
+ "using Expression Language. If this property is specified, it will be used regardless of the content of "
+ "incoming flowfiles. If this property is empty, the content of the incoming flow file is expected "
+ "to contain a valid SQL select query, to be issued by the processor to the database. Note that Expression "
+ "Language is not evaluated for flow file contents.")
public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder()
.name("Max Wait Time")
.description("The maximum amount of time allowed for a running SQL select query "
+ " , zero means there is no limit. Max time less than 1 second will be equal to zero.")
.defaultValue("0 seconds")
public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
.displayName("Max Rows Per Flow File")
.description("The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large "
+ "result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.")
public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder()
.displayName("Output Batch Size")
.description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all result set rows "
+ "have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles "
+ "to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will "
+ "be committed, thus releasing the FlowFiles to the downstream relationship. NOTE: The fragment.count attribute will not be set on FlowFiles when this "
+ "property is set.")
protected List<PropertyDescriptor> propDescriptors;
protected DBCPService dbcpService;
public Set<Relationship> getRelationships() {
return relationships;
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propDescriptors;
public void setup(ProcessContext context) {
// If the query is not set, then an incoming flow file is needed. Otherwise fail the initialization
if (!context.getProperty(SQL_SELECT_QUERY).isSet() && !context.hasIncomingConnection()) {
final String errorString = "Either the Select Query must be specified or there must be an incoming connection "
+ "providing flowfile(s) containing a SQL select query";
throw new ProcessException(errorString);
dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile fileToProcess = null;
if (context.hasIncomingConnection()) {
fileToProcess = session.get();
// If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
// However, if we have no FlowFile and we have connections coming from other Processors, then
// we know that we should run only if we have a FlowFile.
if (fileToProcess == null && context.hasNonLoopConnection()) {
final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
final ComponentLog logger = getLogger();
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
SqlWriter sqlWriter = configureSqlWriter(session, context, fileToProcess);
final String selectQuery;
if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
} else {
// If the query is not set, then an incoming flow file is required, and expected to contain a valid SQL select query.
// If there is no incoming connection, onTrigger will not be called as the processor will fail when scheduled.
final StringBuilder queryContents = new StringBuilder();
session.read(fileToProcess, in -> queryContents.append(IOUtils.toString(in, Charset.defaultCharset())));
selectQuery = queryContents.toString();
int resultCount = 0;
try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
final PreparedStatement st = con.prepareStatement(selectQuery)) {
st.setQueryTimeout(queryTimeout); // timeout in seconds
if (fileToProcess != null) {
JdbcCommon.setParameters(st, fileToProcess.getAttributes());
logger.debug("Executing query {}", new Object[]{selectQuery});
int fragmentIndex = 0;
final String fragmentId = UUID.randomUUID().toString();
final StopWatch executionTime = new StopWatch(true);
boolean hasResults = st.execute();
long executionTimeElapsed = executionTime.getElapsed(TimeUnit.MILLISECONDS);
boolean hasUpdateCount = st.getUpdateCount() != -1;
while (hasResults || hasUpdateCount) {
//getMoreResults() and execute() return false to indicate that the result of the statement is just a number and not a ResultSet
if (hasResults) {
final AtomicLong nrOfRows = new AtomicLong(0L);
try {
final ResultSet resultSet = st.getResultSet();
do {
final StopWatch fetchTime = new StopWatch(true);
FlowFile resultSetFF;
if (fileToProcess == null) {
resultSetFF = session.create();
} else {
resultSetFF = session.create(fileToProcess);
resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
try {
resultSetFF = session.write(resultSetFF, out -> {
try {
nrOfRows.set(sqlWriter.writeResultSet(resultSet, out, getLogger(), null));
} catch (Exception e) {
throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
// set attributes
final Map<String, String> attributesToAdd = new HashMap<>();
attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
attributesToAdd.put(RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
attributesToAdd.put(RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
attributesToAdd.put(RESULTSET_INDEX, String.valueOf(resultCount));
resultSetFF = session.putAllAttributes(resultSetFF, attributesToAdd);
// if fragmented ResultSet, determine if we should keep this fragment; set fragment attributes
if (maxRowsPerFlowFile > 0) {
// if row count is zero and this is not the first fragment, drop it instead of committing it.
if (nrOfRows.get() == 0 && fragmentIndex > 0) {
resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_ID, fragmentId);
resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_INDEX, String.valueOf(fragmentIndex));
logger.info("{} contains {} records; transferring to 'success'",
new Object[]{resultSetFF, nrOfRows.get()});
// Report a FETCH event if there was an incoming flow file, or a RECEIVE event otherwise
if(context.hasIncomingConnection()) {
session.getProvenanceReporter().fetch(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
} else {
session.getProvenanceReporter().receive(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
// If we've reached the batch size, send out the flow files
if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
session.transfer(resultSetFlowFiles, REL_SUCCESS);
} catch (Exception e) {
// Remove the result set flow file and propagate the exception
if (e instanceof ProcessException) {
throw (ProcessException) e;
} else {
throw new ProcessException(e);
} while (maxRowsPerFlowFile > 0 && nrOfRows.get() == maxRowsPerFlowFile);
// If we are splitting results but not outputting batches, set count on all FlowFiles
if (outputBatchSize == 0 && maxRowsPerFlowFile > 0) {
for (int i = 0; i < resultSetFlowFiles.size(); i++) {
session.putAttribute(resultSetFlowFiles.get(i), FRAGMENT_COUNT, Integer.toString(fragmentIndex)));
} catch (final SQLException e) {
throw new ProcessException(e);
// are there anymore result sets?
try {
hasResults = st.getMoreResults(Statement.CLOSE_CURRENT_RESULT);
hasUpdateCount = st.getUpdateCount() != -1;
} catch (SQLException ex) {
hasResults = false;
hasUpdateCount = false;
// Transfer any remaining files to SUCCESS
session.transfer(resultSetFlowFiles, REL_SUCCESS);
//If we had at least one result then it's OK to drop the original file, but if we had no results then
// pass the original flow file down the line to trigger downstream processors
if (fileToProcess != null) {
if (resultCount > 0) {
} else {
fileToProcess = session.write(fileToProcess, out -> sqlWriter.writeEmptyResultSet(out, getLogger()));
fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, "0");
fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType());
session.transfer(fileToProcess, REL_SUCCESS);
} else if (resultCount == 0) {
//If we had no inbound FlowFile, no exceptions, and the SQL generated no result sets (Insert/Update/Delete statements only)
// Then generate an empty Output FlowFile
FlowFile resultSetFF = session.create();
resultSetFF = session.write(resultSetFF, out -> sqlWriter.writeEmptyResultSet(out, getLogger()));
resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, "0");
resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType());
session.transfer(resultSetFF, REL_SUCCESS);
} catch (final ProcessException | SQLException e) {
//If we had at least one result then it's OK to drop the original file, but if we had no results then
// pass the original flow file down the line to trigger downstream processors
if (fileToProcess == null) {
// This can happen if any exceptions occur while setting up the connection, statement, etc.
logger.error("Unable to execute SQL select query {} due to {}. No FlowFile to route to failure",
new Object[]{selectQuery, e});
} else {
if (context.hasIncomingConnection()) {
logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure",
new Object[]{selectQuery, fileToProcess, e});
fileToProcess = session.penalize(fileToProcess);
} else {
logger.error("Unable to execute SQL select query {} due to {}; routing to failure",
new Object[]{selectQuery, e});
session.transfer(fileToProcess, REL_FAILURE);
protected abstract SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context, FlowFile fileToProcess);
@ -0,0 +1,483 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.processors.standard.sql.SqlWriter;
import org.apache.nifi.processors.standard.util.JdbcCommon;
import org.apache.nifi.util.StopWatch;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchProcessor {
public static final String RESULT_TABLENAME = "tablename";
public static final String RESULT_ROW_COUNT = "querydbtable.row.count";
public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
.name("Fetch Size")
.description("The number of result rows to be fetched from the result set at a time. This is a hint to the database driver and may not be "
+ "honored and/or exact. If the value specified is zero, then the hint is ignored.")
public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
.displayName("Max Rows Per Flow File")
.description("The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large "
+ "result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.")
public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder()
.displayName("Output Batch Size")
.description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all result set rows "
+ "have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles "
+ "to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will "
+ "be committed, thus releasing the FlowFiles to the downstream relationship. NOTE: The maxvalue.* and fragment.count attributes will not be set on FlowFiles when this "
+ "property is set.")
public static final PropertyDescriptor MAX_FRAGMENTS = new PropertyDescriptor.Builder()
.displayName("Maximum Number of Fragments")
.description("The maximum number of fragments. If the value specified is zero, then all fragments are returned. " +
"This prevents OutOfMemoryError when this processor ingests huge table. NOTE: Setting this property can result in data loss, as the incoming results are "
+ "not ordered, and fragments may end at arbitrary boundaries where rows are not included in the result set.")
public Set<Relationship> getRelationships() {
return relationships;
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propDescriptors;
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
public void setup(final ProcessContext context) {
maxValueProperties = getDefaultMaxValueProperties(context, null);
public void stop() {
// Reset the column type map in case properties change
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
// Fetch the column/table info once
if (!setupComplete.get()) {
ProcessSession session = sessionFactory.createSession();
final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
final ComponentLog logger = getLogger();
final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions().getValue();
final String sqlQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue();
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue();
final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions().getValue();
final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet()
? context.getProperty(MAX_FRAGMENTS).evaluateAttributeExpressions().asInteger()
: 0;
SqlWriter sqlWriter = configureSqlWriter(session, context);
final StateManager stateManager = context.getStateManager();
final StateMap stateMap;
try {
stateMap = stateManager.getState(Scope.CLUSTER);
} catch (final IOException ioe) {
getLogger().error("Failed to retrieve observed maximum values from the State Manager. Will not perform "
+ "query until this is accomplished.", ioe);
// Make a mutable copy of the current state property map. This will be updated by the result row callback, and eventually
// set as the current state map (after the session has been committed)
final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap());
//If an initial max value for column(s) has been specified using properties, and this column is not in the state manager, sync them to the state property map
for (final Map.Entry<String, String> maxProp : maxValueProperties.entrySet()) {
String maxPropKey = maxProp.getKey().toLowerCase();
String fullyQualifiedMaxPropKey = getStateKey(tableName, maxPropKey, dbAdapter);
if (!statePropertyMap.containsKey(fullyQualifiedMaxPropKey)) {
String newMaxPropValue;
// If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
// the value has been stored under a key that is only the column name. Fall back to check the column name,
// but store the new initial max value under the fully-qualified key.
if (statePropertyMap.containsKey(maxPropKey)) {
newMaxPropValue = statePropertyMap.get(maxPropKey);
} else {
newMaxPropValue = maxProp.getValue();
statePropertyMap.put(fullyQualifiedMaxPropKey, newMaxPropValue);
List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
? null
: Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
final String selectQuery = getQuery(dbAdapter, tableName, sqlQuery, columnNames, maxValueColumnNameList, customWhereClause, statePropertyMap);
final StopWatch stopWatch = new StopWatch(true);
final String fragmentIdentifier = UUID.randomUUID().toString();
try (final Connection con = dbcpService.getConnection(Collections.emptyMap());
final Statement st = con.createStatement()) {
if (fetchSize != null && fetchSize > 0) {
try {
} catch (SQLException se) {
// Not all drivers support this, just log the error (at debug level) and move on
logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, se);
String jdbcURL = "DBCPService";
try {
DatabaseMetaData databaseMetaData = con.getMetaData();
if (databaseMetaData != null) {
jdbcURL = databaseMetaData.getURL();
} catch (SQLException se) {
// Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
st.setQueryTimeout(queryTimeout); // timeout in seconds
if (logger.isDebugEnabled()) {
logger.debug("Executing query {}", new Object[] { selectQuery });
try (final ResultSet resultSet = st.executeQuery(selectQuery)) {
int fragmentIndex=0;
// Max values will be updated in the state property map by the callback
final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(tableName, statePropertyMap, dbAdapter);
while(true) {
final AtomicLong nrOfRows = new AtomicLong(0L);
FlowFile fileToProcess = session.create();
try {
fileToProcess = session.write(fileToProcess, out -> {
try {
nrOfRows.set(sqlWriter.writeResultSet(resultSet, out, getLogger(), maxValCollector));
} catch (Exception e) {
throw new ProcessException("Error during database query or conversion of records.", e);
} catch (ProcessException e) {
// Add flowfile to results before rethrowing so it will be removed from session in outer catch
throw e;
if (nrOfRows.get() > 0) {
// set attributes
final Map<String, String> attributesToAdd = new HashMap<>();
attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
attributesToAdd.put(RESULT_TABLENAME, tableName);
if(maxRowsPerFlowFile > 0) {
attributesToAdd.put(FRAGMENT_ID, fragmentIdentifier);
attributesToAdd.put(FRAGMENT_INDEX, String.valueOf(fragmentIndex));
fileToProcess = session.putAllAttributes(fileToProcess, attributesToAdd);
logger.info("{} contains {} records; transferring to 'success'",
new Object[]{fileToProcess, nrOfRows.get()});
session.getProvenanceReporter().receive(fileToProcess, jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
// If we've reached the batch size, send out the flow files
if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
session.transfer(resultSetFlowFiles, REL_SUCCESS);
} else {
// If there were no rows returned, don't send the flowfile
// If no rows and this was first FlowFile, yield
if(fragmentIndex == 0){
if (maxFragments > 0 && fragmentIndex >= maxFragments) {
// If we aren't splitting up the data into flow files or fragments, then the result set has been entirely fetched so don't loop back around
if (maxFragments == 0 && maxRowsPerFlowFile == 0) {
// If we are splitting up the data into flow files, don't loop back around if we've gotten all results
if(maxRowsPerFlowFile > 0 && nrOfRows.get() < maxRowsPerFlowFile) {
// Apply state changes from the Max Value tracker
// Even though the maximum value and total count are known at this point, to maintain consistent behavior if Output Batch Size is set, do not store the attributes
if (outputBatchSize == 0) {
for (int i = 0; i < resultSetFlowFiles.size(); i++) {
// Add maximum values as attributes
for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) {
// Get just the column name from the key
String key = entry.getKey();
String colName = key.substring(key.lastIndexOf(NAMESPACE_DELIMITER) + NAMESPACE_DELIMITER.length());
resultSetFlowFiles.set(i, session.putAttribute(resultSetFlowFiles.get(i), "maxvalue." + colName, entry.getValue()));
//set count on all FlowFiles
if (maxRowsPerFlowFile > 0) {
session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex)));
} catch (final SQLException e) {
throw e;
session.transfer(resultSetFlowFiles, REL_SUCCESS);
} catch (final ProcessException | SQLException e) {
logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e});
if (!resultSetFlowFiles.isEmpty()) {
} finally {
try {
// Update the state
stateManager.setState(statePropertyMap, Scope.CLUSTER);
} catch (IOException ioe) {
getLogger().error("{} failed to update State Manager, maximum observed values will not be recorded", new Object[]{this, ioe});
protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String columnNames, List<String> maxValColumnNames,
String customWhereClause, Map<String, String> stateMap) {
return getQuery(dbAdapter, tableName, null, columnNames, maxValColumnNames, customWhereClause, stateMap);
protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String sqlQuery, String columnNames, List<String> maxValColumnNames,
String customWhereClause, Map<String, String> stateMap) {
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Table name must be specified");
final StringBuilder query;
if (StringUtils.isEmpty(sqlQuery)) {
query = new StringBuilder(dbAdapter.getSelectStatement(tableName, columnNames, null, null, null, null));
} else {
query = getWrappedQuery(dbAdapter, sqlQuery, tableName);
List<String> whereClauses = new ArrayList<>();
// Check state map for last max values
if (stateMap != null && !stateMap.isEmpty() && maxValColumnNames != null) {
IntStream.range(0, maxValColumnNames.size()).forEach((index) -> {
String colName = maxValColumnNames.get(index);
String maxValueKey = getStateKey(tableName, colName, dbAdapter);
String maxValue = stateMap.get(maxValueKey);
if (StringUtils.isEmpty(maxValue)) {
// If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
// the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new
// maximum value is observed, it will be stored under the fully-qualified key from then on.
maxValue = stateMap.get(colName.toLowerCase());
if (!StringUtils.isEmpty(maxValue)) {
Integer type = columnTypeMap.get(maxValueKey);
if (type == null) {
// This shouldn't happen as we are populating columnTypeMap when the processor is scheduled.
throw new IllegalArgumentException("No column type found for: " + colName);
// Add a condition for the WHERE clause
whereClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type, maxValue, dbAdapter.getName()));
if (customWhereClause != null) {
whereClauses.add("(" + customWhereClause + ")");
if (!whereClauses.isEmpty()) {
query.append(" WHERE ");
query.append(StringUtils.join(whereClauses, " AND "));
return query.toString();
public class MaxValueResultSetRowCollector implements JdbcCommon.ResultSetRowCallback {
DatabaseAdapter dbAdapter;
final Map<String, String> newColMap;
final Map<String, String> originalState;
String tableName;
public MaxValueResultSetRowCollector(String tableName, Map<String, String> stateMap, DatabaseAdapter dbAdapter) {
this.dbAdapter = dbAdapter;
this.originalState = stateMap;
this.newColMap = new HashMap<>();
this.tableName = tableName;
public void processRow(ResultSet resultSet) throws IOException {
if (resultSet == null) {
try {
// Iterate over the row, check-and-set max values
final ResultSetMetaData meta = resultSet.getMetaData();
final int nrOfColumns = meta.getColumnCount();
if (nrOfColumns > 0) {
for (int i = 1; i <= nrOfColumns; i++) {
String colName = meta.getColumnName(i).toLowerCase();
String fullyQualifiedMaxValueKey = getStateKey(tableName, colName, dbAdapter);
Integer type = columnTypeMap.get(fullyQualifiedMaxValueKey);
// Skip any columns we're not keeping track of or whose value is null
if (type == null || resultSet.getObject(i) == null) {
String maxValueString = newColMap.get(fullyQualifiedMaxValueKey);
// If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
// the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new
// maximum value is observed, it will be stored under the fully-qualified key from then on.
if (StringUtils.isEmpty(maxValueString)) {
maxValueString = newColMap.get(colName);
String newMaxValueString = getMaxValueFromRow(resultSet, i, type, maxValueString, dbAdapter.getName());
if (newMaxValueString != null) {
newColMap.put(fullyQualifiedMaxValueKey, newMaxValueString);
} catch (ParseException | SQLException e) {
throw new IOException(e);
public void applyStateChanges() {
protected abstract SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context);
@ -16,22 +16,12 @@
package org.apache.nifi.processors.standard;
import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@ -41,23 +31,16 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.AvroUtil.CodecType;
import org.apache.nifi.processors.standard.sql.DefaultAvroSqlWriter;
import org.apache.nifi.processors.standard.sql.SqlWriter;
import org.apache.nifi.processors.standard.util.JdbcCommon;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.processors.standard.util.AvroUtil.CodecType;
import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_PRECISION;
import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_SCALE;
@ -111,82 +94,7 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC
+ "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order "
+ "FlowFiles were produced")
public class ExecuteSQL extends AbstractProcessor {
public static final String RESULT_ROW_COUNT = "executesql.row.count";
public static final String RESULT_QUERY_DURATION = "executesql.query.duration";
public static final String RESULT_QUERY_EXECUTION_TIME = "executesql.query.executiontime";
public static final String RESULT_QUERY_FETCH_TIME = "executesql.query.fetchtime";
public static final String RESULTSET_INDEX = "executesql.resultset.index";
public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
// Relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.description("Successfully created FlowFile from SQL query result set.")
public static final Relationship REL_FAILURE = new Relationship.Builder()
.description("SQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship")
private final Set<Relationship> relationships;
public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
.name("Database Connection Pooling Service")
.description("The Controller Service that is used to obtain connection to database")
public static final PropertyDescriptor SQL_SELECT_QUERY = new PropertyDescriptor.Builder()
.name("SQL select query")
.description("The SQL select query to execute. The query can be empty, a constant value, or built from attributes "
+ "using Expression Language. If this property is specified, it will be used regardless of the content of "
+ "incoming flowfiles. If this property is empty, the content of the incoming flow file is expected "
+ "to contain a valid SQL select query, to be issued by the processor to the database. Note that Expression "
+ "Language is not evaluated for flow file contents.")
public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder()
.name("Max Wait Time")
.description("The maximum amount of time allowed for a running SQL select query "
+ " , zero means there is no limit. Max time less than 1 second will be equal to zero.")
.defaultValue("0 seconds")
public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
.displayName("Max Rows Per Flow File")
.description("The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large "
+ "result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.")
public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder()
.displayName("Output Batch Size")
.description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all result set rows "
+ "have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles "
+ "to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will "
+ "be committed, thus releasing the FlowFiles to the downstream relationship. NOTE: The fragment.count attribute will not be set on FlowFiles when this "
+ "property is set.")
public class ExecuteSQL extends AbstractExecuteSQL {
public static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder()
@ -198,8 +106,6 @@ public class ExecuteSQL extends AbstractProcessor {
private final List<PropertyDescriptor> propDescriptors;
public ExecuteSQL() {
final Set<Relationship> r = new HashSet<>();
@ -212,102 +118,23 @@ public class ExecuteSQL extends AbstractProcessor {
propDescriptors = Collections.unmodifiableList(pds);
public Set<Relationship> getRelationships() {
return relationships;
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propDescriptors;
public void setup(ProcessContext context) {
// If the query is not set, then an incoming flow file is needed. Otherwise fail the initialization
if (!context.getProperty(SQL_SELECT_QUERY).isSet() && !context.hasIncomingConnection()) {
final String errorString = "Either the Select Query must be specified or there must be an incoming connection "
+ "providing flowfile(s) containing a SQL select query";
throw new ProcessException(errorString);
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile fileToProcess = null;
if (context.hasIncomingConnection()) {
fileToProcess = session.get();
// If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
// However, if we have no FlowFile and we have connections coming from other Processors, then
// we know that we should run only if we have a FlowFile.
if (fileToProcess == null && context.hasNonLoopConnection()) {
final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
final ComponentLog logger = getLogger();
final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
protected SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context, FlowFile fileToProcess) {
final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean();
final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
final Integer defaultPrecision = context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions(fileToProcess).asInteger();
final Integer defaultScale = context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions(fileToProcess).asInteger();
final String codec = context.getProperty(COMPRESSION_FORMAT).getValue();
final String selectQuery;
if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
} else {
// If the query is not set, then an incoming flow file is required, and expected to contain a valid SQL select query.
// If there is no incoming connection, onTrigger will not be called as the processor will fail when scheduled.
final StringBuilder queryContents = new StringBuilder();
session.read(fileToProcess, in -> queryContents.append(IOUtils.toString(in, Charset.defaultCharset())));
selectQuery = queryContents.toString();
int resultCount=0;
try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
final PreparedStatement st = con.prepareStatement(selectQuery)) {
st.setQueryTimeout(queryTimeout); // timeout in seconds
if (fileToProcess != null) {
JdbcCommon.setParameters(st, fileToProcess.getAttributes());
logger.debug("Executing query {}", new Object[]{selectQuery});
int fragmentIndex=0;
final String fragmentId = UUID.randomUUID().toString();
final StopWatch executionTime = new StopWatch(true);
boolean hasResults = st.execute();
long executionTimeElapsed = executionTime.getElapsed(TimeUnit.MILLISECONDS);
boolean hasUpdateCount = st.getUpdateCount() != -1;
while(hasResults || hasUpdateCount) {
//getMoreResults() and execute() return false to indicate that the result of the statement is just a number and not a ResultSet
if (hasResults) {
final AtomicLong nrOfRows = new AtomicLong(0L);
try {
final ResultSet resultSet = st.getResultSet();
final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
@ -316,144 +143,6 @@ public class ExecuteSQL extends AbstractProcessor {
do {
final StopWatch fetchTime = new StopWatch(true);
FlowFile resultSetFF;
if (fileToProcess == null) {
resultSetFF = session.create();
} else {
resultSetFF = session.create(fileToProcess);
resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
try {
resultSetFF = session.write(resultSetFF, out -> {
try {
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
} catch (SQLException e) {
throw new ProcessException(e);
long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
// set attribute how many rows were selected
resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
resultSetFF = session.putAttribute(resultSetFF, RESULTSET_INDEX, String.valueOf(resultCount));
// if fragmented ResultSet, determine if we should keep this fragment; set fragment attributes
if (maxRowsPerFlowFile > 0) {
// if row count is zero and this is not the first fragment, drop it instead of committing it.
if (nrOfRows.get() == 0 && fragmentIndex > 0) {
resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_ID, fragmentId);
resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_INDEX, String.valueOf(fragmentIndex));
logger.info("{} contains {} Avro records; transferring to 'success'",
new Object[]{resultSetFF, nrOfRows.get()});
session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
// If we've reached the batch size, send out the flow files
if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
session.transfer(resultSetFlowFiles, REL_SUCCESS);
} catch (Exception e) {
// Remove the result set flow file and propagate the exception
if (e instanceof ProcessException) {
throw (ProcessException) e;
} else {
throw new ProcessException(e);
} while (maxRowsPerFlowFile > 0 && nrOfRows.get() == maxRowsPerFlowFile);
// If we are splitting results but not outputting batches, set count on all FlowFiles
if (outputBatchSize == 0 && maxRowsPerFlowFile > 0) {
for (int i = 0; i < resultSetFlowFiles.size(); i++) {
session.putAttribute(resultSetFlowFiles.get(i), FRAGMENT_COUNT, Integer.toString(fragmentIndex)));
} catch (final SQLException e) {
throw new ProcessException(e);
// are there anymore result sets?
hasResults = st.getMoreResults(Statement.CLOSE_CURRENT_RESULT);
hasUpdateCount = st.getUpdateCount() != -1;
} catch(SQLException ex){
hasResults = false;
hasUpdateCount = false;
// Transfer any remaining files to SUCCESS
session.transfer(resultSetFlowFiles, REL_SUCCESS);
//If we had at least one result then it's OK to drop the original file, but if we had no results then
// pass the original flow file down the line to trigger downstream processors
if(fileToProcess != null){
if(resultCount > 0){
} else {
fileToProcess = session.write(fileToProcess, JdbcCommon::createEmptyAvroStream);
fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, "0");
fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
session.transfer(fileToProcess, REL_SUCCESS);
} else if(resultCount == 0){
//If we had no inbound FlowFile, no exceptions, and the SQL generated no result sets (Insert/Update/Delete statements only)
// Then generate an empty Output FlowFile
FlowFile resultSetFF = session.create();
resultSetFF = session.write(resultSetFF, out -> JdbcCommon.createEmptyAvroStream(out));
resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, "0");
resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
session.transfer(resultSetFF, REL_SUCCESS);
} catch (final ProcessException | SQLException e) {
//If we had at least one result then it's OK to drop the original file, but if we had no results then
// pass the original flow file down the line to trigger downstream processors
if (fileToProcess == null) {
// This can happen if any exceptions occur while setting up the connection, statement, etc.
logger.error("Unable to execute SQL select query {} due to {}. No FlowFile to route to failure",
new Object[]{selectQuery, e});
} else {
if (context.hasIncomingConnection()) {
logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure",
new Object[]{selectQuery, fileToProcess, e});
fileToProcess = session.penalize(fileToProcess);
} else {
logger.error("Unable to execute SQL select query {} due to {}; routing to failure",
new Object[]{selectQuery, e});
session.transfer(fileToProcess, REL_FAILURE);
return new DefaultAvroSqlWriter(options);
@ -0,0 +1,147 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.processors.standard;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.standard.sql.RecordSqlWriter;
import org.apache.nifi.processors.standard.sql.SqlWriter;
import org.apache.nifi.processors.standard.util.JdbcCommon;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
@Tags({"sql", "select", "jdbc", "query", "database", "record"})
@CapabilityDescription("Executes provided SQL select query. Query result will be converted to the format specified by a Record Writer. "
+ "Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on "
+ "a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. "
+ "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the "
+ "select query, and the query may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes "
+ "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be "
+ "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format. "
+ "FlowFile attribute 'executesql.row.count' indicates how many rows were selected.")
@ReadsAttribute(attribute = "sql.args.N.type", description = "Incoming FlowFiles are expected to be parametrized SQL statements. The type of each Parameter is specified as an integer "
+ "that represents the JDBC Type of the parameter."),
@ReadsAttribute(attribute = "sql.args.N.value", description = "Incoming FlowFiles are expected to be parametrized SQL statements. The value of the Parameters are specified as "
+ "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute."),
@ReadsAttribute(attribute = "sql.args.N.format", description = "This attribute is always optional, but default options may not always work for your data. "
+ "Incoming FlowFiles are expected to be parametrized SQL statements. In some cases "
+ "a format option needs to be specified, currently this is only applicable for binary data types, dates, times and timestamps. Binary Data Types (defaults to 'ascii') - "
+ "ascii: each string character in your attribute value represents a single byte. This is the format provided by Avro Processors. "
+ "base64: the string is a Base64 encoded string that can be decoded to bytes. "
+ "hex: the string is hex encoded with all letters in upper case and no '0x' at the beginning. "
+ "Dates/Times/Timestamps - "
+ "Date, Time and Timestamp formats all support both custom formats or named format ('yyyy-MM-dd','ISO_OFFSET_DATE_TIME') "
+ "as specified according to java.time.format.DateTimeFormatter. "
+ "If not specified, a long value input is expected to be an unix epoch (milli seconds from 1970/1/1), or a string value in "
+ "'yyyy-MM-dd' format for Date, 'HH:mm:ss.SSS' for Time (some database engines e.g. Derby or MySQL do not support milliseconds and will truncate milliseconds), "
+ "'yyyy-MM-dd HH:mm:ss.SSS' for Timestamp is used.")
@WritesAttribute(attribute = "executesql.row.count", description = "Contains the number of rows returned in the select query"),
@WritesAttribute(attribute = "executesql.query.duration", description = "Combined duration of the query execution time and fetch time in milliseconds"),
@WritesAttribute(attribute = "executesql.query.executiontime", description = "Duration of the query execution time in milliseconds"),
@WritesAttribute(attribute = "executesql.query.fetchtime", description = "Duration of the result set fetch time in milliseconds"),
@WritesAttribute(attribute = "executesql.resultset.index", description = "Assuming multiple result sets are returned, "
+ "the zero based index of this result set."),
@WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
+ "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
@WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of "
+ "FlowFiles produced by a single ResultSet. This can be used in conjunction with the "
+ "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this "
+ "attribute will not be populated."),
@WritesAttribute(attribute = "fragment.index", description = "If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of "
+ "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be "
+ "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order "
+ "FlowFiles were produced"),
@WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer."),
@WritesAttribute(attribute = "record.count", description = "The number of records output by the Record Writer.")
public class ExecuteSQLRecord extends AbstractExecuteSQL {
public static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
.displayName("Record Writer")
.description("Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer may use Inherit Schema to emulate the inferred schema behavior, i.e. "
+ "an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types.")
public static final PropertyDescriptor NORMALIZE_NAMES = new PropertyDescriptor.Builder()
.displayName("Normalize Table/Column Names")
.description("Whether to change characters in column names. For example, colons and periods will be changed to underscores.")
.allowableValues("true", "false")
public ExecuteSQLRecord() {
final Set<Relationship> r = new HashSet<>();
relationships = Collections.unmodifiableSet(r);
final List<PropertyDescriptor> pds = new ArrayList<>();
propDescriptors = Collections.unmodifiableList(pds);
protected SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context, FlowFile fileToProcess) {
final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES).asBoolean();
final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
return new RecordSqlWriter(recordSetWriterFactory, options, maxRowsPerFlowFile, fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
@ -16,7 +16,6 @@
package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@ -27,49 +26,21 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.processors.standard.sql.DefaultAvroSqlWriter;
import org.apache.nifi.processors.standard.sql.SqlWriter;
import org.apache.nifi.processors.standard.util.JdbcCommon;
import org.apache.nifi.util.StopWatch;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_PRECISION;
import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_SCALE;
@ -112,60 +83,7 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC
@DynamicProperty(name = "initial.maxvalue.<max_value_column>", value = "Initial maximum value for the specified column",
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY, description = "Specifies an initial max value for max value column(s). Properties should "
+ "be added in the format `initial.maxvalue.<max_value_column>`. This value is only used the first time the table is accessed (when a Maximum Value Column is specified).")
public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
public static final String RESULT_TABLENAME = "tablename";
public static final String RESULT_ROW_COUNT = "querydbtable.row.count";
public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
.name("Fetch Size")
.description("The number of result rows to be fetched from the result set at a time. This is a hint to the database driver and may not be "
+ "honored and/or exact. If the value specified is zero, then the hint is ignored.")
public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
.displayName("Max Rows Per Flow File")
.description("The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large "
+ "result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.")
public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder()
.displayName("Output Batch Size")
.description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all result set rows "
+ "have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles "
+ "to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will "
+ "be committed, thus releasing the FlowFiles to the downstream relationship. NOTE: The maxvalue.* and fragment.count attributes will not be set on FlowFiles when this "
+ "property is set.")
public static final PropertyDescriptor MAX_FRAGMENTS = new PropertyDescriptor.Builder()
.displayName("Maximum Number of Fragments")
.description("The maximum number of fragments. If the value specified is zero, then all fragments are returned. " +
"This prevents OutOfMemoryError when this processor ingests huge table. NOTE: Setting this property can result in data loss, as the incoming results are "
+ "not ordered, and fragments may end at arbitrary boundaries where rows are not included in the result set.")
public class QueryDatabaseTable extends AbstractQueryDatabaseTable {
public QueryDatabaseTable() {
final Set<Relationship> r = new HashSet<>();
@ -197,365 +115,22 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
public Set<Relationship> getRelationships() {
return relationships;
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propDescriptors;
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
public void setup(final ProcessContext context) {
maxValueProperties = getDefaultMaxValueProperties(context, null);
public void stop() {
// Reset the column type map in case properties change
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
// Fetch the column/table info once
if (!setupComplete.get()) {
ProcessSession session = sessionFactory.createSession();
final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
final ComponentLog logger = getLogger();
final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
protected SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context) {
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions().getValue();
final String sqlQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue();
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue();
final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions().getValue();
final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean();
final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet()
? context.getProperty(MAX_FRAGMENTS).evaluateAttributeExpressions().asInteger()
: 0;
final Integer defaultPrecision = context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions().asInteger();
final Integer defaultScale = context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions().asInteger();
final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
final StateManager stateManager = context.getStateManager();
final StateMap stateMap;
try {
stateMap = stateManager.getState(Scope.CLUSTER);
} catch (final IOException ioe) {
getLogger().error("Failed to retrieve observed maximum values from the State Manager. Will not perform "
+ "query until this is accomplished.", ioe);
// Make a mutable copy of the current state property map. This will be updated by the result row callback, and eventually
// set as the current state map (after the session has been committed)
final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap());
//If an initial max value for column(s) has been specified using properties, and this column is not in the state manager, sync them to the state property map
for (final Map.Entry<String, String> maxProp : maxValueProperties.entrySet()) {
String maxPropKey = maxProp.getKey().toLowerCase();
String fullyQualifiedMaxPropKey = getStateKey(tableName, maxPropKey, dbAdapter);
if (!statePropertyMap.containsKey(fullyQualifiedMaxPropKey)) {
String newMaxPropValue;
// If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
// the value has been stored under a key that is only the column name. Fall back to check the column name,
// but store the new initial max value under the fully-qualified key.
if (statePropertyMap.containsKey(maxPropKey)) {
newMaxPropValue = statePropertyMap.get(maxPropKey);
} else {
newMaxPropValue = maxProp.getValue();
statePropertyMap.put(fullyQualifiedMaxPropKey, newMaxPropValue);
List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
? null
: Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
final String selectQuery = getQuery(dbAdapter, tableName, sqlQuery, columnNames, maxValueColumnNameList, customWhereClause, statePropertyMap);
final StopWatch stopWatch = new StopWatch(true);
final String fragmentIdentifier = UUID.randomUUID().toString();
try (final Connection con = dbcpService.getConnection(Collections.emptyMap());
final Statement st = con.createStatement()) {
if (fetchSize != null && fetchSize > 0) {
try {
} catch (SQLException se) {
// Not all drivers support this, just log the error (at debug level) and move on
logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, se);
String jdbcURL = "DBCPService";
try {
DatabaseMetaData databaseMetaData = con.getMetaData();
if (databaseMetaData != null) {
jdbcURL = databaseMetaData.getURL();
} catch (SQLException se) {
// Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
st.setQueryTimeout(queryTimeout); // timeout in seconds
if (logger.isDebugEnabled()) {
logger.debug("Executing query {}", new Object[] { selectQuery });
try (final ResultSet resultSet = st.executeQuery(selectQuery)) {
int fragmentIndex=0;
// Max values will be updated in the state property map by the callback
final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(tableName, statePropertyMap, dbAdapter);
while(true) {
final AtomicLong nrOfRows = new AtomicLong(0L);
FlowFile fileToProcess = session.create();
try {
fileToProcess = session.write(fileToProcess, out -> {
try {
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, maxValCollector));
} catch (SQLException | RuntimeException e) {
throw new ProcessException("Error during database query or conversion of records to Avro.", e);
} catch (ProcessException e) {
// Add flowfile to results before rethrowing so it will be removed from session in outer catch
throw e;
if (nrOfRows.get() > 0) {
// set attribute how many rows were selected
fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
fileToProcess = session.putAttribute(fileToProcess, RESULT_TABLENAME, tableName);
fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
if(maxRowsPerFlowFile > 0) {
fileToProcess = session.putAttribute(fileToProcess, FRAGMENT_ID, fragmentIdentifier);
fileToProcess = session.putAttribute(fileToProcess, FRAGMENT_INDEX, String.valueOf(fragmentIndex));
logger.info("{} contains {} Avro records; transferring to 'success'",
new Object[]{fileToProcess, nrOfRows.get()});
session.getProvenanceReporter().receive(fileToProcess, jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
// If we've reached the batch size, send out the flow files
if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
session.transfer(resultSetFlowFiles, REL_SUCCESS);
} else {
// If there were no rows returned, don't send the flowfile
// If no rows and this was first FlowFile, yield
if(fragmentIndex == 0){
if (maxFragments > 0 && fragmentIndex >= maxFragments) {
// If we aren't splitting up the data into flow files or fragments, then the result set has been entirely fetched so don't loop back around
if (maxFragments == 0 && maxRowsPerFlowFile == 0) {
// If we are splitting up the data into flow files, don't loop back around if we've gotten all results
if(maxRowsPerFlowFile > 0 && nrOfRows.get() < maxRowsPerFlowFile) {
// Apply state changes from the Max Value tracker
// Even though the maximum value and total count are known at this point, to maintain consistent behavior if Output Batch Size is set, do not store the attributes
if (outputBatchSize == 0) {
for (int i = 0; i < resultSetFlowFiles.size(); i++) {
// Add maximum values as attributes
for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) {
// Get just the column name from the key
String key = entry.getKey();
String colName = key.substring(key.lastIndexOf(NAMESPACE_DELIMITER) + NAMESPACE_DELIMITER.length());
resultSetFlowFiles.set(i, session.putAttribute(resultSetFlowFiles.get(i), "maxvalue." + colName, entry.getValue()));
//set count on all FlowFiles
if (maxRowsPerFlowFile > 0) {
session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex)));
} catch (final SQLException e) {
throw e;
session.transfer(resultSetFlowFiles, REL_SUCCESS);
} catch (final ProcessException | SQLException e) {
logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e});
if (!resultSetFlowFiles.isEmpty()) {
} finally {
try {
// Update the state
stateManager.setState(statePropertyMap, Scope.CLUSTER);
} catch (IOException ioe) {
getLogger().error("{} failed to update State Manager, maximum observed values will not be recorded", new Object[]{this, ioe});
protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String columnNames, List<String> maxValColumnNames,
String customWhereClause, Map<String, String> stateMap) {
return getQuery(dbAdapter, tableName, null, columnNames, maxValColumnNames, customWhereClause, stateMap);
protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String sqlQuery, String columnNames, List<String> maxValColumnNames,
String customWhereClause, Map<String, String> stateMap) {
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Table name must be specified");
final StringBuilder query;
if (StringUtils.isEmpty(sqlQuery)) {
query = new StringBuilder(dbAdapter.getSelectStatement(tableName, columnNames, null, null, null, null));
} else {
query = getWrappedQuery(dbAdapter, sqlQuery, tableName);
List<String> whereClauses = new ArrayList<>();
// Check state map for last max values
if (stateMap != null && !stateMap.isEmpty() && maxValColumnNames != null) {
IntStream.range(0, maxValColumnNames.size()).forEach((index) -> {
String colName = maxValColumnNames.get(index);
String maxValueKey = getStateKey(tableName, colName, dbAdapter);
String maxValue = stateMap.get(maxValueKey);
if (StringUtils.isEmpty(maxValue)) {
// If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
// the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new
// maximum value is observed, it will be stored under the fully-qualified key from then on.
maxValue = stateMap.get(colName.toLowerCase());
if (!StringUtils.isEmpty(maxValue)) {
Integer type = columnTypeMap.get(maxValueKey);
if (type == null) {
// This shouldn't happen as we are populating columnTypeMap when the processor is scheduled.
throw new IllegalArgumentException("No column type found for: " + colName);
// Add a condition for the WHERE clause
whereClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type, maxValue, dbAdapter.getName()));
if (customWhereClause != null) {
whereClauses.add("(" + customWhereClause + ")");
if (!whereClauses.isEmpty()) {
query.append(" WHERE ");
query.append(StringUtils.join(whereClauses, " AND "));
return query.toString();
protected class MaxValueResultSetRowCollector implements JdbcCommon.ResultSetRowCallback {
DatabaseAdapter dbAdapter;
final Map<String, String> newColMap;
final Map<String, String> originalState;
String tableName;
public MaxValueResultSetRowCollector(String tableName, Map<String, String> stateMap, DatabaseAdapter dbAdapter) {
this.dbAdapter = dbAdapter;
this.originalState = stateMap;
this.newColMap = new HashMap<>();
this.tableName = tableName;
public void processRow(ResultSet resultSet) throws IOException {
if (resultSet == null) {
try {
// Iterate over the row, check-and-set max values
final ResultSetMetaData meta = resultSet.getMetaData();
final int nrOfColumns = meta.getColumnCount();
if (nrOfColumns > 0) {
for (int i = 1; i <= nrOfColumns; i++) {
String colName = meta.getColumnName(i).toLowerCase();
String fullyQualifiedMaxValueKey = getStateKey(tableName, colName, dbAdapter);
Integer type = columnTypeMap.get(fullyQualifiedMaxValueKey);
// Skip any columns we're not keeping track of or whose value is null
if (type == null || resultSet.getObject(i) == null) {
String maxValueString = newColMap.get(fullyQualifiedMaxValueKey);
// If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
// the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new
// maximum value is observed, it will be stored under the fully-qualified key from then on.
if (StringUtils.isEmpty(maxValueString)) {
maxValueString = newColMap.get(colName);
String newMaxValueString = getMaxValueFromRow(resultSet, i, type, maxValueString, dbAdapter.getName());
if (newMaxValueString != null) {
newColMap.put(fullyQualifiedMaxValueKey, newMaxValueString);
} catch (ParseException | SQLException e) {
throw new IOException(e);
public void applyStateChanges() {
return new DefaultAvroSqlWriter(options);
@ -0,0 +1,148 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.processors.standard;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.standard.sql.RecordSqlWriter;
import org.apache.nifi.processors.standard.sql.SqlWriter;
import org.apache.nifi.processors.standard.util.JdbcCommon;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
@Tags({"sql", "select", "jdbc", "query", "database", "record"})
@SeeAlso({GenerateTableFetch.class, ExecuteSQL.class})
@CapabilityDescription("Generates a SQL select query, or uses a provided statement, and executes it to fetch all rows whose values in the specified "
+ "Maximum Value column(s) are larger than the "
+ "previously-seen maxima. Query result will be converted to the format specified by the record writer. Expression Language is supported for several properties, but no incoming "
+ "connections are permitted. The Variable Registry may be used to provide values for any property containing Expression Language. If it is desired to "
+ "leverage flow file attributes to perform these queries, the GenerateTableFetch and/or ExecuteSQL processors can be used for this purpose. "
+ "Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on "
+ "a timer or cron expression, using the standard scheduling methods. This processor is intended to be run on the Primary Node only. FlowFile attribute "
+ "'querydbtable.row.count' indicates how many rows were selected.")
@Stateful(scopes = Scope.CLUSTER, description = "After performing a query on the specified table, the maximum values for "
+ "the specified column(s) will be retained for use in future executions of the query. This allows the Processor "
+ "to fetch only those records that have max values greater than the retained values. This can be used for "
+ "incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor "
+ "per the State Management documentation")
@WritesAttribute(attribute = "tablename", description="Name of the table being queried"),
@WritesAttribute(attribute = "querydbtable.row.count", description="The number of rows selected by the query"),
@WritesAttribute(attribute="fragment.identifier", description="If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
+ "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
@WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of "
+ "FlowFiles produced by a single ResultSet. This can be used in conjunction with the "
+ "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this "
+ "attribute will not be populated."),
@WritesAttribute(attribute="fragment.index", description="If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of "
+ "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be "
+ "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order "
+ "FlowFiles were produced"),
@WritesAttribute(attribute = "maxvalue.*", description = "Each attribute contains the observed maximum value of a specified 'Maximum-value Column'. The "
+ "suffix of the attribute is the name of the column. If Output Batch Size is set, then this attribute will not be populated."),
@WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer."),
@WritesAttribute(attribute = "record.count", description = "The number of records output by the Record Writer.")
@DynamicProperty(name = "initial.maxvalue.<max_value_column>", value = "Initial maximum value for the specified column",
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY, description = "Specifies an initial max value for max value column(s). Properties should "
+ "be added in the format `initial.maxvalue.<max_value_column>`. This value is only used the first time the table is accessed (when a Maximum Value Column is specified).")
public class QueryDatabaseTableRecord extends AbstractQueryDatabaseTable {
public static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
.displayName("Record Writer")
.description("Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer may use Inherit Schema to emulate the inferred schema behavior, i.e. "
+ "an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types.")
public static final PropertyDescriptor NORMALIZE_NAMES = new PropertyDescriptor.Builder()
.displayName("Normalize Table/Column Names")
.description("Whether to change characters in column names when creating the output schema. For example, colons and periods will be changed to underscores.")
.allowableValues("true", "false")
public QueryDatabaseTableRecord() {
final Set<Relationship> r = new HashSet<>();
relationships = Collections.unmodifiableSet(r);
final List<PropertyDescriptor> pds = new ArrayList<>();
pds.add(new PropertyDescriptor.Builder()
.description("The name of the database table to be queried. When a custom query is used, this property is used to alias the query and appears as an attribute on the FlowFile.")
propDescriptors = Collections.unmodifiableList(pds);
protected SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context) {
final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES).asBoolean();
final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
return new RecordSqlWriter(recordSetWriterFactory, options, maxRowsPerFlowFile, Collections.emptyMap());
@ -0,0 +1,67 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.processors.standard.sql;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable;
import org.apache.nifi.processors.standard.util.JdbcCommon;
import java.io.IOException;
import java.io.OutputStream;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
public class DefaultAvroSqlWriter implements SqlWriter {
private final JdbcCommon.AvroConversionOptions options;
private final Map<String,String> attributesToAdd = new HashMap<String,String>() {{
put(CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
public DefaultAvroSqlWriter(JdbcCommon.AvroConversionOptions options) {
this.options = options;
public long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception {
try {
return JdbcCommon.convertToAvroStream(resultSet, outputStream, options, callback);
} catch (SQLException e) {
throw new ProcessException(e);
public Map<String, String> getAttributesToAdd() {
return attributesToAdd;
public void writeEmptyResultSet(OutputStream outputStream, ComponentLog logger) throws IOException {
public String getMimeType() {
return JdbcCommon.MIME_TYPE_AVRO_BINARY;
@ -0,0 +1,158 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.processors.standard.sql;
import org.apache.avro.Schema;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable;
import org.apache.nifi.processors.standard.util.JdbcCommon;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.serialization.record.ResultSetRecordSet;
import java.io.IOException;
import java.io.OutputStream;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
public class RecordSqlWriter implements SqlWriter {
private final RecordSetWriterFactory recordSetWriterFactory;
private final AtomicReference<WriteResult> writeResultRef;
private final JdbcCommon.AvroConversionOptions options;
private final int maxRowsPerFlowFile;
private final Map<String, String> originalAttributes;
private ResultSetRecordSet fullRecordSet;
private RecordSchema writeSchema;
private String mimeType;
public RecordSqlWriter(RecordSetWriterFactory recordSetWriterFactory, JdbcCommon.AvroConversionOptions options, int maxRowsPerFlowFile, Map<String, String> originalAttributes) {
this.recordSetWriterFactory = recordSetWriterFactory;
this.writeResultRef = new AtomicReference<>();
this.maxRowsPerFlowFile = maxRowsPerFlowFile;
this.options = options;
this.originalAttributes = originalAttributes;
public long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception {
final RecordSet recordSet;
try {
if (fullRecordSet == null) {
final Schema avroSchema = JdbcCommon.createSchema(resultSet, options);
final RecordSchema recordAvroSchema = AvroTypeUtil.createSchema(avroSchema);
fullRecordSet = new ResultSetRecordSetWithCallback(resultSet, recordAvroSchema, callback);
writeSchema = recordSetWriterFactory.getSchema(originalAttributes, fullRecordSet.getSchema());
recordSet = (maxRowsPerFlowFile > 0) ? fullRecordSet.limit(maxRowsPerFlowFile) : fullRecordSet;
} catch (final SQLException | SchemaNotFoundException | IOException e) {
throw new ProcessException(e);
try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(logger, writeSchema, outputStream)) {
if (mimeType == null) {
mimeType = resultSetWriter.getMimeType();
return writeResultRef.get().getRecordCount();
} catch (final Exception e) {
throw new IOException(e);
public Map<String, String> getAttributesToAdd() {
Map<String, String> attributesToAdd = new HashMap<>();
attributesToAdd.put(CoreAttributes.MIME_TYPE.key(), mimeType);
// Add any attributes from the record writer (if present)
final WriteResult result = writeResultRef.get();
if (result != null) {
if (result.getAttributes() != null) {
attributesToAdd.put("record.count", String.valueOf(result.getRecordCount()));
return attributesToAdd;
public void updateCounters(ProcessSession session) {
final WriteResult result = writeResultRef.get();
if (result != null) {
session.adjustCounter("Records Written", result.getRecordCount(), false);
public void writeEmptyResultSet(OutputStream outputStream, ComponentLog logger) throws IOException {
try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(logger, writeSchema, outputStream)) {
mimeType = resultSetWriter.getMimeType();
} catch (final Exception e) {
throw new IOException(e);
public String getMimeType() {
return mimeType;
private static class ResultSetRecordSetWithCallback extends ResultSetRecordSet {
private final AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback;
ResultSetRecordSetWithCallback(ResultSet rs, RecordSchema readerSchema, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws SQLException {
super(rs, readerSchema);
this.callback = callback;
public Record next() throws IOException {
try {
if (hasMoreRows()) {
ResultSet rs = getResultSet();
final Record record = createRecord(rs);
if (callback != null) {
return record;
} else {
return null;
} catch (final SQLException e) {
throw new IOException("Could not obtain next record from ResultSet", e);
@ -0,0 +1,77 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.processors.standard.sql;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable;
import java.io.IOException;
import java.io.OutputStream;
import java.sql.ResultSet;
import java.util.Collections;
import java.util.Map;
* The SqlWriter interface provides a standard way for processors such as ExecuteSQL, ExecuteSQLRecord, QueryDatabaseTable, and QueryDatabaseTableRecord
* to write SQL result sets out to a flow file in whichever manner is appropriate. For example, ExecuteSQL writes the result set as Avro but ExecuteSQLRecord
* uses the Record API to write the result set out as prescribed by the selected RecordSetWriter.
public interface SqlWriter {
* Writes the given result set out to the given output stream, possibly applying a callback as each row is processed.
* @param resultSet the ResultSet to be written
* @param outputStream the OutputStream to write the result set to
* @param logger a common logger that can be used to log messages during write
* @param callback a MaxValueResultSetRowCollector that may be called as each row in the ResultSet is processed
* @return the number of rows written to the output stream
* @throws Exception if any errors occur during the writing of the result set to the output stream
long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception;
* Returns a map of attribute key/value pairs to be added to any outgoing flow file(s). The default implementation is to return an empty map.
* @return a map of attribute key/value pairs
default Map<String, String> getAttributesToAdd() {
return Collections.emptyMap();
* Updates any session counters as a result of processing result sets. The default implementation is empty, no counters will be updated.
* @param session the session upon which to update counters
default void updateCounters(ProcessSession session) {
* Writes an empty result set to the output stream. In some cases a ResultSet might not have any viable rows, but will throw an error or
* behave unexpectedly if rows are attempted to be retrieved. This method indicates the implementation should write whatever output is
* appropriate for a result set with no rows.
* @param outputStream the OutputStream to write the empty result set to
* @param logger a common logger that can be used to log messages during write
* @throws IOException if any errors occur during the writing of an empty result set to the output stream
void writeEmptyResultSet(OutputStream outputStream, ComponentLog logger) throws IOException;
* Returns the MIME type of the output format. This can be used in FlowFile attributes or to perform format-specific processing as necessary.
* @return the MIME type string of the output format.
String getMimeType();
@ -35,6 +35,7 @@ org.apache.nifi.processors.standard.EvaluateXPath
@ -96,6 +97,7 @@ org.apache.nifi.processors.standard.PutSyslog
File diff suppressed because it is too large
Load Diff
@ -38,13 +38,13 @@ import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.file.FileUtils;
import org.fusesource.hawtbuf.ByteArrayInputStream;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@ -22,6 +22,7 @@ import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@ -51,7 +52,6 @@ import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.fusesource.hawtbuf.ByteArrayInputStream;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@ -351,6 +351,48 @@ public class TestExecuteSQL {
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0).assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "0");
public void testNoRowsStatementCreatesEmptyFlowFile() throws Exception {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_NULL_INT");
} catch (final SQLException sqle) {
stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "0");
final InputStream in = new ByteArrayInputStream(firstFlowFile.toByteArray());
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {
GenericRecord record = null;
long recordsFromStream = 0;
while (dataFileReader.hasNext()) {
// Reuse record object by passing it to next(). This saves us from
// allocating and garbage collecting many objects for files with
// many items.
record = dataFileReader.next(record);
recordsFromStream += 1;
assertEquals(0, recordsFromStream);
public void testWithDuplicateColumns() throws SQLException {
// remove previous test database, if any
@ -0,0 +1,376 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.processors.standard;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.util.TestJdbcHugeStream;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
public class TestExecuteSQLRecord {
private static final Logger LOGGER;
static {
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.ExecuteSQLRecord", "debug");
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestExecuteSQLRecord", "debug");
LOGGER = LoggerFactory.getLogger(TestExecuteSQLRecord.class);
final static String DB_LOCATION = "target/db";
final static String QUERY_WITH_EL = "select "
+ " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
+ ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
+ ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode"
+ ", ROW_NUMBER() OVER () as rownr "
+ " from persons PER, products PRD, relationships REL"
+ " where PER.ID = ${person.id}";
final static String QUERY_WITHOUT_EL = "select "
+ " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
+ ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
+ ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode"
+ ", ROW_NUMBER() OVER () as rownr "
+ " from persons PER, products PRD, relationships REL"
+ " where PER.ID = 10";
final static String QUERY_WITHOUT_EL_WITH_PARAMS = "select "
+ " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
+ ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
+ ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode"
+ ", ROW_NUMBER() OVER () as rownr "
+ " from persons PER, products PRD, relationships REL"
+ " where PER.ID < ? AND REL.ID < ?";
public static void setupClass() {
System.setProperty("derby.stream.error.file", "target/derby.log");
private TestRunner runner;
public void setup() throws InitializationException {
final DBCPService dbcp = new DBCPServiceSimpleImpl();
final Map<String, String> dbcpProperties = new HashMap<>();
runner = TestRunners.newTestRunner(ExecuteSQLRecord.class);
runner.addControllerService("dbcp", dbcp, dbcpProperties);
runner.setProperty(AbstractExecuteSQL.DBCP_SERVICE, "dbcp");
public void testIncomingConnectionWithNoFlowFile() throws InitializationException {
runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM persons");
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
runner.addControllerService("writer", recordWriter);
runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
runner.assertTransferCount(AbstractExecuteSQL.REL_SUCCESS, 0);
runner.assertTransferCount(AbstractExecuteSQL.REL_FAILURE, 0);
public void testIncomingConnectionWithNoFlowFileAndNoQuery() throws InitializationException {
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
runner.addControllerService("writer", recordWriter);
runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
runner.assertTransferCount(AbstractExecuteSQL.REL_SUCCESS, 0);
runner.assertTransferCount(AbstractExecuteSQL.REL_FAILURE, 0);
@Test(expected = AssertionError.class)
public void testNoIncomingConnectionAndNoQuery() throws InitializationException {
public void testNoIncomingConnection() throws ClassNotFoundException, SQLException, InitializationException, IOException {
invokeOnTriggerRecords(null, QUERY_WITHOUT_EL, false, null, true);
assertEquals(ProvenanceEventType.RECEIVE, runner.getProvenanceEvents().get(0).getEventType());
public void testSelectQueryInFlowFile() throws InitializationException, ClassNotFoundException, SQLException, IOException {
invokeOnTriggerRecords(null, QUERY_WITHOUT_EL, true, null, false);
assertEquals(ProvenanceEventType.FORK, runner.getProvenanceEvents().get(0).getEventType());
assertEquals(ProvenanceEventType.FETCH, runner.getProvenanceEvents().get(1).getEventType());
public void testMaxRowsPerFlowFile() throws Exception {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_NULL_INT");
} catch (final SQLException sqle) {
stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
for (int i = 0; i < 1000; i++) {
stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" + i + ", 1, 1)");
runner.setProperty(AbstractExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "5");
runner.setProperty(AbstractExecuteSQL.OUTPUT_BATCH_SIZE, "0");
runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM TEST_NULL_INT");
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
runner.addControllerService("writer", recordWriter);
runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
runner.assertAllFlowFilesTransferred(AbstractExecuteSQL.REL_SUCCESS, 200);
runner.assertTransferCount(AbstractExecuteSQL.REL_FAILURE, 0);
runner.assertAllFlowFilesContainAttribute(AbstractExecuteSQL.REL_SUCCESS, FragmentAttributes.FRAGMENT_INDEX.key());
runner.assertAllFlowFilesContainAttribute(AbstractExecuteSQL.REL_SUCCESS, FragmentAttributes.FRAGMENT_ID.key());
runner.assertAllFlowFilesContainAttribute(AbstractExecuteSQL.REL_SUCCESS, FragmentAttributes.FRAGMENT_COUNT.key());
MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(AbstractExecuteSQL.REL_SUCCESS).get(0);
firstFlowFile.assertAttributeEquals(AbstractExecuteSQL.RESULT_ROW_COUNT, "5");
firstFlowFile.assertAttributeEquals("record.count", "5");
firstFlowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/plain"); // MockRecordWriter has text/plain MIME type
firstFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "0");
firstFlowFile.assertAttributeEquals(AbstractExecuteSQL.RESULTSET_INDEX, "0");
MockFlowFile lastFlowFile = runner.getFlowFilesForRelationship(AbstractExecuteSQL.REL_SUCCESS).get(199);
lastFlowFile.assertAttributeEquals(AbstractExecuteSQL.RESULT_ROW_COUNT, "5");
lastFlowFile.assertAttributeEquals("record.count", "5");
lastFlowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/plain"); // MockRecordWriter has text/plain MIME type
lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "199");
lastFlowFile.assertAttributeEquals(AbstractExecuteSQL.RESULTSET_INDEX, "0");
public void testInsertStatementCreatesFlowFile() throws Exception {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_NULL_INT");
} catch (final SQLException sqle) {
stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, "insert into TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)");
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
runner.addControllerService("writer", recordWriter);
runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
runner.assertAllFlowFilesTransferred(AbstractExecuteSQL.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(AbstractExecuteSQL.REL_SUCCESS).get(0).assertAttributeEquals(AbstractExecuteSQL.RESULT_ROW_COUNT, "0");
public void testNoRowsStatementCreatesEmptyFlowFile() throws Exception {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_NULL_INT");
} catch (final SQLException sqle) {
stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "select * from TEST_NULL_INT");
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
runner.addControllerService("writer", recordWriter);
runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "0");
public void testWithSqlException() throws Exception {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_NO_ROWS");
} catch (final SQLException sqle) {
stmt.execute("create table TEST_NO_ROWS (id integer)");
// Try a valid SQL statement that will generate an error (val1 does not exist, e.g.)
runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, "SELECT val1 FROM TEST_NO_ROWS");
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
runner.addControllerService("writer", recordWriter);
runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
//No incoming flow file containing a query, and an exception causes no outbound flowfile.
// There should be no flow files on either relationship
runner.assertAllFlowFilesTransferred(AbstractExecuteSQL.REL_FAILURE, 0);
runner.assertAllFlowFilesTransferred(AbstractExecuteSQL.REL_SUCCESS, 0);
public void invokeOnTriggerRecords(final Integer queryTimeout, final String query, final boolean incomingFlowFile, final Map<String, String> attrs, final boolean setQueryProperty)
throws InitializationException, ClassNotFoundException, SQLException, IOException {
if (queryTimeout != null) {
runner.setProperty(AbstractExecuteSQL.QUERY_TIMEOUT, queryTimeout.toString() + " secs");
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
TestJdbcHugeStream.loadTestData2Database(con, 100, 200, 100);
LOGGER.info("test data loaded");
// ResultSet size will be 1x200x100 = 20 000 rows
// because of where PER.ID = ${person.id}
final int nrOfRows = 20000;
MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
runner.addControllerService("writer", recordWriter);
runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
if (incomingFlowFile) {
// incoming FlowFile content is not used, but attributes are used
final Map<String, String> attributes = (attrs == null) ? new HashMap<>() : attrs;
attributes.put("person.id", "10");
if (!setQueryProperty) {
runner.enqueue(query.getBytes(), attributes);
} else {
runner.enqueue("Hello".getBytes(), attributes);
if (setQueryProperty) {
runner.setProperty(AbstractExecuteSQL.SQL_SELECT_QUERY, query);
runner.assertAllFlowFilesTransferred(AbstractExecuteSQL.REL_SUCCESS, 1);
runner.assertAllFlowFilesContainAttribute(AbstractExecuteSQL.REL_SUCCESS, AbstractExecuteSQL.RESULT_QUERY_DURATION);
runner.assertAllFlowFilesContainAttribute(AbstractExecuteSQL.REL_SUCCESS, AbstractExecuteSQL.RESULT_QUERY_EXECUTION_TIME);
runner.assertAllFlowFilesContainAttribute(AbstractExecuteSQL.REL_SUCCESS, AbstractExecuteSQL.RESULT_QUERY_FETCH_TIME);
runner.assertAllFlowFilesContainAttribute(AbstractExecuteSQL.REL_SUCCESS, AbstractExecuteSQL.RESULT_ROW_COUNT);
final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(AbstractExecuteSQL.REL_SUCCESS);
final long executionTime = Long.parseLong(flowfiles.get(0).getAttribute(AbstractExecuteSQL.RESULT_QUERY_EXECUTION_TIME));
final long fetchTime = Long.parseLong(flowfiles.get(0).getAttribute(AbstractExecuteSQL.RESULT_QUERY_FETCH_TIME));
final long durationTime = Long.parseLong(flowfiles.get(0).getAttribute(AbstractExecuteSQL.RESULT_QUERY_DURATION));
assertEquals(durationTime, fetchTime + executionTime);
* Simple implementation only for ExecuteSQL processor testing.
class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
public String getIdentifier() {
return "dbcp";
public Connection getConnection() throws ProcessException {
try {
final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
return con;
} catch (final Exception e) {
throw new ProcessException("getConnection failed: " + e);
Reference in New Issue
Block a user