From 6d4901cd26388e1810cb670a135d2cf0e87cc8d0 Mon Sep 17 00:00:00 2001
From: Matt Burgess
Date: Mon, 9 Jan 2017 13:37:03 -0500
Subject: [PATCH] NIFI-2881: Added EL support to DB Fetch processors

- Allow incoming flowfiles to GenerateTableFetch
- Incorporated review comments/discussions
- Updated documentation, added error attribute to GenerateTableFetch
- Corrected notes for column properties in fetch processors

This closes #1407.

Signed-off-by: Koji Kawamura
---
 .../AbstractDatabaseFetchProcessor.java            |  64 ++++-
 .../standard/GenerateTableFetch.java               | 135 ++++++---
 .../standard/QueryDatabaseTable.java               |  79 ++++--
 .../standard/QueryDatabaseTableTest.java           |  19 +-
 .../standard/TestGenerateTableFetch.java           | 267 +++++++++++++++++-
 5 files changed, 495 insertions(+), 69 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
index eda93287e3..7728af1545 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
@@ -17,6 +17,8 @@
 package org.apache.nifi.processors.standard;
 
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -24,6 +26,7 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+import org.apache.nifi.util.StringUtils;
 
 import java.io.IOException;
 import java.math.BigDecimal;
@@ -36,6 +39,7 @@ import java.sql.Timestamp;
 import java.text.DecimalFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -97,15 +101,18 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
             .description("The name of the database table to be queried.")
             .required(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
 
     public static final PropertyDescriptor COLUMN_NAMES = new PropertyDescriptor.Builder()
             .name("Columns to Return")
             .description("A comma-separated list of column names to be used in the query. If your database requires "
                     + "special treatment of the names (quoting, e.g.), each name should include such treatment. If no "
-                    + "column names are supplied, all columns in the specified table will be returned.")
+                    + "column names are supplied, all columns in the specified table will be returned. NOTE: It is important "
+                    + "to use consistent column names for a given table for incremental fetch to work properly.")
             .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
 
     public static final PropertyDescriptor MAX_VALUE_COLUMN_NAMES = new PropertyDescriptor.Builder()
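As context for the expressionLanguageSupported(true) flags being added throughout this patch: this is how a processor resolves such a property at runtime. A minimal sketch using only API calls that appear later in this patch; the "db.table" attribute is hypothetical, not part of the patch:

    // If a flow file is present, ${...} expressions are evaluated against its
    // attributes (e.g. a hypothetical "db.table" attribute); with no flow file,
    // they are evaluated against the Variable Registry instead.
    final FlowFile fileToProcess = session.get();
    final String tableName = context.getProperty(TABLE_NAME)
            .evaluateAttributeExpressions(fileToProcess)
            .getValue();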
@@ -117,9 +124,11 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
             + "can be used to retrieve only those rows that have been added/updated since the last retrieval. Note that some "
             + "JDBC types such as bit/boolean are not conducive to maintaining maximum value, so columns of these "
             + "types should not be listed in this property, and will result in error(s) during processing. If no columns "
-            + "are provided, all rows from the table will be considered, which could have a performance impact.")
+            + "are provided, all rows from the table will be considered, which could have a performance impact. NOTE: It is important "
+            + "to use consistent max-value column names for a given table for incremental fetch to work properly.")
             .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
 
     public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder()
@@ -129,6 +138,7 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
             .defaultValue("0 seconds")
             .required(true)
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
 
     public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder()
@@ -143,11 +153,24 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
 
     protected List<PropertyDescriptor> propDescriptors;
 
+    // The delimiter to use when referencing qualified names (such as table@!@column in the state map)
+    protected static final String NAMESPACE_DELIMITER = "@!@";
+
     public static final PropertyDescriptor DB_TYPE;
 
     protected final static Map<String, DatabaseAdapter> dbAdapters = new HashMap<>();
     protected final Map<String, Integer> columnTypeMap = new HashMap<>();
 
+    // This value is set when the processor is scheduled and indicates whether the Table Name property contains Expression Language.
+    // It is used for backwards-compatibility purposes; if the value is false and the fully-qualified state key (table + column) is not found,
+    // the processor will look for a state key with just the column name.
+    protected volatile boolean isDynamicTableName = false;
+
+    // This value is set when the processor is scheduled and indicates whether the Maximum Value Columns property contains Expression Language.
+    // It is used for backwards-compatibility purposes; if the table name and max-value columns are static, then the column types can be
+    // pre-fetched when the processor is scheduled, rather than having to populate them on-the-fly.
+    protected volatile boolean isDynamicMaxValues = false;
+
     private static SimpleDateFormat TIME_TYPE_FORMAT = new SimpleDateFormat("HH:mm:ss.SSS");
 
     static {
@@ -166,13 +189,28 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
                 .build();
     }
 
+    // A common validation procedure for DB fetch processors; it records whether the Table Name and/or Max-Value Columns properties contain Expression Language
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        // For backwards-compatibility, keep track of whether the table name and max-value column properties are dynamic (i.e. contain Expression Language)
+        isDynamicTableName = validationContext.isExpressionLanguagePresent(validationContext.getProperty(TABLE_NAME).getValue());
+        isDynamicMaxValues = validationContext.isExpressionLanguagePresent(validationContext.getProperty(MAX_VALUE_COLUMN_NAMES).getValue());
+
+        return super.customValidate(validationContext);
+    }
+
     public void setup(final ProcessContext context) {
+        final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue();
+
+        // If there are no max-value column names specified, we don't need to perform this processing
+        if (StringUtils.isEmpty(maxValueColumnNames)) {
+            return;
+        }
+
         // Try to fill the columnTypeMap with the types of the desired max-value columns
         final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
-        final String tableName = context.getProperty(TABLE_NAME).getValue();
-        final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
-        final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
+        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
+        final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
 
         try (final Connection con = dbcpService.getConnection();
              final Statement st = con.createStatement()) {
@@ -187,10 +225,10 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
                 columnTypeMap.clear();
                 for (int i = 1; i <= numCols; i++) {
                     String colName = resultSetMetaData.getColumnName(i).toLowerCase();
+                    String colKey = getStateKey(tableName, colName);
                     int colType = resultSetMetaData.getColumnType(i);
-                    columnTypeMap.put(colName, colType);
+                    columnTypeMap.putIfAbsent(colKey, colType);
                 }
-
             } else {
                 throw new ProcessException("No columns found in table from those specified: " + maxValueColumnNames);
             }
@@ -378,4 +416,16 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
             return value;
         }
     }
+
+    protected static String getStateKey(String prefix, String columnName) {
+        StringBuilder sb = new StringBuilder();
+        if (prefix != null) {
+            sb.append(prefix.toLowerCase());
+            sb.append(NAMESPACE_DELIMITER);
+        }
+        if (columnName != null) {
+            sb.append(columnName.toLowerCase());
+        }
+        return sb.toString();
+    }
 }
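The getStateKey() helper just added is the crux of the state migration in this patch: state entries and cached column types move from bare column-name keys to table-qualified keys. A small illustration of the scheme and the fallback logic the processors below apply; the values are hypothetical:

    // New scheme: "<table>@!@<column>", lower-cased
    String key = getStateKey("myTable", "ID");      // -> "mytable@!@id"
    // Legacy scheme (pre-patch) used the bare column name
    String legacyKey = getStateKey(null, "ID");     // -> "id"

    // Backwards-compatible lookup: try the fully-qualified key first,
    // then (only when the table name is static) fall back to the legacy key.
    String maxValue = statePropertyMap.get(key);
    if (StringUtils.isEmpty(maxValue) && !isDynamicTableName) {
        maxValue = statePropertyMap.get(legacyKey);
    }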
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
index bff1024ae1..966c20db4b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -21,11 +21,15 @@
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateMap;
@@ -49,6 +53,7 @@ import java.sql.Statement;
 import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -60,18 +65,28 @@
 
 @TriggerSerially
-@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@InputRequirement(Requirement.INPUT_ALLOWED)
 @Tags({"sql", "select", "jdbc", "query", "database", "fetch", "generate"})
-@SeeAlso({QueryDatabaseTable.class, ExecuteSQL.class})
+@SeeAlso({QueryDatabaseTable.class, ExecuteSQL.class, ListDatabaseTables.class})
 @CapabilityDescription("Generates SQL select queries that fetch \"pages\" of rows from a table. The partition size property, along with the table's row count, "
         + "determine the size and number of pages and generated FlowFiles. In addition, incremental fetching can be achieved by setting Maximum-Value Columns, "
         + "which causes the processor to track the columns' maximum values, thus only fetching rows whose columns' values exceed the observed maximums. This "
-        + "processor is intended to be run on the Primary Node only.")
+        + "processor is intended to be run on the Primary Node only.\n\n"
+        + "This processor can accept incoming connections; its behavior differs depending on whether an incoming connection is provided:\n"
+        + " - If no incoming connection(s) are specified, the processor will generate SQL queries on the specified processor schedule. Expression Language is supported for many "
+        + "fields, but no flow file attributes are available. However, the properties will be evaluated using the Variable Registry.\n"
+        + " - If incoming connection(s) are specified and no flow file is available to a processor task, no work will be performed.\n"
+        + " - If incoming connection(s) are specified and a flow file is available to a processor task, the flow file's attributes may be used in Expression Language for such fields "
+        + "as Table Name and others. However, the Max-Value Columns and Columns to Return fields must be empty or refer to columns that are available in each specified table.")
 @Stateful(scopes = Scope.CLUSTER, description = "After performing a query on the specified table, the maximum values for "
         + "the specified column(s) will be retained for use in future executions of the query. This allows the Processor "
         + "to fetch only those records that have max values greater than the retained values. This can be used for "
         + "incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor "
         + "per the State Management documentation")
+@WritesAttributes({
+        @WritesAttribute(attribute = "generatetablefetch.sql.error", description = "If the processor has incoming connections, and processing an incoming flow file causes "
+                + "a SQL Exception, the flow file is routed to failure and this attribute is set to the exception message.")
+})
 public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
 
     public static final PropertyDescriptor PARTITION_SIZE = new PropertyDescriptor.Builder()
@@ -83,13 +98,20 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
                     + "in the table.")
             .defaultValue("10000")
             .required(true)
-            .expressionLanguageSupported(false)
+            .expressionLanguageSupported(true)
             .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
             .build();
 
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("This relationship is only used when SQL query execution (using an incoming FlowFile) fails. The incoming FlowFile will be penalized and routed to this relationship. "
+                    + "If no incoming connection(s) are specified, this relationship is unused.")
+            .build();
+
     public GenerateTableFetch() {
         final Set<Relationship> r = new HashSet<>();
         r.add(REL_SUCCESS);
+        r.add(REL_FAILURE);
         relationships = Collections.unmodifiableSet(r);
 
         final List<PropertyDescriptor> pds = new ArrayList<>();
@@ -113,22 +135,41 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
         return propDescriptors;
     }
 
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        return super.customValidate(validationContext);
+    }
+
     @OnScheduled
     public void setup(final ProcessContext context) {
-        super.setup(context);
+        // Pre-fetch the column types if using a static table name and max-value columns
+        if (!isDynamicTableName && !isDynamicMaxValues) {
+            super.setup(context);
+        }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
         ProcessSession session = sessionFactory.createSession();
+
+        FlowFile fileToProcess = null;
+        if (context.hasIncomingConnection()) {
+            fileToProcess = session.get();
+
+            if (fileToProcess == null) {
+                // Incoming connection with no flow file available, do no work (see capability description)
+                return;
+            }
+        }
+
         final ComponentLog logger = getLogger();
 
         final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
         final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
-        final String tableName = context.getProperty(TABLE_NAME).getValue();
-        final String columnNames = context.getProperty(COLUMN_NAMES).getValue();
-        final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue();
-        final int partitionSize = context.getProperty(PARTITION_SIZE).asInteger();
+        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(fileToProcess).getValue();
+        final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
+        final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
+        final int partitionSize = context.getProperty(PARTITION_SIZE).evaluateAttributeExpressions(fileToProcess).asInteger();
 
         final StateManager stateManager = context.getStateManager();
         final StateMap stateMap;
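Before the next hunk, note the paging arithmetic onTrigger() applies once the row count is known (the numberOfFetches expression appears later in this diff). A worked example with sample numbers, illustrative only:

    // With rowCount = 25 and partitionSize = 10:
    //   numberOfFetches = 25/10 + (25 % 10 == 0 ? 0 : 1) = 2 + 1 = 3
    // yielding LIMIT/OFFSET pairs (10, 0), (10, 10), (10, 20).
    // When partitionSize == 0, limit and offset are left null and the
    // generated queries carry no paging clause.
    final int numberOfFetches = (partitionSize == 0) ? rowCount
            : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1);

@@ -164,11 +205,20 @@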
public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { IntStream.range(0, maxValueColumnNameList.size()).forEach((index) -> { String colName = maxValueColumnNameList.get(index); maxValueSelectColumns.add("MAX(" + colName + ") " + colName); - String maxValue = statePropertyMap.get(colName.toLowerCase()); + final String fullyQualifiedStateKey = getStateKey(tableName, colName); + String maxValue = statePropertyMap.get(fullyQualifiedStateKey); + if (StringUtils.isEmpty(maxValue) && !isDynamicTableName) { + // If the table name is static and the fully-qualified key was not found, try just the column name + maxValue = statePropertyMap.get(getStateKey(null, colName)); + } if (!StringUtils.isEmpty(maxValue)) { - Integer type = columnTypeMap.get(colName.toLowerCase()); + Integer type = columnTypeMap.get(fullyQualifiedStateKey); + if (type == null && !isDynamicTableName) { + // If the table name is static and the fully-qualified key was not found, try just the column name + type = columnTypeMap.get(getStateKey(null, colName)); + } if (type == null) { - // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled. + // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled or when the first maximum is observed throw new IllegalArgumentException("No column type found for: " + colName); } // Add a condition for the WHERE clause @@ -186,7 +236,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { try (final Connection con = dbcpService.getConnection(); final Statement st = con.createStatement()) { - final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue(); + final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.SECONDS).intValue(); st.setQueryTimeout(queryTimeout); // timeout in seconds logger.debug("Executing {}", new Object[]{selectQuery}); @@ -202,40 +252,61 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { ResultSetMetaData rsmd = resultSet.getMetaData(); for (int i = 2; i <= rsmd.getColumnCount(); i++) { String resultColumnName = rsmd.getColumnName(i).toLowerCase(); + String fullyQualifiedStateKey = getStateKey(tableName, resultColumnName); + String resultColumnCurrentMax = statePropertyMap.get(fullyQualifiedStateKey); + if (StringUtils.isEmpty(resultColumnCurrentMax) && !isDynamicTableName) { + // If we can't find the value at the fully-qualified key name and the table name is static, it is possible (under a previous scheme) + // the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new + // maximum value is observed, it will be stored under the fully-qualified key from then on. 
+ resultColumnCurrentMax = statePropertyMap.get(resultColumnName); + } + int type = rsmd.getColumnType(i); + if (isDynamicTableName) { + // We haven't pre-populated the column type map if the table name is dynamic, so do it here + columnTypeMap.put(fullyQualifiedStateKey, type); + } try { - String newMaxValue = getMaxValueFromRow(resultSet, i, type, statePropertyMap.get(resultColumnName.toLowerCase()), dbAdapter.getName()); + String newMaxValue = getMaxValueFromRow(resultSet, i, type, resultColumnCurrentMax, dbAdapter.getName()); if (newMaxValue != null) { - statePropertyMap.put(resultColumnName, newMaxValue); + statePropertyMap.put(fullyQualifiedStateKey, newMaxValue); } } catch (ParseException | IOException pie) { // Fail the whole thing here before we start creating flow files and such throw new ProcessException(pie); } + } } else { // Something is very wrong here, one row (even if count is zero) should be returned throw new SQLException("No rows returned from metadata query: " + selectQuery); } + + final int numberOfFetches = (partitionSize == 0) ? rowCount : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1); + + // Generate SQL statements to read "pages" of data + for (int i = 0; i < numberOfFetches; i++) { + Integer limit = partitionSize == 0 ? null : partitionSize; + Integer offset = partitionSize == 0 ? null : i * partitionSize; + final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, StringUtils.join(maxValueColumnNameList, ", "), limit, offset); + FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess); + sqlFlowFile = session.write(sqlFlowFile, out -> out.write(query.getBytes())); + session.transfer(sqlFlowFile, REL_SUCCESS); + } + + if (fileToProcess != null) { + session.remove(fileToProcess); + } } catch (SQLException e) { - logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e}); - throw new ProcessException(e); - } - final int numberOfFetches = (partitionSize == 0) ? rowCount : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1); + if (fileToProcess != null) { + logger.error("Unable to execute SQL select query {} due to {}, routing {} to failure", new Object[]{selectQuery, e, fileToProcess}); + fileToProcess = session.putAttribute(fileToProcess, "generatetablefetch.sql.error", e.getMessage()); + session.transfer(fileToProcess, REL_FAILURE); - - // Generate SQL statements to read "pages" of data - for (int i = 0; i < numberOfFetches; i++) { - FlowFile sqlFlowFile; - - Integer limit = partitionSize == 0 ? null : partitionSize; - Integer offset = partitionSize == 0 ? 
null : i * partitionSize; - final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, StringUtils.join(maxValueColumnNameList, ", "), limit, offset); - sqlFlowFile = session.create(); - sqlFlowFile = session.write(sqlFlowFile, out -> { - out.write(query.getBytes()); - }); - session.transfer(sqlFlowFile, REL_SUCCESS); + } else { + logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e}); + throw new ProcessException(e); + } } session.commit(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java index 2fddb87dd5..1d898b4b47 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java @@ -25,6 +25,7 @@ import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; @@ -70,9 +71,13 @@ import java.util.stream.IntStream; @EventDriven @InputRequirement(Requirement.INPUT_FORBIDDEN) @Tags({"sql", "select", "jdbc", "query", "database"}) -@CapabilityDescription("Execute provided SQL select query. Query result will be converted to Avro format." - + " Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on " - + "a timer or cron expression, using the standard scheduling methods. FlowFile attribute " +@SeeAlso({GenerateTableFetch.class, ExecuteSQL.class}) +@CapabilityDescription("Generates and executes a SQL select query to fetch all rows whose values in the specified Maximum Value column(s) are larger than the " + + "previously-seen maxima. Query result will be converted to Avro format. Expression Language is supported for several properties, but no incoming " + + "connections are permitted. The Variable Registry may be used to provide values for any property containing Expression Language. If it is desired to " + + "leverage flow file attributes to perform these queries, the GenerateTableFetch and/or ExecuteSQL processors can be used for this purpose. " + + "Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on " + + "a timer or cron expression, using the standard scheduling methods. This processor is intended to be run on the Primary Node only. FlowFile attribute " + "'querydbtable.row.count' indicates how many rows were selected.") @Stateful(scopes = Scope.CLUSTER, description = "After performing a query on the specified table, the maximum values for " + "the specified column(s) will be retained for use in future executions of the query. This allows the Processor " @@ -80,7 +85,7 @@ import java.util.stream.IntStream; + "incremental fetching, fetching of newly added rows, etc. 
To clear the maximum values, clear the state of the processor " + "per the State Management documentation") @WritesAttributes({ - @WritesAttribute(attribute = "querydbtable.row.count"), + @WritesAttribute(attribute = "querydbtable.row.count", description="The number of rows selected by the query"), @WritesAttribute(attribute="fragment.identifier", description="If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set " + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."), @WritesAttribute(attribute="fragment.count", description="If 'Max Rows Per Flow File' is set then this is the total number of " @@ -107,6 +112,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { .defaultValue("0") .required(true) .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder() @@ -117,6 +123,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { .defaultValue("0") .required(true) .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor MAX_FRAGMENTS = new PropertyDescriptor.Builder() @@ -127,6 +134,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { .defaultValue("0") .required(true) .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(true) .build(); public QueryDatabaseTable() { @@ -184,13 +192,13 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue()); - final String tableName = context.getProperty(TABLE_NAME).getValue(); - final String columnNames = context.getProperty(COLUMN_NAMES).getValue(); - final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue(); - final Integer fetchSize = context.getProperty(FETCH_SIZE).asInteger(); - final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).asInteger(); + final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue(); + final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions().getValue(); + final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue(); + final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger(); + final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger(); final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet() - ? context.getProperty(MAX_FRAGMENTS).asInteger() + ? 
context.getProperty(MAX_FRAGMENTS).evaluateAttributeExpressions().asInteger() : 0; final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean(); @@ -212,9 +220,21 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { final Map statePropertyMap = new HashMap<>(stateMap.toMap()); //If an initial max value for column(s) has been specified using properties, and this column is not in the state manager, sync them to the state property map - for(final Map.Entry maxProp : maxValueProperties.entrySet()){ - if (!statePropertyMap.containsKey(maxProp.getKey().toLowerCase())) { - statePropertyMap.put(maxProp.getKey().toLowerCase(), maxProp.getValue()); + for (final Map.Entry maxProp : maxValueProperties.entrySet()) { + String maxPropKey = maxProp.getKey().toLowerCase(); + String fullyQualifiedMaxPropKey = getStateKey(tableName, maxPropKey); + if (!statePropertyMap.containsKey(fullyQualifiedMaxPropKey)) { + String newMaxPropValue; + // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme) + // the value has been stored under a key that is only the column name. Fall back to check the column name, + // but store the new initial max value under the fully-qualified key. + if (statePropertyMap.containsKey(maxPropKey)) { + newMaxPropValue = statePropertyMap.get(maxPropKey); + } else { + newMaxPropValue = maxProp.getValue(); + } + statePropertyMap.put(fullyQualifiedMaxPropKey, newMaxPropValue); + } } @@ -247,7 +267,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { // Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly } - final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue(); + final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue(); st.setQueryTimeout(queryTimeout); // timeout in seconds try { logger.debug("Executing query {}", new Object[]{selectQuery}); @@ -304,7 +324,10 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { for (int i = 0; i < resultSetFlowFiles.size(); i++) { // Add maximum values as attributes for (Map.Entry entry : statePropertyMap.entrySet()) { - resultSetFlowFiles.set(i, session.putAttribute(resultSetFlowFiles.get(i), "maxvalue." + entry.getKey(), entry.getValue())); + // Get just the column name from the key + String key = entry.getKey(); + String colName = key.substring(key.lastIndexOf(NAMESPACE_DELIMITER) + NAMESPACE_DELIMITER.length()); + resultSetFlowFiles.set(i, session.putAttribute(resultSetFlowFiles.get(i), "maxvalue." + colName, entry.getValue())); } //set count on all FlowFiles @@ -349,9 +372,16 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { List whereClauses = new ArrayList<>(maxValColumnNames.size()); IntStream.range(0, maxValColumnNames.size()).forEach((index) -> { String colName = maxValColumnNames.get(index); - String maxValue = stateMap.get(colName.toLowerCase()); + String maxValueKey = getStateKey(tableName, colName); + String maxValue = stateMap.get(maxValueKey); + if (StringUtils.isEmpty(maxValue)) { + // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme) + // the value has been stored under a key that is only the column name. 
Fall back to check the column name; either way, when a new + // maximum value is observed, it will be stored under the fully-qualified key from then on. + maxValue = stateMap.get(colName.toLowerCase()); + } if (!StringUtils.isEmpty(maxValue)) { - Integer type = columnTypeMap.get(colName.toLowerCase()); + Integer type = columnTypeMap.get(maxValueKey); if (type == null) { // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled. throw new IllegalArgumentException("No column type found for: " + colName); @@ -371,7 +401,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { protected Map getDefaultMaxValueProperties(final Map properties){ - final Map defaultMaxValues = new HashMap(); + final Map defaultMaxValues = new HashMap<>(); for (final Map.Entry entry : properties.entrySet()) { final String key = entry.getKey().getName(); @@ -407,15 +437,22 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { if (nrOfColumns > 0) { for (int i = 1; i <= nrOfColumns; i++) { String colName = meta.getColumnName(i).toLowerCase(); - Integer type = columnTypeMap.get(colName); + String fullyQualifiedMaxValueKey = getStateKey(meta.getTableName(i), colName); + Integer type = columnTypeMap.get(fullyQualifiedMaxValueKey); // Skip any columns we're not keeping track of or whose value is null if (type == null || resultSet.getObject(i) == null) { continue; } - String maxValueString = newColMap.get(colName); + String maxValueString = newColMap.get(fullyQualifiedMaxValueKey); + // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme) + // the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new + // maximum value is observed, it will be stored under the fully-qualified key from then on. 
+ if (StringUtils.isEmpty(maxValueString)) { + maxValueString = newColMap.get(colName); + } String newMaxValueString = getMaxValueFromRow(resultSet, i, type, maxValueString, dbAdapter.getName()); if (newMaxValueString != null) { - newColMap.put(colName, newMaxValueString); + newColMap.put(fullyQualifiedMaxValueKey, newMaxValueString); } } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java index 3353a8725f..92f4757f4e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java @@ -72,6 +72,8 @@ public class QueryDatabaseTableTest { private final static String DB_LOCATION = "target/db_qdt"; private DatabaseAdapter dbAdapter; private HashMap origDbAdapters; + private final static String TABLE_NAME_KEY = "tableName"; + private final static String MAX_ROWS_KEY = "maxRows"; @BeforeClass @@ -142,13 +144,13 @@ public class QueryDatabaseTableTest { maxValues.put("id", "509"); StateManager stateManager = runner.getStateManager(); stateManager.setState(maxValues, Scope.CLUSTER); - processor.putColumnType("id", Types.INTEGER); + processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "id", Types.INTEGER); query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), stateManager.getState(Scope.CLUSTER).toMap()); assertEquals("SELECT * FROM myTable WHERE id > 509", query); maxValues.put("date_created", "2016-03-07 12:34:56"); stateManager.setState(maxValues, Scope.CLUSTER); - processor.putColumnType("date_created", Types.TIMESTAMP); + processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "date_created", Types.TIMESTAMP); query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER).toMap()); assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= '2016-03-07 12:34:56'", query); @@ -460,6 +462,7 @@ public class QueryDatabaseTableTest { runner.setIncomingConnection(false); runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_NULL_INT"); + runner.setProperty(AbstractDatabaseFetchProcessor.MAX_VALUE_COLUMN_NAMES, "id"); QueryDatabaseTable.dbAdapters.put(dbAdapter.getName(), new GenericDatabaseAdapter() { @Override @@ -521,7 +524,8 @@ public class QueryDatabaseTableTest { runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE"); runner.setIncomingConnection(false); runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID"); - runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "9");//Using a non-round number to make sure the last file is ragged + runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "${" + MAX_ROWS_KEY + "}"); + runner.setVariable(MAX_ROWS_KEY, "9"); runner.run(); runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 12); @@ -675,7 +679,8 @@ public class QueryDatabaseTableTest { cal.add(Calendar.MINUTE, 1); } - runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE"); + runner.setProperty(QueryDatabaseTable.TABLE_NAME, "${" + TABLE_NAME_KEY + "}"); + 
runner.setVariable(TABLE_NAME_KEY, "TEST_QUERY_DB_TABLE"); runner.setIncomingConnection(false); runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "created_on"); @@ -687,14 +692,14 @@ public class QueryDatabaseTableTest { runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); assertEquals(4, getNumberOfRecordsFromStream(in)); - runner.getStateManager().assertStateEquals("created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER); + runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER); runner.clearTransferState(); // Run again, this time no flowfiles/rows should be transferred // Validate Max Value doesn't change also runner.run(); runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0); - runner.getStateManager().assertStateEquals("created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER); + runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER); runner.clearTransferState(); // Append a new row, expect 1 flowfile one row @@ -707,7 +712,7 @@ public class QueryDatabaseTableTest { runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); assertEquals(1, getNumberOfRecordsFromStream(in)); - runner.getStateManager().assertStateEquals("created_on", "1970-01-01 00:10:00.0", Scope.CLUSTER); + runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:10:00.0", Scope.CLUSTER); runner.clearTransferState(); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java index 8a8aa01220..f79f96cbc8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java @@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard; import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.dbcp.DBCPService; import org.apache.nifi.processor.exception.ProcessException; @@ -95,7 +96,7 @@ public class TestGenerateTableFetch { final DBCPService dbcp = new DBCPServiceSimpleImpl(); final Map dbcpProperties = new HashMap<>(); - runner = TestRunners.newTestRunner(GenerateTableFetch.class); + runner = TestRunners.newTestRunner(processor); runner.addControllerService("dbcp", dbcp, dbcpProperties); runner.enableControllerService(dbcp); runner.setProperty(GenerateTableFetch.DBCP_SERVICE, "dbcp"); @@ -251,9 +252,271 @@ public class TestGenerateTableFetch { runner.clearTransferState(); } + @Test + public void testMultiplePartitionsIncomingFlowFiles() throws ClassNotFoundException, SQLException, InitializationException, 
IOException { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE1"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE1 (id integer not null, bucket integer not null)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE1 (id, bucket) VALUES (0, 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE1 (id, bucket) VALUES (1, 0)"); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE2"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE2 (id integer not null, bucket integer not null)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, bucket) VALUES (0, 0)"); + + + runner.setProperty(GenerateTableFetch.TABLE_NAME, "${tableName}"); + runner.setIncomingConnection(true); + runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "${partSize}"); + + runner.enqueue("".getBytes(), new HashMap() {{ + put("tableName", "TEST_QUERY_DB_TABLE1"); + put("partSize", "1"); + }}); + + runner.enqueue("".getBytes(), new HashMap() {{ + put("tableName", "TEST_QUERY_DB_TABLE2"); + put("partSize", "2"); + }}); + + // The table does not exist, expect the original flow file to be routed to failure + runner.enqueue("".getBytes(), new HashMap() {{ + put("tableName", "TEST_QUERY_DB_TABLE3"); + put("partSize", "1"); + }}); + + runner.run(3); + runner.assertTransferCount(AbstractDatabaseFetchProcessor.REL_SUCCESS, 3); + + // Two records from table 1 + assertEquals(runner.getFlowFilesForRelationship(AbstractDatabaseFetchProcessor.REL_SUCCESS).stream().filter( + (ff) -> "TEST_QUERY_DB_TABLE1".equals(ff.getAttribute("tableName"))).count(), + 2); + + // One record from table 2 + assertEquals(runner.getFlowFilesForRelationship(AbstractDatabaseFetchProcessor.REL_SUCCESS).stream().filter( + (ff) -> "TEST_QUERY_DB_TABLE2".equals(ff.getAttribute("tableName"))).count(), + 1); + + // Table 3 doesn't exist, should be routed to failure + runner.assertTransferCount(GenerateTableFetch.REL_FAILURE, 1); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE1"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE2"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + } + + @Test + public void testBackwardsCompatibilityStateKeyStaticTableDynamicMaxValues() throws Exception { + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, bucket integer not null)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (0, 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE 
(id, bucket) VALUES (1, 0)"); + + runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE"); + runner.setIncomingConnection(true); + runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "${maxValueCol}"); + runner.enqueue("".getBytes(), new HashMap() {{ + put("maxValueCol", "id"); + }}); + + // Pre-populate the state with a key for column name (not fully-qualified) + StateManager stateManager = runner.getStateManager(); + stateManager.setState(new HashMap() {{ + put("id", "0"); + }}, Scope.CLUSTER); + + // Pre-populate the column type map with an entry for id (not fully-qualified) + processor.columnTypeMap.put("id", 4); + + runner.run(); + + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id > 0 ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray())); + } + + @Test + public void testBackwardsCompatibilityStateKeyDynamicTableDynamicMaxValues() throws Exception { + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, bucket integer not null)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (0, 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1, 0)"); + + runner.setProperty(GenerateTableFetch.TABLE_NAME, "${tableName}"); + runner.setIncomingConnection(true); + runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "${maxValueCol}"); + runner.enqueue("".getBytes(), new HashMap() {{ + put("tableName", "TEST_QUERY_DB_TABLE"); + put("maxValueCol", "id"); + }}); + + // Pre-populate the state with a key for column name (not fully-qualified) + StateManager stateManager = runner.getStateManager(); + stateManager.setState(new HashMap() {{ + put("id", "0"); + }}, Scope.CLUSTER); + + // Pre-populate the column type map with an entry for id (not fully-qualified) + processor.columnTypeMap.put("id", 4); + + runner.run(); + + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + // Note there is no WHERE clause here. 
Because we are using dynamic tables, the old state key/value is not retrieved + assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray())); + + runner.clearTransferState(); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (2, 0)"); + + runner.enqueue("".getBytes(), new HashMap() {{ + put("tableName", "TEST_QUERY_DB_TABLE"); + put("maxValueCol", "id"); + }}); + runner.run(); + + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1); + flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id > 1 ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray())); + } + + @Test + public void testBackwardsCompatibilityStateKeyDynamicTableStaticMaxValues() throws Exception { + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, bucket integer not null)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (0, 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1, 0)"); + + runner.setProperty(GenerateTableFetch.TABLE_NAME, "${tableName}"); + runner.setIncomingConnection(true); + runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "id"); + runner.enqueue("".getBytes(), new HashMap() {{ + put("tableName", "TEST_QUERY_DB_TABLE"); + }}); + + // Pre-populate the state with a key for column name (not fully-qualified) + StateManager stateManager = runner.getStateManager(); + stateManager.setState(new HashMap() {{ + put("id", "0"); + }}, Scope.CLUSTER); + + // Pre-populate the column type map with an entry for id (not fully-qualified) + processor.columnTypeMap.put("id", 4); + + runner.run(); + + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + // Note there is no WHERE clause here. 
Because we are using dynamic tables (i.e. Expression Language,
+        // even when not referring to flow file attributes), the old state key/value is not retrieved
+        assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray()));
+    }
+
     /**
-     * Simple implementation only for ListDatabaseTables processor testing.
+     * Simple implementation only for GenerateTableFetch processor testing.
      */
     private class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
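The section ends where the test's in-memory DBCP stub begins. For orientation, a minimal sketch of how such a stub is commonly implemented in these NiFi tests; this is an assumption, not verbatim from the patch, and it presumes a DB_LOCATION constant for the embedded Derby database (as in QueryDatabaseTableTest) plus java.sql.Connection and java.sql.DriverManager imports:

    // Sketch only: hands out connections to the embedded Derby test database.
    @Override
    public String getIdentifier() {
        return "dbcp"; // matches the id the test registers the service under
    }

    @Override
    public Connection getConnection() throws ProcessException {
        try {
            // Requires the Derby embedded driver on the test classpath
            Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
            return DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
        } catch (final Exception e) {
            throw new ProcessException("getConnection failed: " + e);
        }
    }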