NIFI-3335: Add initial.maxvalue support to GenerateTableFetch

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2039.
This commit is contained in:
Matt Burgess 2017-07-25 16:07:52 -04:00 committed by Pierre Villard
parent 759f81bc1b
commit dc4006f423
4 changed files with 126 additions and 34 deletions

View File

@ -21,6 +21,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
@ -82,6 +83,8 @@ import static java.sql.Types.VARCHAR;
*/
public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFactoryProcessor {
public static final String INITIAL_MAX_VALUE_PROP_START = "initial.maxvalue.";
// Relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@ -165,6 +168,9 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
private static SimpleDateFormat TIME_TYPE_FORMAT = new SimpleDateFormat("HH:mm:ss.SSS");
// A Map (name to value) of initial maximum-value properties, filled at schedule-time and used at trigger-time
protected Map<String,String> maxValueProperties;
static {
// Load the DatabaseAdapters
ArrayList<AllowableValue> dbAdapterValues = new ArrayList<>();
@ -185,6 +191,18 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
.build();
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
.expressionLanguageSupported(true)
.dynamic(true)
.build();
}
// A common validation procedure for DB fetch processors, it stores whether the Table Name and/or Max Value Column properties have expression language
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
// For backwards-compatibility, keep track of whether the table name and max-value column properties are dynamic (i.e. has expression language)
@ -424,4 +442,20 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
}
return sb.toString();
}
protected Map<String,String> getDefaultMaxValueProperties(final Map<PropertyDescriptor, String> properties){
final Map<String,String> defaultMaxValues = new HashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
final String key = entry.getKey().getName();
if(!key.startsWith(INITIAL_MAX_VALUE_PROP_START)) {
continue;
}
defaultMaxValues.put(key.substring(INITIAL_MAX_VALUE_PROP_START.length()), entry.getValue());
}
return defaultMaxValues;
}
}

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Stateful;
@ -94,6 +95,8 @@ import java.util.stream.IntStream;
@WritesAttribute(attribute = "generatetablefetch.limit", description = "The number of result rows to be fetched by the SQL statement."),
@WritesAttribute(attribute = "generatetablefetch.offset", description = "Offset to be used to retrieve the corresponding partition.")
})
@DynamicProperty(name = "Initial Max Value", value = "Attribute Expression Language", supportsExpressionLanguage = false, description = "Specifies an initial "
+ "max value for max value columns. Properties should be added in the format `initial.maxvalue.{max_value_column}`.")
public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
public static final PropertyDescriptor PARTITION_SIZE = new PropertyDescriptor.Builder()
@ -150,6 +153,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
@Override
@OnScheduled
public void setup(final ProcessContext context) {
maxValueProperties = getDefaultMaxValueProperties(context.getProperties());
// Pre-fetch the column types if using a static table name and max-value columns
if (!isDynamicTableName && !isDynamicMaxValues) {
super.setup(context);
@ -198,6 +202,24 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
// set as the current state map (after the session has been committed)
final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap());
// If an initial max value for column(s) has been specified using properties, and this column is not in the state manager, sync them to the state property map
for (final Map.Entry<String, String> maxProp : maxValueProperties.entrySet()) {
String maxPropKey = maxProp.getKey().toLowerCase();
String fullyQualifiedMaxPropKey = getStateKey(tableName, maxPropKey);
if (!statePropertyMap.containsKey(fullyQualifiedMaxPropKey)) {
String newMaxPropValue;
// If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
// the value has been stored under a key that is only the column name. Fall back to check the column name,
// but store the new initial max value under the fully-qualified key.
if (statePropertyMap.containsKey(maxPropKey)) {
newMaxPropValue = statePropertyMap.get(maxPropKey);
} else {
newMaxPropValue = maxProp.getValue();
}
statePropertyMap.put(fullyQualifiedMaxPropKey, newMaxPropValue);
}
}
// Build a WHERE clause with maximum-value columns (if they exist), and a list of column names that will contain MAX(<column>) aliases. The
// executed SQL query will retrieve the count of all records after the filter(s) have been applied, as well as the new maximum values for the
// specified columns. This allows the processor to generate the correctly partitioned SQL statements as well as to update the state with the

View File

@ -33,7 +33,6 @@ import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
@ -110,8 +109,6 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
public static final String RESULT_TABLENAME = "tablename";
public static final String RESULT_ROW_COUNT = "querydbtable.row.count";
public static final String INTIIAL_MAX_VALUE_PROP_START = "initial.maxvalue.";
public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
.name("Fetch Size")
@ -177,20 +174,9 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
return propDescriptors;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
.expressionLanguageSupported(true)
.dynamic(true)
.build();
}
@OnScheduled
public void setup(final ProcessContext context) {
maxValueProperties = getDefaultMaxValueProperties(context.getProperties());
super.setup(context);
}
@ -220,8 +206,6 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
.defaultScale(context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions().asInteger())
.build();
final Map<String,String> maxValueProperties = getDefaultMaxValueProperties(context.getProperties());
final StateManager stateManager = context.getStateManager();
final StateMap stateMap;
@ -417,23 +401,6 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
return query.toString();
}
protected Map<String,String> getDefaultMaxValueProperties(final Map<PropertyDescriptor, String> properties){
final Map<String,String> defaultMaxValues = new HashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
final String key = entry.getKey().getName();
if(!key.startsWith(INTIIAL_MAX_VALUE_PROP_START)) {
continue;
}
defaultMaxValues.put(key.substring(INTIIAL_MAX_VALUE_PROP_START.length()), entry.getValue());
}
return defaultMaxValues;
}
protected class MaxValueResultSetRowCollector implements JdbcCommon.ResultSetRowCallback {
DatabaseAdapter dbAdapter;
Map<String, String> newColMap;

View File

@ -584,6 +584,75 @@ public class TestGenerateTableFetch {
runner.clearTransferState();
}
@Test
public void testInitialMaxValue() throws ClassNotFoundException, SQLException, InitializationException, IOException {
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
}
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (0, 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (1, 'Carrie Jones', 5.0, '2000-01-01 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (2, NULL, 2.0, '2010-01-01 00:00:00')");
runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE");
runner.setIncomingConnection(false);
runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID");
runner.setProperty("initial.maxvalue.ID", "1");
runner.run();
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
String query = new String(flowFile.toByteArray());
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 1 ORDER BY ID FETCH NEXT 10000 ROWS ONLY", query);
ResultSet resultSet = stmt.executeQuery(query);
// Should be one record (the initial max value skips the first two)
assertTrue(resultSet.next());
assertFalse(resultSet.next());
runner.clearTransferState();
// Run again, this time no flowfiles/rows should be transferred
runner.run();
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 0);
runner.clearTransferState();
// Add 3 new rows with a higher ID and run with a partition size of 2. Two flow files should be transferred
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (3, 'Mary West', 15.0, '2000-01-01 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (4, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (5, 'Marty Johnson', 15.0, '2011-01-01 03:23:34.234')");
runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "2");
runner.setProperty("initial.maxvalue.ID", "5"); // This should have no effect as there is a max value in the processor state
runner.run();
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
// Verify first flow file's contents
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
query = new String(flowFile.toByteArray());
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 ORDER BY ID FETCH NEXT 2 ROWS ONLY", query);
resultSet = stmt.executeQuery(query);
// Should be two records
assertTrue(resultSet.next());
assertTrue(resultSet.next());
assertFalse(resultSet.next());
// Verify second flow file's contents
flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1);
query = new String(flowFile.toByteArray());
assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 ORDER BY ID OFFSET 2 ROWS FETCH NEXT 2 ROWS ONLY", query);
resultSet = stmt.executeQuery(query);
// Should be one record
assertTrue(resultSet.next());
assertFalse(resultSet.next());
runner.clearTransferState();
}
/**
* Simple implementation only for GenerateTableFetch processor testing.