diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java index 1f269766ac..fa2a86e9d4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java @@ -22,6 +22,7 @@ import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.dbcp.DBCPService; import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractSessionFactoryProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Relationship; @@ -222,7 +223,11 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact } public void setup(final ProcessContext context) { - final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue(); + setup(context,true,null); + } + + public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFile flowFile) { + final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(flowFile).getValue(); // If there are no max-value column names specified, we don't need to perform this processing if (StringUtils.isEmpty(maxValueColumnNames)) { @@ -231,7 +236,7 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact // Try to fill the columnTypeMap with the types of the desired max-value columns final DBCPService dbcpService = 
context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); - final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue(); + final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue()); try (final Connection con = dbcpService.getConnection(); @@ -245,7 +250,9 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); int numCols = resultSetMetaData.getColumnCount(); if (numCols > 0) { - columnTypeMap.clear(); + if (shouldCleanCache){ + columnTypeMap.clear(); + } for (int i = 1; i <= numCols; i++) { String colName = resultSetMetaData.getColumnName(i).toLowerCase(); String colKey = getStateKey(tableName, colName); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java index 12278a3ff2..1bae371515 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java @@ -86,12 +86,12 @@ import java.util.stream.IntStream; + "per the State Management documentation") @WritesAttributes({ @WritesAttribute(attribute = "generatetablefetch.sql.error", description = "If the processor has incoming connections, and processing an incoming flow file causes " - + "a SQL Exception, the flow file is routed to failure and this attribute is set to the exception message."), + + "a SQL Exception, the flow file is routed to failure and this attribute is set to the 
exception message."), @WritesAttribute(attribute = "generatetablefetch.tableName", description = "The name of the database table to be queried."), @WritesAttribute(attribute = "generatetablefetch.columnNames", description = "The comma-separated list of column names used in the query."), @WritesAttribute(attribute = "generatetablefetch.whereClause", description = "Where clause used in the query to get the expected rows."), @WritesAttribute(attribute = "generatetablefetch.maxColumnNames", description = "The comma-separated list of column names used to keep track of data " - + "that has been returned since the processor started running."), + + "that has been returned since the processor started running."), @WritesAttribute(attribute = "generatetablefetch.limit", description = "The number of result rows to be fetched by the SQL statement."), @WritesAttribute(attribute = "generatetablefetch.offset", description = "Offset to be used to retrieve the corresponding partition.") }) @@ -155,11 +155,10 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { @OnScheduled public void setup(final ProcessContext context) { maxValueProperties = getDefaultMaxValueProperties(context.getProperties()); - // Pre-fetch the column types if using a static table name and max-value columns if (!isDynamicTableName && !isDynamicMaxValues) { super.setup(context); } - if(context.hasIncomingConnection() && !context.hasNonLoopConnection()) { + if (context.hasIncomingConnection() && !context.hasNonLoopConnection()) { getLogger().error("The failure relationship can be used only if there is another incoming connection to this processor."); } } @@ -190,6 +189,8 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { final StateManager stateManager = context.getStateManager(); final StateMap stateMap; + FlowFile finalFileToProcess = fileToProcess; + try { stateMap = stateManager.getState(Scope.CLUSTER); @@ -243,6 +244,10 @@ public class GenerateTableFetch extends 
AbstractDatabaseFetchProcessor { maxValueSelectColumns.add("MAX(" + colName + ") " + colName); String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName); if (!StringUtils.isEmpty(maxValue)) { + if(columnTypeMap.isEmpty()){ + // This means the column type cache was cleared (e.g. after an instance restart); we should re-cache the column types + super.setup(context, false, finalFileToProcess); + } Integer type = getColumnType(tableName, colName); // Add a condition for the WHERE clause @@ -250,7 +255,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { } }); - if(customWhereClause != null) { + if (customWhereClause != null) { // adding the custom WHERE clause (if defined) to the list of existing clauses. maxValueClauses.add("(" + customWhereClause + ")"); } @@ -263,7 +268,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { long rowCount = 0; try (final Connection con = dbcpService.getConnection(); - final Statement st = con.createStatement()) { + final Statement st = con.createStatement()) { final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.SECONDS).intValue(); st.setQueryTimeout(queryTimeout); // timeout in seconds @@ -283,7 +288,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { //Some JDBC drivers consider the columns name and label to be very different things. // Since this column has been aliased lets check the label first, // if there is no label we'll use the column name. - String resultColumnName = (StringUtils.isNotEmpty(rsmd.getColumnLabel(i))?rsmd.getColumnLabel(i):rsmd.getColumnName(i)).toLowerCase(); + String resultColumnName = (StringUtils.isNotEmpty(rsmd.getColumnLabel(i)) ? 
rsmd.getColumnLabel(i) : rsmd.getColumnName(i)).toLowerCase(); String fullyQualifiedStateKey = getStateKey(tableName, resultColumnName); String resultColumnCurrentMax = statePropertyMap.get(fullyQualifiedStateKey); if (StringUtils.isEmpty(resultColumnCurrentMax) && !isDynamicTableName) { @@ -321,6 +326,10 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { maxValueSelectColumns.add("MAX(" + colName + ") " + colName); String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName); if (!StringUtils.isEmpty(maxValue)) { + if(columnTypeMap.isEmpty()){ + // This means the column type cache was cleared (e.g. after an instance restart); we should re-cache the column types + super.setup(context, false, finalFileToProcess); + } Integer type = getColumnType(tableName, colName); // Add a condition for the WHERE clause @@ -411,7 +420,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { } if (type == null) { // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled or when the first maximum is observed - throw new IllegalArgumentException("No column type found for: " + colName); + throw new ProcessException("No column type cache found for: " + colName); } return type; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java index d8791a5a6b..67a9badec6 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java @@ -1040,6 +1040,61 @@ public class TestGenerateTableFetch { runner.clearTransferState(); } + @Test + public void 
testColumnTypeMissing() throws ClassNotFoundException, SQLException, InitializationException, IOException { + // Load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, bucket integer not null)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (0, 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1, 0)"); + + runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE"); + runner.setIncomingConnection(true); + + runner.setProperty(GenerateTableFetch.TABLE_NAME, "${tableName}"); + runner.setIncomingConnection(true); + runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "${maxValueCol}"); + runner.enqueue("".getBytes(), new HashMap() {{ + put("tableName", "TEST_QUERY_DB_TABLE"); + put("maxValueCol", "id"); + }}); + runner.run(); + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + String query = new String(flowFile.toByteArray()); + assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id <= 1 ORDER BY id FETCH NEXT 10000 ROWS ONLY", query); + runner.clearTransferState(); + + + // Clear columnTypeMap to simulate it having been cleared after an instance reboot + processor.columnTypeMap.clear(); + + // Insert new records + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (2, 0)"); + + // Re-send a FlowFile to see if re-caching the column types works + runner.enqueue("".getBytes(), new HashMap() {{ + put("tableName", "TEST_QUERY_DB_TABLE"); + put("maxValueCol", "id"); + }}); + + // It should re-cache the column types + runner.run(); + 
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1); + flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + query = new String(flowFile.toByteArray()); + assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id > 1 AND id <= 2 ORDER BY id FETCH NEXT 10000 ROWS ONLY", query); + runner.clearTransferState(); + } + /** * Simple implementation only for GenerateTableFetch processor testing. */