NIFI-4395 - GenerateTableFetch can't fetch column type by state after instance reboot

NIFI-4395: Updated unit test for GenerateTableFetch
Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2166
This commit is contained in:
Deon Huang 2017-09-21 15:34:00 +08:00 committed by Matthew Burgess
parent 9e2c7be7d3
commit a29348f2a4
3 changed files with 82 additions and 11 deletions

View File

@ -22,6 +22,7 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
@ -222,7 +223,11 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
}
public void setup(final ProcessContext context) {
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue();
setup(context,true,null);
}
public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFile flowFile) {
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(flowFile).getValue();
// If there are no max-value column names specified, we don't need to perform this processing
if (StringUtils.isEmpty(maxValueColumnNames)) {
@ -231,7 +236,7 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
// Try to fill the columnTypeMap with the types of the desired max-value columns
final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
try (final Connection con = dbcpService.getConnection();
@ -245,7 +250,9 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
int numCols = resultSetMetaData.getColumnCount();
if (numCols > 0) {
columnTypeMap.clear();
if (shouldCleanCache){
columnTypeMap.clear();
}
for (int i = 1; i <= numCols; i++) {
String colName = resultSetMetaData.getColumnName(i).toLowerCase();
String colKey = getStateKey(tableName, colName);

View File

@ -86,12 +86,12 @@ import java.util.stream.IntStream;
+ "per the State Management documentation")
@WritesAttributes({
@WritesAttribute(attribute = "generatetablefetch.sql.error", description = "If the processor has incoming connections, and processing an incoming flow file causes "
+ "a SQL Exception, the flow file is routed to failure and this attribute is set to the exception message."),
+ "a SQL Exception, the flow file is routed to failure and this attribute is set to the exception message."),
@WritesAttribute(attribute = "generatetablefetch.tableName", description = "The name of the database table to be queried."),
@WritesAttribute(attribute = "generatetablefetch.columnNames", description = "The comma-separated list of column names used in the query."),
@WritesAttribute(attribute = "generatetablefetch.whereClause", description = "Where clause used in the query to get the expected rows."),
@WritesAttribute(attribute = "generatetablefetch.maxColumnNames", description = "The comma-separated list of column names used to keep track of data "
+ "that has been returned since the processor started running."),
+ "that has been returned since the processor started running."),
@WritesAttribute(attribute = "generatetablefetch.limit", description = "The number of result rows to be fetched by the SQL statement."),
@WritesAttribute(attribute = "generatetablefetch.offset", description = "Offset to be used to retrieve the corresponding partition.")
})
@ -155,11 +155,10 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
@OnScheduled
public void setup(final ProcessContext context) {
maxValueProperties = getDefaultMaxValueProperties(context.getProperties());
// Pre-fetch the column types if using a static table name and max-value columns
if (!isDynamicTableName && !isDynamicMaxValues) {
super.setup(context);
}
if(context.hasIncomingConnection() && !context.hasNonLoopConnection()) {
if (context.hasIncomingConnection() && !context.hasNonLoopConnection()) {
getLogger().error("The failure relationship can be used only if there is another incoming connection to this processor.");
}
}
@ -190,6 +189,8 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
final StateManager stateManager = context.getStateManager();
final StateMap stateMap;
FlowFile finalFileToProcess = fileToProcess;
try {
stateMap = stateManager.getState(Scope.CLUSTER);
@ -243,6 +244,10 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName);
if (!StringUtils.isEmpty(maxValue)) {
if(columnTypeMap.isEmpty()){
// The column type cache is empty (e.g. it was cleared by an instance restart), so re-populate it
super.setup(context, false, finalFileToProcess);
}
Integer type = getColumnType(tableName, colName);
// Add a condition for the WHERE clause
@ -250,7 +255,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
}
});
if(customWhereClause != null) {
if (customWhereClause != null) {
// adding the custom WHERE clause (if defined) to the list of existing clauses.
maxValueClauses.add("(" + customWhereClause + ")");
}
@ -263,7 +268,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
long rowCount = 0;
try (final Connection con = dbcpService.getConnection();
final Statement st = con.createStatement()) {
final Statement st = con.createStatement()) {
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.SECONDS).intValue();
st.setQueryTimeout(queryTimeout); // timeout in seconds
@ -283,7 +288,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
//Some JDBC drivers consider the columns name and label to be very different things.
// Since this column has been aliased lets check the label first,
// if there is no label we'll use the column name.
String resultColumnName = (StringUtils.isNotEmpty(rsmd.getColumnLabel(i))?rsmd.getColumnLabel(i):rsmd.getColumnName(i)).toLowerCase();
String resultColumnName = (StringUtils.isNotEmpty(rsmd.getColumnLabel(i)) ? rsmd.getColumnLabel(i) : rsmd.getColumnName(i)).toLowerCase();
String fullyQualifiedStateKey = getStateKey(tableName, resultColumnName);
String resultColumnCurrentMax = statePropertyMap.get(fullyQualifiedStateKey);
if (StringUtils.isEmpty(resultColumnCurrentMax) && !isDynamicTableName) {
@ -321,6 +326,10 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName);
if (!StringUtils.isEmpty(maxValue)) {
if(columnTypeMap.isEmpty()){
// The column type cache is empty (e.g. it was cleared by an instance restart), so re-populate it
super.setup(context, false, finalFileToProcess);
}
Integer type = getColumnType(tableName, colName);
// Add a condition for the WHERE clause
@ -411,7 +420,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
}
if (type == null) {
// This shouldn't happen as we are populating columnTypeMap when the processor is scheduled or when the first maximum is observed
throw new IllegalArgumentException("No column type found for: " + colName);
throw new ProcessException("No column type cache found for: " + colName);
}
return type;

View File

@ -1040,6 +1040,61 @@ public class TestGenerateTableFetch {
runner.clearTransferState();
}
@Test
public void testColumnTypeMissing() throws ClassNotFoundException, SQLException, InitializationException, IOException {
    // Verifies that the processor re-populates its column type cache when the cache is
    // empty (simulating an instance restart) but max-value state already exists.

    // Load test data to database
    final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
    Statement stmt = con.createStatement();
    try {
        stmt.execute("drop table TEST_QUERY_DB_TABLE");
    } catch (final SQLException sqle) {
        // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
    }
    stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, bucket integer not null)");
    stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (0, 0)");
    stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1, 0)");

    // Use Expression Language for both table name and max-value column so the column
    // types are resolved lazily from the incoming flow file rather than at schedule time.
    // (The original test set TABLE_NAME twice and called setIncomingConnection(true) twice;
    // the first TABLE_NAME value was immediately overwritten, so the duplicates are removed.)
    runner.setProperty(GenerateTableFetch.TABLE_NAME, "${tableName}");
    runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "${maxValueCol}");
    runner.setIncomingConnection(true);
    runner.enqueue("".getBytes(), new HashMap<String, String>() {{
        put("tableName", "TEST_QUERY_DB_TABLE");
        put("maxValueCol", "id");
    }});
    runner.run();

    runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
    MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
    String query = new String(flowFile.toByteArray());
    assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id <= 1 ORDER BY id FETCH NEXT 10000 ROWS ONLY", query);
    runner.clearTransferState();

    // Clear columnTypeMap to simulate the cache being empty after an instance reboot
    processor.columnTypeMap.clear();

    // Insert new records
    stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (2, 0)");

    // Re-enqueue a flow file to verify that the column type cache is re-populated
    runner.enqueue("".getBytes(), new HashMap<String, String>() {{
        put("tableName", "TEST_QUERY_DB_TABLE");
        put("maxValueCol", "id");
    }});
    // It should re-cache column types and build the incremental query from the stored max value
    runner.run();

    runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
    flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
    query = new String(flowFile.toByteArray());
    assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id > 1 AND id <= 2 ORDER BY id FETCH NEXT 10000 ROWS ONLY", query);
    runner.clearTransferState();
}
/**
* Simple implementation only for GenerateTableFetch processor testing.
*/