mirror of https://github.com/apache/nifi.git
NIFI-7345: Fixed Hive database and table names case insensitivity in Atlas reporting task
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #4198.
This commit is contained in:
parent
63379c3520
commit
08dcd4af14
|
@ -52,7 +52,8 @@ public class DatabaseAnalyzerUtil {
|
|||
}
|
||||
final String databaseName = tableNameSplit.length == 2 ? tableNameSplit[0] : connectedDatabaseName;
|
||||
final String tableName = tableNameSplit.length == 2 ? tableNameSplit[1] : tableNameSplit[0];
|
||||
return new Tuple<>(databaseName, tableName);
|
||||
// Handle case insensitivity of database and table names in Hive: send names uniformly in lower case
|
||||
return new Tuple<>(databaseName.toLowerCase(), tableName.toLowerCase());
|
||||
}
|
||||
|
||||
public static String toTableNameStr(Tuple<String, String> tableName) {
|
||||
|
|
|
@ -94,8 +94,8 @@ public class Hive2JDBC extends AbstractHiveAnalyzer {
|
|||
|
||||
if (inputTables.isEmpty() && outputTables.isEmpty()) {
|
||||
// If input/output tables are unknown, create database level lineage.
|
||||
return getDatabaseRef(event.getComponentId(), event.getEventType(),
|
||||
clusterName, connectedDatabaseName);
|
||||
// Handle case insensitivity of database and table names in Hive: send names uniformly in lower case
|
||||
return getDatabaseRef(event.getComponentId(), event.getEventType(), clusterName, connectedDatabaseName.toLowerCase());
|
||||
}
|
||||
|
||||
final DataSetRefs refs = new DataSetRefs(event.getComponentId());
|
||||
|
|
|
@ -50,7 +50,7 @@ public class TestHive2JDBC {
|
|||
@Test
|
||||
public void testDatabaseLineage() {
|
||||
final String processorName = "PutHiveQL";
|
||||
final String transitUri = "jdbc:hive2://0.example.com:10000/databaseA";
|
||||
final String transitUri = "jdbc:hive2://0.example.com:10000/database_A";
|
||||
final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
|
||||
when(record.getComponentType()).thenReturn(processorName);
|
||||
when(record.getTransitUri()).thenReturn(transitUri);
|
||||
|
@ -70,8 +70,8 @@ public class TestHive2JDBC {
|
|||
assertEquals(1, refs.getOutputs().size());
|
||||
Referenceable ref = refs.getOutputs().iterator().next();
|
||||
assertEquals("hive_db", ref.getTypeName());
|
||||
assertEquals("databaseA", ref.get(ATTR_NAME));
|
||||
assertEquals("databaseA@cluster1", ref.get(ATTR_QUALIFIED_NAME));
|
||||
assertEquals("database_a", ref.get(ATTR_NAME));
|
||||
assertEquals("database_a@cluster1", ref.get(ATTR_QUALIFIED_NAME));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -81,14 +81,14 @@ public class TestHive2JDBC {
|
|||
@Test
|
||||
public void testTableLineage() {
|
||||
final String processorName = "PutHiveQL";
|
||||
final String transitUri = "jdbc:hive2://0.example.com:10000/databaseA";
|
||||
final String transitUri = "jdbc:hive2://0.example.com:10000/database_A";
|
||||
final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
|
||||
when(record.getComponentType()).thenReturn(processorName);
|
||||
when(record.getTransitUri()).thenReturn(transitUri);
|
||||
when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
|
||||
// E.g. insert into databaseB.tableB1 select something from tableA1 a1 inner join tableA2 a2 where a1.id = a2.id
|
||||
when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("tableA1, tableA2");
|
||||
when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseB.tableB1");
|
||||
when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("table_A1, table_A2");
|
||||
when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("database_B.table_B1");
|
||||
|
||||
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
|
||||
when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
|
||||
|
@ -103,8 +103,8 @@ public class TestHive2JDBC {
|
|||
assertEquals(2, refs.getInputs().size());
|
||||
// QualifiedName : Name
|
||||
final Map<String, String> expectedInputRefs = new HashMap<>();
|
||||
expectedInputRefs.put("databaseA.tableA1@cluster1", "tableA1");
|
||||
expectedInputRefs.put("databaseA.tableA2@cluster1", "tableA2");
|
||||
expectedInputRefs.put("database_a.table_a1@cluster1", "table_a1");
|
||||
expectedInputRefs.put("database_a.table_a2@cluster1", "table_a2");
|
||||
for (Referenceable ref : refs.getInputs()) {
|
||||
final String qName = (String) ref.get(ATTR_QUALIFIED_NAME);
|
||||
assertTrue(expectedInputRefs.containsKey(qName));
|
||||
|
@ -114,8 +114,8 @@ public class TestHive2JDBC {
|
|||
assertEquals(1, refs.getOutputs().size());
|
||||
Referenceable ref = refs.getOutputs().iterator().next();
|
||||
assertEquals("hive_table", ref.getTypeName());
|
||||
assertEquals("tableB1", ref.get(ATTR_NAME));
|
||||
assertEquals("databaseB.tableB1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
|
||||
assertEquals("table_b1", ref.get(ATTR_NAME));
|
||||
assertEquals("database_b.table_b1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -131,8 +131,8 @@ public class TestHive2JDBC {
|
|||
when(record.getTransitUri()).thenReturn(transitUri);
|
||||
when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
|
||||
// E.g. insert into databaseB.tableB1 select something from tableA1 a1 inner join tableA2 a2 where a1.id = a2.id
|
||||
when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("tableA1, tableA2");
|
||||
when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseB.tableB1");
|
||||
when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("table_A1, table_A2");
|
||||
when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("database_B.table_B1");
|
||||
|
||||
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
|
||||
when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
|
||||
|
@ -147,8 +147,8 @@ public class TestHive2JDBC {
|
|||
assertEquals(2, refs.getInputs().size());
|
||||
// QualifiedName : Name
|
||||
final Map<String, String> expectedInputRefs = new HashMap<>();
|
||||
expectedInputRefs.put("default.tableA1@cluster1", "tableA1");
|
||||
expectedInputRefs.put("default.tableA2@cluster1", "tableA2");
|
||||
expectedInputRefs.put("default.table_a1@cluster1", "table_a1");
|
||||
expectedInputRefs.put("default.table_a2@cluster1", "table_a2");
|
||||
for (Referenceable ref : refs.getInputs()) {
|
||||
final String qName = (String) ref.get(ATTR_QUALIFIED_NAME);
|
||||
assertTrue(expectedInputRefs.containsKey(qName));
|
||||
|
@ -158,8 +158,8 @@ public class TestHive2JDBC {
|
|||
assertEquals(1, refs.getOutputs().size());
|
||||
Referenceable ref = refs.getOutputs().iterator().next();
|
||||
assertEquals("hive_table", ref.getTypeName());
|
||||
assertEquals("tableB1", ref.get(ATTR_NAME));
|
||||
assertEquals("databaseB.tableB1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
|
||||
assertEquals("table_b1", ref.get(ATTR_NAME));
|
||||
assertEquals("database_b.table_b1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -174,8 +174,8 @@ public class TestHive2JDBC {
|
|||
when(record.getTransitUri()).thenReturn(transitUri);
|
||||
when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
|
||||
// E.g. insert into databaseB.tableB1 select something from tableA1 a1 inner join tableA2 a2 where a1.id = a2.id
|
||||
when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("tableA1, tableA2");
|
||||
when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseB.tableB1");
|
||||
when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("table_A1, table_A2");
|
||||
when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("database_B.table_B1");
|
||||
|
||||
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
|
||||
when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
|
||||
|
@ -190,8 +190,8 @@ public class TestHive2JDBC {
|
|||
assertEquals(2, refs.getInputs().size());
|
||||
// QualifiedName : Name
|
||||
final Map<String, String> expectedInputRefs = new HashMap<>();
|
||||
expectedInputRefs.put("default.tableA1@cluster1", "tableA1");
|
||||
expectedInputRefs.put("default.tableA2@cluster1", "tableA2");
|
||||
expectedInputRefs.put("default.table_a1@cluster1", "table_a1");
|
||||
expectedInputRefs.put("default.table_a2@cluster1", "table_a2");
|
||||
for (Referenceable ref : refs.getInputs()) {
|
||||
final String qName = (String) ref.get(ATTR_QUALIFIED_NAME);
|
||||
assertTrue(expectedInputRefs.containsKey(qName));
|
||||
|
@ -201,8 +201,8 @@ public class TestHive2JDBC {
|
|||
assertEquals(1, refs.getOutputs().size());
|
||||
Referenceable ref = refs.getOutputs().iterator().next();
|
||||
assertEquals("hive_table", ref.getTypeName());
|
||||
assertEquals("tableB1", ref.get(ATTR_NAME));
|
||||
assertEquals("databaseB.tableB1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
|
||||
assertEquals("table_b1", ref.get(ATTR_NAME));
|
||||
assertEquals("database_b.table_b1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -219,8 +219,8 @@ public class TestHive2JDBC {
|
|||
when(record.getTransitUri()).thenReturn(transitUri);
|
||||
when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
|
||||
// E.g. insert into databaseB.tableB1 select something from tableA1 a1 inner join tableA2 a2 where a1.id = a2.id
|
||||
when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("tableA1, tableA2");
|
||||
when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseB.tableB1");
|
||||
when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("table_A1, table_A2");
|
||||
when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("database_B.table_B1");
|
||||
|
||||
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
|
||||
when(clusterResolvers.fromHostNames(eq("0.example.com"), eq("1.example.com"), eq("2.example.com"))).thenReturn("cluster1");
|
||||
|
@ -235,8 +235,8 @@ public class TestHive2JDBC {
|
|||
assertEquals(2, refs.getInputs().size());
|
||||
// QualifiedName : Name
|
||||
final Map<String, String> expectedInputRefs = new HashMap<>();
|
||||
expectedInputRefs.put("default.tableA1@cluster1", "tableA1");
|
||||
expectedInputRefs.put("default.tableA2@cluster1", "tableA2");
|
||||
expectedInputRefs.put("default.table_a1@cluster1", "table_a1");
|
||||
expectedInputRefs.put("default.table_a2@cluster1", "table_a2");
|
||||
for (Referenceable ref : refs.getInputs()) {
|
||||
final String qName = (String) ref.get(ATTR_QUALIFIED_NAME);
|
||||
assertTrue(expectedInputRefs.containsKey(qName));
|
||||
|
@ -246,8 +246,8 @@ public class TestHive2JDBC {
|
|||
assertEquals(1, refs.getOutputs().size());
|
||||
Referenceable ref = refs.getOutputs().iterator().next();
|
||||
assertEquals("hive_table", ref.getTypeName());
|
||||
assertEquals("tableB1", ref.get(ATTR_NAME));
|
||||
assertEquals("databaseB.tableB1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
|
||||
assertEquals("table_b1", ref.get(ATTR_NAME));
|
||||
assertEquals("database_b.table_b1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -262,8 +262,8 @@ public class TestHive2JDBC {
|
|||
when(record.getTransitUri()).thenReturn(transitUri);
|
||||
when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
|
||||
// E.g. insert into databaseB.tableB1 select something from tableA1 a1 inner join tableA2 a2 where a1.id = a2.id
|
||||
when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("tableA1, tableA2");
|
||||
when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseB.tableB1");
|
||||
when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("table_A1, table_A2");
|
||||
when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("database_B.table_B1");
|
||||
|
||||
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
|
||||
when(clusterResolvers.fromHostNames(eq("0.example.com"), eq("1.example.com"))).thenReturn("cluster1");
|
||||
|
@ -278,8 +278,8 @@ public class TestHive2JDBC {
|
|||
assertEquals(2, refs.getInputs().size());
|
||||
// QualifiedName : Name
|
||||
final Map<String, String> expectedInputRefs = new HashMap<>();
|
||||
expectedInputRefs.put("some_database.tableA1@cluster1", "tableA1");
|
||||
expectedInputRefs.put("some_database.tableA2@cluster1", "tableA2");
|
||||
expectedInputRefs.put("some_database.table_a1@cluster1", "table_a1");
|
||||
expectedInputRefs.put("some_database.table_a2@cluster1", "table_a2");
|
||||
for (Referenceable ref : refs.getInputs()) {
|
||||
final String qName = (String) ref.get(ATTR_QUALIFIED_NAME);
|
||||
assertTrue(expectedInputRefs.containsKey(qName));
|
||||
|
@ -289,8 +289,8 @@ public class TestHive2JDBC {
|
|||
assertEquals(1, refs.getOutputs().size());
|
||||
Referenceable ref = refs.getOutputs().iterator().next();
|
||||
assertEquals("hive_table", ref.getTypeName());
|
||||
assertEquals("tableB1", ref.get(ATTR_NAME));
|
||||
assertEquals("databaseB.tableB1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
|
||||
assertEquals("table_b1", ref.get(ATTR_NAME));
|
||||
assertEquals("database_b.table_b1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -53,7 +53,7 @@ public class TestPutHiveStreaming {
|
|||
when(record.getComponentType()).thenReturn(processorName);
|
||||
when(record.getTransitUri()).thenReturn(transitUri);
|
||||
when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
|
||||
when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseA.tableA");
|
||||
when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("database_A.table_A");
|
||||
|
||||
final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
|
||||
when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
|
||||
|
@ -69,7 +69,7 @@ public class TestPutHiveStreaming {
|
|||
assertEquals(1, refs.getOutputs().size());
|
||||
Referenceable ref = refs.getOutputs().iterator().next();
|
||||
assertEquals("hive_table", ref.getTypeName());
|
||||
assertEquals("tableA", ref.get(ATTR_NAME));
|
||||
assertEquals("databaseA.tableA@cluster1", ref.get(ATTR_QUALIFIED_NAME));
|
||||
assertEquals("table_a", ref.get(ATTR_NAME));
|
||||
assertEquals("database_a.table_a@cluster1", ref.get(ATTR_QUALIFIED_NAME));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue