From 08dcd4af14386183d0cd84690772b5ad93f75b9f Mon Sep 17 00:00:00 2001 From: Peter Turcsanyi Date: Wed, 8 Apr 2020 23:27:50 +0200 Subject: [PATCH] NIFI-7345: Fixed Hive database and table names case insensitivity in Atlas reporting task Signed-off-by: Pierre Villard This closes #4198. --- .../analyzer/DatabaseAnalyzerUtil.java | 3 +- .../atlas/provenance/analyzer/Hive2JDBC.java | 4 +- .../provenance/analyzer/TestHive2JDBC.java | 68 +++++++++---------- .../analyzer/TestPutHiveStreaming.java | 6 +- 4 files changed, 41 insertions(+), 40 deletions(-) diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/DatabaseAnalyzerUtil.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/DatabaseAnalyzerUtil.java index 63ab1bf529..94db2c8dcf 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/DatabaseAnalyzerUtil.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/DatabaseAnalyzerUtil.java @@ -52,7 +52,8 @@ public class DatabaseAnalyzerUtil { } final String databaseName = tableNameSplit.length == 2 ? tableNameSplit[0] : connectedDatabaseName; final String tableName = tableNameSplit.length == 2 ? tableNameSplit[1] : tableNameSplit[0]; - return new Tuple<>(databaseName, tableName); + // Handle case insensitivity of database and table names in Hive: send names uniformly in lower case + return new Tuple<>(databaseName.toLowerCase(), tableName.toLowerCase()); } public static String toTableNameStr(Tuple tableName) { diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java index ffed41f98d..7821f866df 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java @@ -94,8 +94,8 @@ public class Hive2JDBC extends AbstractHiveAnalyzer { if (inputTables.isEmpty() && outputTables.isEmpty()) { // If input/output tables are unknown, create database level lineage. - return getDatabaseRef(event.getComponentId(), event.getEventType(), - clusterName, connectedDatabaseName); + // Handle case insensitivity of database and table names in Hive: send names uniformly in lower case + return getDatabaseRef(event.getComponentId(), event.getEventType(), clusterName, connectedDatabaseName.toLowerCase()); } final DataSetRefs refs = new DataSetRefs(event.getComponentId()); diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java index 5d5fcd62cc..e7e6a91519 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java @@ -50,7 +50,7 @@ public class TestHive2JDBC { @Test public void testDatabaseLineage() { final String processorName = "PutHiveQL"; - final String transitUri = "jdbc:hive2://0.example.com:10000/databaseA"; + final String transitUri = "jdbc:hive2://0.example.com:10000/database_A"; final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class); when(record.getComponentType()).thenReturn(processorName); when(record.getTransitUri()).thenReturn(transitUri); @@ -70,8 +70,8 @@ public class TestHive2JDBC { assertEquals(1, refs.getOutputs().size()); Referenceable ref = refs.getOutputs().iterator().next(); assertEquals("hive_db", ref.getTypeName()); - assertEquals("databaseA", ref.get(ATTR_NAME)); - assertEquals("databaseA@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + assertEquals("database_a", ref.get(ATTR_NAME)); + assertEquals("database_a@cluster1", ref.get(ATTR_QUALIFIED_NAME)); } /** @@ -81,14 +81,14 @@ public class TestHive2JDBC { @Test public void testTableLineage() { final String processorName = "PutHiveQL"; - final String transitUri = "jdbc:hive2://0.example.com:10000/databaseA"; + final String transitUri = "jdbc:hive2://0.example.com:10000/database_A"; final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class); when(record.getComponentType()).thenReturn(processorName); when(record.getTransitUri()).thenReturn(transitUri); when(record.getEventType()).thenReturn(ProvenanceEventType.SEND); // E.g. insert into databaseB.tableB1 select something from tableA1 a1 inner join tableA2 a2 where a1.id = a2.id - when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("tableA1, tableA2"); - when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseB.tableB1"); + when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("table_A1, table_A2"); + when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("database_B.table_B1"); final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1"); @@ -103,8 +103,8 @@ public class TestHive2JDBC { assertEquals(2, refs.getInputs().size()); // QualifiedName : Name final Map expectedInputRefs = new HashMap<>(); - expectedInputRefs.put("databaseA.tableA1@cluster1", "tableA1"); - expectedInputRefs.put("databaseA.tableA2@cluster1", "tableA2"); + expectedInputRefs.put("database_a.table_a1@cluster1", "table_a1"); + expectedInputRefs.put("database_a.table_a2@cluster1", "table_a2"); for (Referenceable ref : refs.getInputs()) { final String qName = (String) ref.get(ATTR_QUALIFIED_NAME); assertTrue(expectedInputRefs.containsKey(qName)); @@ -114,8 +114,8 @@ public class TestHive2JDBC { assertEquals(1, refs.getOutputs().size()); Referenceable ref = refs.getOutputs().iterator().next(); assertEquals("hive_table", ref.getTypeName()); - assertEquals("tableB1", ref.get(ATTR_NAME)); - assertEquals("databaseB.tableB1@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + assertEquals("table_b1", ref.get(ATTR_NAME)); + assertEquals("database_b.table_b1@cluster1", ref.get(ATTR_QUALIFIED_NAME)); } /** @@ -131,8 +131,8 @@ public class TestHive2JDBC { when(record.getTransitUri()).thenReturn(transitUri); when(record.getEventType()).thenReturn(ProvenanceEventType.SEND); // E.g. insert into databaseB.tableB1 select something from tableA1 a1 inner join tableA2 a2 where a1.id = a2.id - when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("tableA1, tableA2"); - when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseB.tableB1"); + when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("table_A1, table_A2"); + when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("database_B.table_B1"); final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1"); @@ -147,8 +147,8 @@ public class TestHive2JDBC { assertEquals(2, refs.getInputs().size()); // QualifiedName : Name final Map expectedInputRefs = new HashMap<>(); - expectedInputRefs.put("default.tableA1@cluster1", "tableA1"); - expectedInputRefs.put("default.tableA2@cluster1", "tableA2"); + expectedInputRefs.put("default.table_a1@cluster1", "table_a1"); + expectedInputRefs.put("default.table_a2@cluster1", "table_a2"); for (Referenceable ref : refs.getInputs()) { final String qName = (String) ref.get(ATTR_QUALIFIED_NAME); assertTrue(expectedInputRefs.containsKey(qName)); @@ -158,8 +158,8 @@ public class TestHive2JDBC { assertEquals(1, refs.getOutputs().size()); Referenceable ref = refs.getOutputs().iterator().next(); assertEquals("hive_table", ref.getTypeName()); - assertEquals("tableB1", ref.get(ATTR_NAME)); - assertEquals("databaseB.tableB1@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + assertEquals("table_b1", ref.get(ATTR_NAME)); + assertEquals("database_b.table_b1@cluster1", ref.get(ATTR_QUALIFIED_NAME)); } /** @@ -174,8 +174,8 @@ public class TestHive2JDBC { when(record.getTransitUri()).thenReturn(transitUri); when(record.getEventType()).thenReturn(ProvenanceEventType.SEND); // E.g. insert into databaseB.tableB1 select something from tableA1 a1 inner join tableA2 a2 where a1.id = a2.id - when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("tableA1, tableA2"); - when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseB.tableB1"); + when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("table_A1, table_A2"); + when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("database_B.table_B1"); final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1"); @@ -190,8 +190,8 @@ public class TestHive2JDBC { assertEquals(2, refs.getInputs().size()); // QualifiedName : Name final Map expectedInputRefs = new HashMap<>(); - expectedInputRefs.put("default.tableA1@cluster1", "tableA1"); - expectedInputRefs.put("default.tableA2@cluster1", "tableA2"); + expectedInputRefs.put("default.table_a1@cluster1", "table_a1"); + expectedInputRefs.put("default.table_a2@cluster1", "table_a2"); for (Referenceable ref : refs.getInputs()) { final String qName = (String) ref.get(ATTR_QUALIFIED_NAME); assertTrue(expectedInputRefs.containsKey(qName)); @@ -201,8 +201,8 @@ public class TestHive2JDBC { assertEquals(1, refs.getOutputs().size()); Referenceable ref = refs.getOutputs().iterator().next(); assertEquals("hive_table", ref.getTypeName()); - assertEquals("tableB1", ref.get(ATTR_NAME)); - assertEquals("databaseB.tableB1@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + assertEquals("table_b1", ref.get(ATTR_NAME)); + assertEquals("database_b.table_b1@cluster1", ref.get(ATTR_QUALIFIED_NAME)); } /** @@ -219,8 +219,8 @@ public class TestHive2JDBC { when(record.getTransitUri()).thenReturn(transitUri); when(record.getEventType()).thenReturn(ProvenanceEventType.SEND); // E.g. insert into databaseB.tableB1 select something from tableA1 a1 inner join tableA2 a2 where a1.id = a2.id - when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("tableA1, tableA2"); - when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseB.tableB1"); + when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("table_A1, table_A2"); + when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("database_B.table_B1"); final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); when(clusterResolvers.fromHostNames(eq("0.example.com"), eq("1.example.com"), eq("2.example.com"))).thenReturn("cluster1"); @@ -235,8 +235,8 @@ public class TestHive2JDBC { assertEquals(2, refs.getInputs().size()); // QualifiedName : Name final Map expectedInputRefs = new HashMap<>(); - expectedInputRefs.put("default.tableA1@cluster1", "tableA1"); - expectedInputRefs.put("default.tableA2@cluster1", "tableA2"); + expectedInputRefs.put("default.table_a1@cluster1", "table_a1"); + expectedInputRefs.put("default.table_a2@cluster1", "table_a2"); for (Referenceable ref : refs.getInputs()) { final String qName = (String) ref.get(ATTR_QUALIFIED_NAME); assertTrue(expectedInputRefs.containsKey(qName)); @@ -246,8 +246,8 @@ public class TestHive2JDBC { assertEquals(1, refs.getOutputs().size()); Referenceable ref = refs.getOutputs().iterator().next(); assertEquals("hive_table", ref.getTypeName()); - assertEquals("tableB1", ref.get(ATTR_NAME)); - assertEquals("databaseB.tableB1@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + assertEquals("table_b1", ref.get(ATTR_NAME)); + assertEquals("database_b.table_b1@cluster1", ref.get(ATTR_QUALIFIED_NAME)); } /** @@ -262,8 +262,8 @@ public class TestHive2JDBC { when(record.getTransitUri()).thenReturn(transitUri); when(record.getEventType()).thenReturn(ProvenanceEventType.SEND); // E.g. insert into databaseB.tableB1 select something from tableA1 a1 inner join tableA2 a2 where a1.id = a2.id - when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("tableA1, tableA2"); - when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseB.tableB1"); + when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("table_A1, table_A2"); + when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("database_B.table_B1"); final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); when(clusterResolvers.fromHostNames(eq("0.example.com"), eq("1.example.com"))).thenReturn("cluster1"); @@ -278,8 +278,8 @@ public class TestHive2JDBC { assertEquals(2, refs.getInputs().size()); // QualifiedName : Name final Map expectedInputRefs = new HashMap<>(); - expectedInputRefs.put("some_database.tableA1@cluster1", "tableA1"); - expectedInputRefs.put("some_database.tableA2@cluster1", "tableA2"); + expectedInputRefs.put("some_database.table_a1@cluster1", "table_a1"); + expectedInputRefs.put("some_database.table_a2@cluster1", "table_a2"); for (Referenceable ref : refs.getInputs()) { final String qName = (String) ref.get(ATTR_QUALIFIED_NAME); assertTrue(expectedInputRefs.containsKey(qName)); @@ -289,8 +289,8 @@ public class TestHive2JDBC { assertEquals(1, refs.getOutputs().size()); Referenceable ref = refs.getOutputs().iterator().next(); assertEquals("hive_table", ref.getTypeName()); - assertEquals("tableB1", ref.get(ATTR_NAME)); - assertEquals("databaseB.tableB1@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + assertEquals("table_b1", ref.get(ATTR_NAME)); + assertEquals("database_b.table_b1@cluster1", ref.get(ATTR_QUALIFIED_NAME)); } } diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java index 606f6d5e7b..0194cddb3c 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java @@ -53,7 +53,7 @@ public class TestPutHiveStreaming { when(record.getComponentType()).thenReturn(processorName); when(record.getTransitUri()).thenReturn(transitUri); when(record.getEventType()).thenReturn(ProvenanceEventType.SEND); - when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseA.tableA"); + when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("database_A.table_A"); final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class); when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1"); @@ -69,7 +69,7 @@ public class TestPutHiveStreaming { assertEquals(1, refs.getOutputs().size()); Referenceable ref = refs.getOutputs().iterator().next(); assertEquals("hive_table", ref.getTypeName()); - assertEquals("tableA", ref.get(ATTR_NAME)); - assertEquals("databaseA.tableA@cluster1", ref.get(ATTR_QUALIFIED_NAME)); + assertEquals("table_a", ref.get(ATTR_NAME)); + assertEquals("database_a.table_a@cluster1", ref.get(ATTR_QUALIFIED_NAME)); } }