From 98395de74ffb0d5cd6c7b8842b501f81b2d3ed92 Mon Sep 17 00:00:00 2001
From: Matt Burgess
Date: Tue, 15 Mar 2016 22:55:34 -0400
Subject: [PATCH] NIFI-1575: Add QueryDatabaseTable processor

---
 .../standard/QueryDatabaseTable.java          | 601 ++++++++++++++++++
 .../processors/standard/util/JdbcCommon.java  |  45 +-
 .../org.apache.nifi.processor.Processor       |   1 +
 .../standard/QueryDatabaseTableTest.java      | 356 +++++++++++
 4 files changed, 998 insertions(+), 5 deletions(-)
 create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
 create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
new file mode 100644
index 0000000000..08f6b4119e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.nifi.processors.standard; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.util.JdbcCommon; +import org.apache.nifi.util.LongHolder; +import org.apache.nifi.util.StopWatch; + +import java.io.IOException; +import java.io.OutputStream; +import java.math.BigDecimal; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.Date; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.text.DecimalFormat; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static java.sql.Types.ARRAY; +import static java.sql.Types.BIGINT; +import static java.sql.Types.BINARY; +import static java.sql.Types.BIT; +import static java.sql.Types.BLOB; +import static java.sql.Types.BOOLEAN; +import static java.sql.Types.CHAR; +import static java.sql.Types.CLOB; +import static java.sql.Types.DATE; +import static java.sql.Types.DECIMAL; +import static java.sql.Types.DOUBLE; +import static java.sql.Types.FLOAT; +import static java.sql.Types.INTEGER; +import static java.sql.Types.LONGNVARCHAR; +import static java.sql.Types.LONGVARBINARY; +import static java.sql.Types.LONGVARCHAR; +import static java.sql.Types.NCHAR; +import static java.sql.Types.NUMERIC; +import static java.sql.Types.NVARCHAR; +import static java.sql.Types.REAL; +import static java.sql.Types.ROWID; +import static java.sql.Types.SMALLINT; +import static java.sql.Types.TIME; +import static java.sql.Types.TIMESTAMP; +import static java.sql.Types.TINYINT; +import static java.sql.Types.VARBINARY; +import static java.sql.Types.VARCHAR; + +@EventDriven +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"sql", "select", "jdbc", "query", "database"}) +@CapabilityDescription("Execute provided SQL select query. Query result will be converted to Avro format." + + " Streaming is used so arbitrarily large result sets are supported. 
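As context for the Avro conversion mentioned above: the flow file content this processor emits is a standard Avro data file, so any Avro reader can consume it. A minimal sketch of a downstream consumer (the class and method names are illustrative, not part of this patch; the unit test at the end of the patch reads the payload the same way):

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.avro.file.DataFileStream;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;

    // Illustrative only: iterate and count the Avro records in a flow file's content stream.
    public class AvroPayloadSketch {
        public static long dumpRecords(final InputStream flowFileContent) throws IOException {
            long count = 0;
            try (DataFileStream<GenericRecord> reader =
                         new DataFileStream<>(flowFileContent, new GenericDatumReader<GenericRecord>())) {
                GenericRecord record = null;
                while (reader.hasNext()) {
                    record = reader.next(record); // reuse the record object to limit allocation
                    System.out.println(record);
                    count++;
                }
            }
            return count;
        }
    }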
This processor can be scheduled to run on " + + "a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. " + + "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the " + + "select query. FlowFile attribute 'querydbtable.row.count' indicates how many rows were selected.") +@Stateful(scopes = Scope.CLUSTER, description = "After performing a query on the specified table, the maximum values for " + + "the specified column(s) will be retained for use in future executions of the query. This allows the Processor " + + "to fetch only those records that have max values greater than the retained values. This can be used for " + + "incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor " + + "per the State Management documentation") +@WritesAttribute(attribute = "querydbtable.row.count") +public class QueryDatabaseTable extends AbstractSessionFactoryProcessor { + + public static final String RESULT_ROW_COUNT = "querydbtable.row.count"; + + public static final String SQL_PREPROCESS_STRATEGY_NONE = "None"; + public static final String SQL_PREPROCESS_STRATEGY_ORACLE = "Oracle"; + + // Relationships + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Successfully created FlowFile from SQL query result set.") + .build(); + + private final Set relationships; + + public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder() + .name("Database Connection Pooling Service") + .description("The Controller Service that is used to obtain a connection to the database.") + .required(true) + .identifiesControllerService(DBCPService.class) + .build(); + + public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() + .name("Table Name") + .description("The name of the database table to be queried.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor COLUMN_NAMES = new PropertyDescriptor.Builder() + .name("Columns to Return") + .description("A comma-separated list of column names to be used in the query. If your database requires " + + "special treatment of the names (quoting, e.g.), each name should include such treatment. If no " + + "column names are supplied, all columns in the specified table will be returned.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor MAX_VALUE_COLUMN_NAMES = new PropertyDescriptor.Builder() + .name("Maximum-value Columns") + .description("A comma-separated list of column names. The processor will keep track of the maximum value " + + "for each column that has been returned since the processor started running. This can be used to " + + "retrieve only those rows that have been added/updated since the last retrieval. 
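To make the incremental-fetch behavior described above concrete, here is a small self-contained sketch (illustrative names only; the processor's actual logic lives in getQuery() and getLiteralByType() below): per-column maximums retained in state become '>' predicates on the next query.

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // Illustrative only: how retained maximum values turn into WHERE predicates.
    public class IncrementalFetchSketch {
        static String buildQuery(final String table, final Map<String, String> retainedMaxValues) {
            final List<String> predicates = new ArrayList<>();
            for (final Map.Entry<String, String> entry : retainedMaxValues.entrySet()) {
                // Numeric literals assumed here; character and date/time values would be quoted.
                predicates.add(entry.getKey() + " > " + entry.getValue());
            }
            return "SELECT * FROM " + table
                    + (predicates.isEmpty() ? "" : " WHERE " + String.join(" AND ", predicates));
        }

        public static void main(final String[] args) {
            final Map<String, String> state = new LinkedHashMap<>();
            System.out.println(buildQuery("orders", state));   // first run: SELECT * FROM orders
            state.put("id", "509");                             // maximum observed during the first run
            System.out.println(buildQuery("orders", state));   // next run: SELECT * FROM orders WHERE id > 509
        }
    }

The real implementation additionally consults the JDBC type of each column so that string, date and timestamp maximums are quoted or converted appropriately.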
Note that some " + + "JDBC types such as bit/boolean are not conducive to maintaining maximum value, so columns of these " + + "types should not be listed in this property, and will result in error(s) during processing.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder() + .name("Max Wait Time") + .description("The maximum amount of time allowed for a running SQL select query " + + ", zero means there is no limit. Max time less than 1 second will be equal to zero.") + .defaultValue("0 seconds") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + + public static final PropertyDescriptor SQL_PREPROCESS_STRATEGY = new PropertyDescriptor.Builder() + .name("SQL Pre-processing Strategy") + .description("The strategy to employ when generating the SQL for querying the table. A strategy may include " + + "custom or database-specific code, such as the treatment of time/date formats.") + .required(true) + .allowableValues(SQL_PREPROCESS_STRATEGY_NONE, SQL_PREPROCESS_STRATEGY_ORACLE) + .defaultValue("None") + .build(); + + + private final List propDescriptors; + + protected final Map columnTypeMap = new HashMap<>(); + + public QueryDatabaseTable() { + final Set r = new HashSet<>(); + r.add(REL_SUCCESS); + relationships = Collections.unmodifiableSet(r); + + final List pds = new ArrayList<>(); + pds.add(DBCP_SERVICE); + pds.add(TABLE_NAME); + pds.add(COLUMN_NAMES); + pds.add(MAX_VALUE_COLUMN_NAMES); + pds.add(QUERY_TIMEOUT); + pds.add(SQL_PREPROCESS_STRATEGY); + propDescriptors = Collections.unmodifiableList(pds); + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + protected List getSupportedPropertyDescriptors() { + return propDescriptors; + } + + @OnScheduled + public void setup(final ProcessContext context) { + // Try to fill the columnTypeMap with the types of the desired max-value columns + final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); + final String tableName = context.getProperty(TABLE_NAME).getValue(); + final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue(); + + try (final Connection con = dbcpService.getConnection(); + final Statement st = con.createStatement()) { + + // Try a query that returns no rows, for the purposes of getting metadata about the columns. 
It is possible + // to use DatabaseMetaData.getColumns(), but not all drivers support this, notably the schema-on-read + // approach as in Apache Drill + String query = getSelectFromClause(tableName, maxValueColumnNames).append(" WHERE 1 = 0").toString(); + ResultSet resultSet = st.executeQuery(query); + ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); + int numCols = resultSetMetaData.getColumnCount(); + if (numCols > 0) { + columnTypeMap.clear(); + for (int i = 1; i <= numCols; i++) { + String colName = resultSetMetaData.getColumnName(i).toLowerCase(); + int colType = resultSetMetaData.getColumnType(i); + columnTypeMap.put(colName, colType); + } + + } else { + throw new ProcessException("No columns found in table from those specified: " + maxValueColumnNames); + } + + } catch (SQLException e) { + throw new ProcessException("Unable to communicate with database in order to determine column types", e); + } + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { + ProcessSession session = sessionFactory.createSession(); + FlowFile fileToProcess = null; + + final ProcessorLog logger = getLogger(); + + final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); + final String tableName = context.getProperty(TABLE_NAME).getValue(); + final String columnNames = context.getProperty(COLUMN_NAMES).getValue(); + final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue(); + final String preProcessStrategy = context.getProperty(SQL_PREPROCESS_STRATEGY).getValue(); + final StateManager stateManager = context.getStateManager(); + final StateMap stateMap; + + try { + stateMap = stateManager.getState(Scope.CLUSTER); + } catch (final IOException ioe) { + getLogger().error("Failed to retrieve observed maximum values from the State Manager. Will not perform " + + "query until this is accomplished.", ioe); + context.yield(); + return; + } + // Make a mutable copy of the current state property map. 
This will be updated by the result row callback, and eventually + // set as the current state map (after the session has been committed) + final Map statePropertyMap = new HashMap<>(stateMap.toMap()); + + final String selectQuery = getQuery(tableName, columnNames, getColumns(maxValueColumnNames), stateMap, preProcessStrategy); + final StopWatch stopWatch = new StopWatch(true); + + try (final Connection con = dbcpService.getConnection(); + final Statement st = con.createStatement()) { + + final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue(); + st.setQueryTimeout(queryTimeout); // timeout in seconds + + final LongHolder nrOfRows = new LongHolder(0L); + + fileToProcess = session.create(); + fileToProcess = session.write(fileToProcess, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + try { + logger.debug("Executing query {}", new Object[]{selectQuery}); + final ResultSet resultSet = st.executeQuery(selectQuery); + // Max values will be updated in the state property map by the callback + final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(statePropertyMap, preProcessStrategy); + nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, maxValCollector)); + + } catch (final SQLException e) { + throw new ProcessException("Error during database query or conversion of records to Avro", e); + } + } + }); + + if (nrOfRows.get() > 0) { + // set attribute how many rows were selected + fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, nrOfRows.get().toString()); + + logger.info("{} contains {} Avro records; transferring to 'success'", + new Object[]{fileToProcess, nrOfRows.get()}); + String jdbcURL = "DBCPService"; + try { + DatabaseMetaData databaseMetaData = con.getMetaData(); + if (databaseMetaData != null) { + jdbcURL = databaseMetaData.getURL(); + } + } catch (SQLException se) { + // Ignore and use default JDBC URL. 
This shouldn't happen unless the driver doesn't implement getMetaData() properly + } + session.getProvenanceReporter().receive(fileToProcess, jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.transfer(fileToProcess, REL_SUCCESS); + } else { + // If there were no rows returned, don't send the flowfile + session.remove(fileToProcess); + context.yield(); + } + + } catch (final ProcessException | SQLException e) { + logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e}); + if (fileToProcess != null) { + session.remove(fileToProcess); + } + context.yield(); + } finally { + session.commit(); + try { + // Update the state + stateManager.setState(statePropertyMap, Scope.CLUSTER); + } catch (IOException ioe) { + getLogger().error("{} failed to update State Manager, maximum observed values will not be recorded", new Object[]{this, ioe}); + } + } + } + + protected List getColumns(String commaSeparatedColumnList) { + if (StringUtils.isEmpty(commaSeparatedColumnList)) { + return Collections.emptyList(); + } + final String[] columns = commaSeparatedColumnList.split(","); + final List columnList = new ArrayList<>(columns.length); + for (String column : columns) { + if (column != null) { + String trimmedColumn = column.trim(); + if (!StringUtils.isEmpty(trimmedColumn)) { + columnList.add(trimmedColumn); + } + } + } + return columnList; + } + + protected String getQuery(String tableName, String columnNames, List maxValColumnNames, + StateMap stateMap, String preProcessStrategy) { + if (StringUtils.isEmpty(tableName)) { + throw new IllegalArgumentException("Table name must be specified"); + } + final StringBuilder query = new StringBuilder(getSelectFromClause(tableName, columnNames)); + + // Check state map for last max values + if (stateMap != null && stateMap.getVersion() != -1 && maxValColumnNames != null) { + Map stateProperties = stateMap.toMap(); + List whereClauses = new ArrayList<>(maxValColumnNames.size()); + for (String colName : maxValColumnNames) { + String maxValue = stateProperties.get(colName.toLowerCase()); + if (!StringUtils.isEmpty(maxValue)) { + Integer type = columnTypeMap.get(colName.toLowerCase()); + if (type == null) { + // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled. + throw new IllegalArgumentException("No column type found for: " + colName); + } + // Add a condition for the WHERE clause + whereClauses.add(colName + " > " + getLiteralByType(type, maxValue, preProcessStrategy)); + } + } + if (!whereClauses.isEmpty()) { + query.append(" WHERE "); + query.append(StringUtils.join(whereClauses, " AND ")); + } + } + + return query.toString(); + } + + /** + * Returns a basic SELECT ... FROM clause with the given column names and table name. If no column names are found, + * the wildcard (*) is used to select all columns. 
+ * + * @param tableName The name of the table to select from + * @param columnNames A comma-separated list of column names to select from the table + * @return A SQL select statement representing a query of the given column names from the given table + */ + protected StringBuilder getSelectFromClause(String tableName, String columnNames) { + final StringBuilder query = new StringBuilder("SELECT "); + if (StringUtils.isEmpty(columnNames) || columnNames.trim().equals("*")) { + query.append("*"); + } else { + query.append(columnNames); + } + query.append(" FROM "); + query.append(tableName); + return query; + } + + /** + * Returns a SQL literal for the given value based on its type. For example, values of character type need to be enclosed + * in single quotes, whereas values of numeric type should not be. + * + * @param type The JDBC type for the desired literal + * @param value The value to be converted to a SQL literal + * @return A String representing the given value as a literal of the given type + */ + protected String getLiteralByType(int type, String value, String preProcessStrategy) { + // Format value based on column type. For example, strings and timestamps need to be quoted + switch (type) { + // For string-represented values, put in single quotes + case CHAR: + case LONGNVARCHAR: + case LONGVARCHAR: + case NCHAR: + case NVARCHAR: + case VARCHAR: + case ROWID: + case DATE: + case TIME: + return "'" + value + "'"; + case TIMESTAMP: + // Timestamp literals in Oracle need to be cast with TO_DATE + if (SQL_PREPROCESS_STRATEGY_ORACLE.equals(preProcessStrategy)) { + return "to_date('" + value + "', 'yyyy-mm-dd HH24:MI:SS')"; + } else { + return "'" + value + "'"; + } + // Else leave as is (numeric types, e.g.) + default: + return value; + } + } + + protected class MaxValueResultSetRowCollector implements JdbcCommon.ResultSetRowCallback { + String preProcessStrategy; + Map newColMap; + + public MaxValueResultSetRowCollector(Map stateMap, String preProcessStrategy) { + this.preProcessStrategy = preProcessStrategy; + newColMap = stateMap; + } + + @Override + public void processRow(ResultSet resultSet) throws IOException { + if (resultSet == null) { + return; + } + try { + // Iterate over the row, check-and-set max values + final ResultSetMetaData meta = resultSet.getMetaData(); + final int nrOfColumns = meta.getColumnCount(); + if (nrOfColumns > 0) { + for (int i = 1; i <= nrOfColumns; i++) { + String colName = meta.getColumnName(i).toLowerCase(); + Integer type = columnTypeMap.get(colName); + // Skip any columns we're not keeping track of or whose value is null + if (type == null || resultSet.getObject(i) == null) { + continue; + } + String maxValueString = newColMap.get(colName); + switch (type) { + case CHAR: + case LONGNVARCHAR: + case LONGVARCHAR: + case NCHAR: + case NVARCHAR: + case VARCHAR: + case ROWID: + String colStringValue = resultSet.getString(i); + if (maxValueString == null || colStringValue.compareTo(maxValueString) > 0) { + newColMap.put(colName, colStringValue); + } + break; + + case INTEGER: + case SMALLINT: + case TINYINT: + Integer colIntValue = resultSet.getInt(i); + Integer maxIntValue = null; + if (maxValueString != null) { + maxIntValue = Integer.valueOf(maxValueString); + } + if (maxIntValue == null || colIntValue > maxIntValue) { + newColMap.put(colName, colIntValue.toString()); + } + break; + + case BIGINT: + Long colLongValue = resultSet.getLong(i); + Long maxLongValue = null; + if (maxValueString != null) { + maxLongValue = Long.valueOf(maxValueString); + } 
+ if (maxLongValue == null || colLongValue > maxLongValue) { + newColMap.put(colName, colLongValue.toString()); + } + break; + + case FLOAT: + case REAL: + case DOUBLE: + Double colDoubleValue = resultSet.getDouble(i); + Double maxDoubleValue = null; + if (maxValueString != null) { + maxDoubleValue = Double.valueOf(maxValueString); + } + if (maxDoubleValue == null || colDoubleValue > maxDoubleValue) { + newColMap.put(colName, colDoubleValue.toString()); + } + break; + + case DECIMAL: + case NUMERIC: + BigDecimal colBigDecimalValue = resultSet.getBigDecimal(i); + BigDecimal maxBigDecimalValue = null; + if (maxValueString != null) { + DecimalFormat df = new DecimalFormat(); + df.setParseBigDecimal(true); + maxBigDecimalValue = (BigDecimal) df.parse(maxValueString); + } + if (maxBigDecimalValue == null || colBigDecimalValue.compareTo(maxBigDecimalValue) > 0) { + newColMap.put(colName, colBigDecimalValue.toString()); + } + break; + + case DATE: + Date rawColDateValue = resultSet.getDate(i); + java.sql.Date colDateValue = new java.sql.Date(rawColDateValue.getTime()); + java.sql.Date maxDateValue = null; + if (maxValueString != null) { + maxDateValue = java.sql.Date.valueOf(maxValueString); + } + if (maxDateValue == null || colDateValue.after(maxDateValue)) { + newColMap.put(colName, colDateValue.toString()); + } + break; + + case TIME: + Date rawColTimeValue = resultSet.getDate(i); + java.sql.Time colTimeValue = new java.sql.Time(rawColTimeValue.getTime()); + java.sql.Time maxTimeValue = null; + if (maxValueString != null) { + maxTimeValue = java.sql.Time.valueOf(maxValueString); + } + if (maxTimeValue == null || colTimeValue.after(maxTimeValue)) { + newColMap.put(colName, colTimeValue.toString()); + } + break; + + case TIMESTAMP: + // Oracle timestamp queries must use literals in java.sql.Date format + if (SQL_PREPROCESS_STRATEGY_ORACLE.equals(preProcessStrategy)) { + Date rawColOracleTimestampValue = resultSet.getDate(i); + java.sql.Date oracleTimestampValue = new java.sql.Date(rawColOracleTimestampValue.getTime()); + java.sql.Date maxOracleTimestampValue = null; + if (maxValueString != null) { + maxOracleTimestampValue = java.sql.Date.valueOf(maxValueString); + } + if (maxOracleTimestampValue == null || oracleTimestampValue.after(maxOracleTimestampValue)) { + newColMap.put(colName, oracleTimestampValue.toString()); + } + } else { + Timestamp rawColTimestampValue = resultSet.getTimestamp(i); + java.sql.Timestamp colTimestampValue = new java.sql.Timestamp(rawColTimestampValue.getTime()); + java.sql.Timestamp maxTimestampValue = null; + if (maxValueString != null) { + maxTimestampValue = java.sql.Timestamp.valueOf(maxValueString); + } + if (maxTimestampValue == null || colTimestampValue.after(maxTimestampValue)) { + newColMap.put(colName, colTimestampValue.toString()); + } + } + break; + + case BIT: + case BOOLEAN: + case BINARY: + case VARBINARY: + case LONGVARBINARY: + case ARRAY: + case BLOB: + case CLOB: + default: + throw new IOException("Type " + meta.getColumnTypeName(i) + " is not valid for maintaining maximum value"); + } + } + } + } catch (ParseException | SQLException e) { + throw new IOException(e); + } + + } + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java index 124791950b..3b74c19128 100644 --- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java @@ -69,17 +69,30 @@ import org.apache.commons.lang3.StringUtils; public class JdbcCommon { public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream) throws SQLException, IOException { - final Schema schema = createSchema(rs); + return convertToAvroStream(rs, outStream, null, null); + } + + public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName) + throws SQLException, IOException { + return convertToAvroStream(rs, outStream, recordName, null); + } + + public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback) + throws SQLException, IOException { + final Schema schema = createSchema(rs, recordName); final GenericRecord rec = new GenericData.Record(schema); - final DatumWriter datumWriter = new GenericDatumWriter(schema); - try (final DataFileWriter dataFileWriter = new DataFileWriter(datumWriter)) { + final DatumWriter datumWriter = new GenericDatumWriter<>(schema); + try (final DataFileWriter dataFileWriter = new DataFileWriter<>(datumWriter)) { dataFileWriter.create(schema, outStream); final ResultSetMetaData meta = rs.getMetaData(); final int nrOfColumns = meta.getColumnCount(); long nrOfRows = 0; while (rs.next()) { + if (callback != null) { + callback.processRow(rs); + } for (int i = 1; i <= nrOfColumns; i++) { final int javaSqlType = meta.getColumnType(i); final Object value = rs.getObject(i); @@ -125,10 +138,23 @@ public class JdbcCommon { } public static Schema createSchema(final ResultSet rs) throws SQLException { + return createSchema(rs, null); + } + + /** + * Creates an Avro schema from a result set. If the table/record name is known a priori and provided, use that as a + * fallback for the record name if it cannot be retrieved from the result set, and finally fall back to a default value. + * + * @param rs The result set to convert to Avro + * @param recordName The a priori record name to use if it cannot be determined from the result set. + * @return A Schema object representing the result set converted to an Avro record + * @throws SQLException if any error occurs during conversion + */ + public static Schema createSchema(final ResultSet rs, String recordName) throws SQLException { final ResultSetMetaData meta = rs.getMetaData(); final int nrOfColumns = meta.getColumnCount(); - String tableName = "NiFi_ExecuteSQL_Record"; - if(nrOfColumns > 0) { + String tableName = StringUtils.isEmpty(recordName) ? "NiFi_ExecuteSQL_Record" : recordName; + if (nrOfColumns > 0) { String tableNameFromMeta = meta.getTableName(1); if (!StringUtils.isBlank(tableNameFromMeta)) { tableName = tableNameFromMeta; @@ -218,4 +244,13 @@ public class JdbcCommon { return builder.endRecord(); } + + /** + * An interface for callback methods which allows processing of a row during the convertToAvroStream() processing. + * IMPORTANT: This method should only work on the row pointed at by the current ResultSet reference. + * Advancing the cursor (e.g.) can cause rows to be skipped during Avro transformation. 
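As a usage sketch for the callback hook described above (the interface itself is declared just below), a caller could pass something like the following to convertToAvroStream(); within this patch the real implementation is QueryDatabaseTable's MaxValueResultSetRowCollector, and the names here are illustrative only (the usual java.io/java.sql imports are assumed):

    // Illustrative only: a callback that inspects each row as it is converted,
    // without advancing the cursor.
    final JdbcCommon.ResultSetRowCallback rowLogger = new JdbcCommon.ResultSetRowCallback() {
        @Override
        public void processRow(final ResultSet resultSet) throws IOException {
            try {
                // Work only with the row the cursor currently points at; calling resultSet.next()
                // here would cause the Avro conversion loop to skip rows.
                System.out.println("Converting row with first column = " + resultSet.getObject(1));
            } catch (final SQLException e) {
                throw new IOException(e);
            }
        }
    };
    // long rows = JdbcCommon.convertToAvroStream(resultSet, outputStream, "MyRecord", rowLogger);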
+ */ + public interface ResultSetRowCallback { + void processRow(ResultSet resultSet) throws IOException; + } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index d3d765ebc4..6c52d289f5 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -65,6 +65,7 @@ org.apache.nifi.processors.standard.PutJMS org.apache.nifi.processors.standard.PutSFTP org.apache.nifi.processors.standard.PutSQL org.apache.nifi.processors.standard.PutSyslog +org.apache.nifi.processors.standard.QueryDatabaseTable org.apache.nifi.processors.standard.ReplaceText org.apache.nifi.processors.standard.RouteText org.apache.nifi.processors.standard.ReplaceTextWithMapping diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java new file mode 100644 index 0000000000..d16b9c646e --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.fusesource.hawtbuf.ByteArrayInputStream; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Types; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Unit tests for the QueryDatabaseTable processor + */ +public class QueryDatabaseTableTest { + + MockQueryDatabaseTable processor; + private TestRunner runner; + final static String DB_LOCATION = "target/db"; + + + @BeforeClass + public static void setupClass() { + System.setProperty("derby.stream.error.file", "target/derby.log"); + } + + @Before + public void setup() throws InitializationException, IOException { + final DBCPService dbcp = new DBCPServiceSimpleImpl(); + final Map dbcpProperties = new HashMap<>(); + + processor = new MockQueryDatabaseTable(); + runner = TestRunners.newTestRunner(processor); + runner.addControllerService("dbcp", dbcp, dbcpProperties); + runner.enableControllerService(dbcp); + runner.setProperty(QueryDatabaseTable.DBCP_SERVICE, "dbcp"); + runner.getStateManager().clear(Scope.CLUSTER); + } + + @After + public void teardown() { + runner = null; + } + + @Test + public void testGetColumns() throws Exception { + assertTrue(processor.getColumns(null).isEmpty()); + assertTrue(processor.getColumns("").isEmpty()); + assertEquals(2, processor.getColumns("col1,col2").size()); + } + + @Test + public void testGetQuery() throws Exception { + String query = processor.getQuery("myTable", null, null, null, "None"); + assertEquals("SELECT * FROM myTable", query); + query = processor.getQuery("myTable", "col1,col2", null, null, "None"); + assertEquals("SELECT col1,col2 FROM myTable", query); + + query = processor.getQuery("myTable", null, Collections.singletonList("id"), null, "None"); + assertEquals("SELECT * FROM myTable", query); + + Map maxValues = new HashMap<>(); + maxValues.put("id", "509"); + StateManager stateManager = runner.getStateManager(); + stateManager.setState(maxValues, Scope.CLUSTER); + processor.putColumnType("id", Types.INTEGER); + query = processor.getQuery("myTable", null, Collections.singletonList("id"), stateManager.getState(Scope.CLUSTER), "None"); + assertEquals("SELECT * FROM myTable WHERE id > 509", query); + + maxValues.put("date_created", "2016-03-07 12:34:56"); + stateManager.setState(maxValues, Scope.CLUSTER); + processor.putColumnType("date_created", Types.TIMESTAMP); + query = processor.getQuery("myTable", null, Arrays.asList("id", "DATE_CREATED"), 
stateManager.getState(Scope.CLUSTER), "None"); + assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED > '2016-03-07 12:34:56'", query); + // Test Oracle strategy + query = processor.getQuery("myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER), "Oracle"); + assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED > to_date('2016-03-07 12:34:56', 'yyyy-mm-dd HH24:MI:SS')", query); + } + + @Test(expected = IllegalArgumentException.class) + public void testGetQueryNoTable() throws Exception { + processor.getQuery(null, null, null, null, "None"); + } + + @Test + public void testAddedRows() throws ClassNotFoundException, SQLException, InitializationException, IOException { + + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')"); + + runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE"); + runner.setIncomingConnection(false); + runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID"); + + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + + InputStream in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + assertEquals(3, getNumberOfRecordsFromStream(in)); + runner.clearTransferState(); + + // Run again, this time no flowfiles/rows should be transferred + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0); + runner.clearTransferState(); + + // Add a new row with a higher ID and run, one flowfile with one new row should be transferred + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.234')"); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + assertEquals(1, getNumberOfRecordsFromStream(in)); + + // Sanity check - run again, this time no flowfiles/rows should be transferred + runner.clearTransferState(); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0); + runner.clearTransferState(); + + // Add timestamp as a max value column name + runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "id, created_on"); + + // Add a new row with a higher ID and run, one flow file will be transferred because no max value for the timestamp has been stored + stmt.execute("insert into TEST_QUERY_DB_TABLE 
(id, name, scale, created_on) VALUES (4, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')"); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + assertEquals(1, getNumberOfRecordsFromStream(in)); + runner.clearTransferState(); + + // Add a new row with a higher ID but lower timestamp and run, no flow file will be transferred + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (5, 'NO NAME', 15.0, '2001-01-01 03:23:34.234')"); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0); + runner.clearTransferState(); + + // Add a new row with a higher ID and run, one flow file will be transferred + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (6, 'Mr. NiFi', 1.0, '2012-01-01 03:23:34.234')"); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + assertEquals(1, getNumberOfRecordsFromStream(in)); + runner.clearTransferState(); + + // Set name as the max value column name (and clear the state), all rows should be returned since the max value for name has not been set + runner.getStateManager().clear(Scope.CLUSTER); + runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "name"); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + assertEquals(7, getNumberOfRecordsFromStream(in)); + runner.clearTransferState(); + + // Add a new row with a "higher" name than the max but lower than "NULL" (to test that null values are skipped), one flow file will be transferred + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (7, 'NULK', 1.0, '2012-01-01 03:23:34.234')"); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + assertEquals(1, getNumberOfRecordsFromStream(in)); + runner.clearTransferState(); + + // Set scale as the max value column name (and clear the state), all rows should be returned since the max value for name has not been set + runner.getStateManager().clear(Scope.CLUSTER); + runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "scale"); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + assertEquals(8, getNumberOfRecordsFromStream(in)); + runner.clearTransferState(); + + // Add a new row with a higher value for scale than the max, one flow file will be transferred + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (8, 'NULK', 100.0, '2012-01-01 03:23:34.234')"); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + assertEquals(1, getNumberOfRecordsFromStream(in)); + runner.clearTransferState(); + + // Set scale as the max value column name (and clear 
the state), all rows should be returned since the max value for name has not been set + runner.getStateManager().clear(Scope.CLUSTER); + runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "bignum"); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + assertEquals(9, getNumberOfRecordsFromStream(in)); + runner.clearTransferState(); + + // Add a new row with a higher value for scale than the max, one flow file will be transferred + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on, bignum) VALUES (9, 'Alice Bob', 100.0, '2012-01-01 03:23:34.234', 1)"); + runner.run(); + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); + assertEquals(1, getNumberOfRecordsFromStream(in)); + runner.clearTransferState(); + } + + + @Test + public void testWithNullIntColumn() throws SQLException { + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_NULL_INT"); + } catch (final SQLException sqle) { + // Ignore, usually due to Derby not having DROP TABLE IF EXISTS + } + + stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))"); + + stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)"); + stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, 1, 1)"); + + runner.setIncomingConnection(false); + runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_NULL_INT"); + runner.run(); + + runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); + runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).assertAttributeEquals(QueryDatabaseTable.RESULT_ROW_COUNT, "2"); + } + + @Test + public void testWithSqlException() throws SQLException { + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_NO_ROWS"); + } catch (final SQLException sqle) { + // Ignore, usually due to Derby not having DROP TABLE IF EXISTS + } + + stmt.execute("create table TEST_NO_ROWS (id integer)"); + + runner.setIncomingConnection(false); + // Try a valid SQL statement that will generate an error (val1 does not exist, e.g.) + runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_NO_ROWS"); + runner.setProperty(QueryDatabaseTable.COLUMN_NAMES, "val1"); + runner.run(); + + assertTrue(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).isEmpty()); + } + + private long getNumberOfRecordsFromStream(InputStream in) throws IOException { + final DatumReader datumReader = new GenericDatumReader<>(); + try (DataFileStream dataFileReader = new DataFileStream<>(in, datumReader)) { + GenericRecord record = null; + long recordsFromStream = 0; + while (dataFileReader.hasNext()) { + // Reuse record object by passing it to next(). 
This saves us from + // allocating and garbage collecting many objects for files with + // many items. + record = dataFileReader.next(record); + recordsFromStream += 1; + } + + return recordsFromStream; + } + } + + /** + * Simple implementation only for QueryDatabaseTable processor testing. + */ + class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService { + + @Override + public String getIdentifier() { + return "dbcp"; + } + + @Override + public Connection getConnection() throws ProcessException { + try { + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + return DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true"); + } catch (final Exception e) { + throw new ProcessException("getConnection failed: " + e); + } + } + } + + @Stateful(scopes = Scope.CLUSTER, description = "Mock for QueryDatabaseTable processor") + private static class MockQueryDatabaseTable extends QueryDatabaseTable { + public void putColumnType(String colName, Integer colType) { + columnTypeMap.put(colName, colType); + } + } +} \ No newline at end of file