NIFI-4395 GenerateTableFetch can't fetch column type by state after instance reboot

Add multiple states recover

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2442
This commit is contained in:
Deon Huang 2018-01-29 23:29:21 +08:00 committed by Matthew Burgess
parent 9bc00b6b64
commit b5938062a8
2 changed files with 63 additions and 6 deletions

View File

@ -245,7 +245,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName);
if (!StringUtils.isEmpty(maxValue)) {
if(columnTypeMap.isEmpty()){
if(columnTypeMap.isEmpty() || getColumnType(tableName, colName) == null){
// This means column type cache is clean after instance reboot. We should re-cache column type
super.setup(context, false, finalFileToProcess);
}
@ -327,7 +327,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName);
if (!StringUtils.isEmpty(maxValue)) {
if(columnTypeMap.isEmpty()){
if(columnTypeMap.isEmpty() || getColumnType(tableName, colName) == null){
// This means column type cache is clean after instance reboot. We should re-cache column type
super.setup(context, false, finalFileToProcess);
}
@ -419,10 +419,6 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
// If the table name is static and the fully-qualified key was not found, try just the column name
type = columnTypeMap.get(getStateKey(null, colName));
}
if (type == null) {
// This shouldn't happen as we are populating columnTypeMap when the processor is scheduled or when the first maximum is observed
throw new ProcessException("No column type cache found for: " + colName);
}
return type;
}

View File

@ -1095,6 +1095,67 @@ public class TestGenerateTableFetch {
runner.clearTransferState();
}
@Test
public void testMultipleColumnTypeMissing() throws ClassNotFoundException, SQLException, InitializationException, IOException {
// Load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE");
stmt.execute("drop table TEST_QUERY_DB_TABLE_2");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
}
// Create multiple table to invoke processor state stored
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, bucket integer not null)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1, 0)");
stmt.execute("create table TEST_QUERY_DB_TABLE_2 (id integer not null, bucket integer not null)");
stmt.execute("insert into TEST_QUERY_DB_TABLE_2 (id, bucket) VALUES (1, 0)");
runner.setProperty(GenerateTableFetch.TABLE_NAME, "${tableName}");
runner.setIncomingConnection(true);
runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "${maxValueCol}");
runner.enqueue("".getBytes(), new HashMap<String, String>() {{
put("tableName", "TEST_QUERY_DB_TABLE");
put("maxValueCol", "id");
}});
runner.enqueue("".getBytes(), new HashMap<String, String>() {{
put("tableName", "TEST_QUERY_DB_TABLE_2");
put("maxValueCol", "id");
}});
runner.run(2);
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2);
assertEquals(2,processor.columnTypeMap.size());
runner.clearTransferState();
// Remove one element from columnTypeMap to simulate it's re-cache partial state
Map.Entry<String,Integer> entry = processor.columnTypeMap.entrySet().iterator().next();
String key = entry.getKey();
processor.columnTypeMap.remove(key);
// Insert new records
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (2, 0)");
// Re-launch FlowFile to se if re-cache column type works
runner.enqueue("".getBytes(), new HashMap<String, String>() {{
put("tableName", "TEST_QUERY_DB_TABLE");
put("maxValueCol", "id");
}});
// It should re-cache column type
runner.run();
runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
assertEquals(2,processor.columnTypeMap.size());
runner.clearTransferState();
}
/**
* Simple implementation only for GenerateTableFetch processor testing.
*/