From b5938062a8ff0d7c8bb2c1985dc2605b2a69b6dd Mon Sep 17 00:00:00 2001 From: Deon Huang Date: Mon, 29 Jan 2018 23:29:21 +0800 Subject: [PATCH] NIFI-4395 GenerateTableFetch can't fetch column type by state after instance reboot Add multiple states recover Signed-off-by: Matthew Burgess This closes #2442 --- .../standard/GenerateTableFetch.java | 8 +-- .../standard/TestGenerateTableFetch.java | 61 +++++++++++++++++++ 2 files changed, 63 insertions(+), 6 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java index 8f535b3838..188e282229 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java @@ -245,7 +245,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { maxValueSelectColumns.add("MAX(" + colName + ") " + colName); String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName); if (!StringUtils.isEmpty(maxValue)) { - if(columnTypeMap.isEmpty()){ + if(columnTypeMap.isEmpty() || getColumnType(tableName, colName) == null){ // This means column type cache is clean after instance reboot. We should re-cache column type super.setup(context, false, finalFileToProcess); } @@ -327,7 +327,7 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { maxValueSelectColumns.add("MAX(" + colName + ") " + colName); String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName); if (!StringUtils.isEmpty(maxValue)) { - if(columnTypeMap.isEmpty()){ + if(columnTypeMap.isEmpty() || getColumnType(tableName, colName) == null){ // This means column type cache is clean after instance reboot. We should re-cache column type super.setup(context, false, finalFileToProcess); } @@ -419,10 +419,6 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { // If the table name is static and the fully-qualified key was not found, try just the column name type = columnTypeMap.get(getStateKey(null, colName)); } - if (type == null) { - // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled or when the first maximum is observed - throw new ProcessException("No column type cache found for: " + colName); - } return type; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java index 67a9badec6..f20dee8690 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java @@ -1095,6 +1095,67 @@ public class TestGenerateTableFetch { runner.clearTransferState(); } + @Test + public void testMultipleColumnTypeMissing() throws ClassNotFoundException, SQLException, InitializationException, IOException { + + // Load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + stmt.execute("drop table TEST_QUERY_DB_TABLE_2"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + // Create multiple table to invoke processor state stored + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, bucket integer not null)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1, 0)"); + stmt.execute("create table TEST_QUERY_DB_TABLE_2 (id integer not null, bucket integer not null)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE_2 (id, bucket) VALUES (1, 0)"); + + runner.setProperty(GenerateTableFetch.TABLE_NAME, "${tableName}"); + runner.setIncomingConnection(true); + runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "${maxValueCol}"); + + runner.enqueue("".getBytes(), new HashMap() {{ + put("tableName", "TEST_QUERY_DB_TABLE"); + put("maxValueCol", "id"); + }}); + + runner.enqueue("".getBytes(), new HashMap() {{ + put("tableName", "TEST_QUERY_DB_TABLE_2"); + put("maxValueCol", "id"); + }}); + runner.run(2); + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 2); + + assertEquals(2,processor.columnTypeMap.size()); + runner.clearTransferState(); + + + // Remove one element from columnTypeMap to simulate it's re-cache partial state + Map.Entry entry = processor.columnTypeMap.entrySet().iterator().next(); + String key = entry.getKey(); + processor.columnTypeMap.remove(key); + + // Insert new records + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (2, 0)"); + + // Re-launch FlowFile to se if re-cache column type works + runner.enqueue("".getBytes(), new HashMap() {{ + put("tableName", "TEST_QUERY_DB_TABLE"); + put("maxValueCol", "id"); + }}); + + // It should re-cache column type + runner.run(); + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1); + assertEquals(2,processor.columnTypeMap.size()); + runner.clearTransferState(); + } + /** * Simple implementation only for GenerateTableFetch processor testing. */