diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java index e11a888fe5..ebe23d0290 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java @@ -21,6 +21,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.expression.AttributeExpression; import org.apache.nifi.processor.AbstractSessionFactoryProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Relationship; @@ -82,6 +83,8 @@ import static java.sql.Types.VARCHAR; */ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFactoryProcessor { + public static final String INITIAL_MAX_VALUE_PROP_START = "initial.maxvalue."; + // Relationships public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -165,6 +168,9 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact private static SimpleDateFormat TIME_TYPE_FORMAT = new SimpleDateFormat("HH:mm:ss.SSS"); + // A Map (name to value) of initial maximum-value properties, filled at schedule-time and used at trigger-time + protected Map maxValueProperties; + static { // Load the DatabaseAdapters ArrayList dbAdapterValues = new ArrayList<>(); @@ -185,6 +191,18 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact .build(); } + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) + .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) + .expressionLanguageSupported(true) + .dynamic(true) + .build(); + } + // A common validation procedure for DB fetch processors, it stores whether the Table Name and/or Max Value Column properties have expression language protected Collection customValidate(ValidationContext validationContext) { // For backwards-compatibility, keep track of whether the table name and max-value column properties are dynamic (i.e. has expression language) @@ -424,4 +442,20 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact } return sb.toString(); } + + protected Map getDefaultMaxValueProperties(final Map properties){ + final Map defaultMaxValues = new HashMap<>(); + + for (final Map.Entry entry : properties.entrySet()) { + final String key = entry.getKey().getName(); + + if(!key.startsWith(INITIAL_MAX_VALUE_PROP_START)) { + continue; + } + + defaultMaxValues.put(key.substring(INITIAL_MAX_VALUE_PROP_START.length()), entry.getValue()); + } + + return defaultMaxValues; + } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java index 55fc06c9de..1fcb33ef83 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java @@ -17,6 +17,7 @@ package org.apache.nifi.processors.standard; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.Stateful; @@ -94,6 +95,8 @@ import java.util.stream.IntStream; @WritesAttribute(attribute = "generatetablefetch.limit", description = "The number of result rows to be fetched by the SQL statement."), @WritesAttribute(attribute = "generatetablefetch.offset", description = "Offset to be used to retrieve the corresponding partition.") }) +@DynamicProperty(name = "Initial Max Value", value = "Attribute Expression Language", supportsExpressionLanguage = false, description = "Specifies an initial " + + "max value for max value columns. Properties should be added in the format `initial.maxvalue.{max_value_column}`.") public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { public static final PropertyDescriptor PARTITION_SIZE = new PropertyDescriptor.Builder() @@ -150,6 +153,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { @Override @OnScheduled public void setup(final ProcessContext context) { + maxValueProperties = getDefaultMaxValueProperties(context.getProperties()); // Pre-fetch the column types if using a static table name and max-value columns if (!isDynamicTableName && !isDynamicMaxValues) { super.setup(context); @@ -198,6 +202,24 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { // set as the current state map (after the session has been committed) final Map statePropertyMap = new HashMap<>(stateMap.toMap()); + // If an initial max value for column(s) has been specified using properties, and this column is not in the state manager, sync them to the state property map + for (final Map.Entry maxProp : maxValueProperties.entrySet()) { + String maxPropKey = maxProp.getKey().toLowerCase(); + String fullyQualifiedMaxPropKey = getStateKey(tableName, maxPropKey); + if (!statePropertyMap.containsKey(fullyQualifiedMaxPropKey)) { + String newMaxPropValue; + // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme) + // the value has been stored under a key that is only the column name. Fall back to check the column name, + // but store the new initial max value under the fully-qualified key. + if (statePropertyMap.containsKey(maxPropKey)) { + newMaxPropValue = statePropertyMap.get(maxPropKey); + } else { + newMaxPropValue = maxProp.getValue(); + } + statePropertyMap.put(fullyQualifiedMaxPropKey, newMaxPropValue); + } + } + // Build a WHERE clause with maximum-value columns (if they exist), and a list of column names that will contain MAX() aliases. The // executed SQL query will retrieve the count of all records after the filter(s) have been applied, as well as the new maximum values for the // specified columns. This allows the processor to generate the correctly partitioned SQL statements as well as to update the state with the diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java index f23b228c42..8ae157aadb 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java @@ -33,7 +33,6 @@ import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateMap; import org.apache.nifi.dbcp.DBCPService; -import org.apache.nifi.expression.AttributeExpression; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; @@ -110,8 +109,6 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { public static final String RESULT_TABLENAME = "tablename"; public static final String RESULT_ROW_COUNT = "querydbtable.row.count"; - public static final String INTIIAL_MAX_VALUE_PROP_START = "initial.maxvalue."; - public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder() .name("Fetch Size") @@ -177,20 +174,9 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { return propDescriptors; } - @Override - protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { - return new PropertyDescriptor.Builder() - .name(propertyDescriptorName) - .required(false) - .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) - .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) - .expressionLanguageSupported(true) - .dynamic(true) - .build(); - } - @OnScheduled public void setup(final ProcessContext context) { + maxValueProperties = getDefaultMaxValueProperties(context.getProperties()); super.setup(context); } @@ -220,8 +206,6 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { .defaultScale(context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions().asInteger()) .build(); - final Map maxValueProperties = getDefaultMaxValueProperties(context.getProperties()); - final StateManager stateManager = context.getStateManager(); final StateMap stateMap; @@ -417,23 +401,6 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { return query.toString(); } - - protected Map getDefaultMaxValueProperties(final Map properties){ - final Map defaultMaxValues = new HashMap<>(); - - for (final Map.Entry entry : properties.entrySet()) { - final String key = entry.getKey().getName(); - - if(!key.startsWith(INTIIAL_MAX_VALUE_PROP_START)) { - continue; - } - - defaultMaxValues.put(key.substring(INTIIAL_MAX_VALUE_PROP_START.length()), entry.getValue()); - } - - return defaultMaxValues; - } - protected class MaxValueResultSetRowCollector implements JdbcCommon.ResultSetRowCallback { DatabaseAdapter dbAdapter; Map newColMap; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java index a9b675c8bf..67ab72f30f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java @@ -584,6 +584,75 @@ public class TestGenerateTableFetch { runner.clearTransferState(); } + @Test + public void testInitialMaxValue() throws ClassNotFoundException, SQLException, InitializationException, IOException { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')"); + + runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE"); + runner.setIncomingConnection(false); + runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID"); + runner.setProperty("initial.maxvalue.ID", "1"); + + runner.run(); + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + String query = new String(flowFile.toByteArray()); + assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 1 ORDER BY ID FETCH NEXT 10000 ROWS ONLY", query); + ResultSet resultSet = stmt.executeQuery(query); + // Should be one record (the initial max value skips the first two) + assertTrue(resultSet.next()); + assertFalse(resultSet.next()); + runner.clearTransferState(); + + // Run again, this time no flowfiles/rows should be transferred + runner.run(); + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 0); + runner.clearTransferState(); + + // Add 3 new rows with a higher ID and run with a partition size of 2. Two flow files should be transferred + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (4, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (5, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')"); + runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "2"); + runner.setProperty("initial.maxvalue.ID", "5"); // This should have no effect as there is a max value in the processor state + runner.run(); + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2); + + // Verify first flow file's contents + flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + query = new String(flowFile.toByteArray()); + assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 ORDER BY ID FETCH NEXT 2 ROWS ONLY", query); + resultSet = stmt.executeQuery(query); + // Should be two records + assertTrue(resultSet.next()); + assertTrue(resultSet.next()); + assertFalse(resultSet.next()); + + // Verify second flow file's contents + flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1); + query = new String(flowFile.toByteArray()); + assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 ORDER BY ID OFFSET 2 ROWS FETCH NEXT 2 ROWS ONLY", query); + resultSet = stmt.executeQuery(query); + // Should be one record + assertTrue(resultSet.next()); + assertFalse(resultSet.next()); + runner.clearTransferState(); + } + /** * Simple implementation only for GenerateTableFetch processor testing.