From f16cbd462b8d5bfea2cf4e1d02910f22e77d0354 Mon Sep 17 00:00:00 2001
From: Koji Kawamura
Date: Thu, 25 Jan 2018 13:57:01 +0900
Subject: [PATCH] NIFI-4818: Fix transit URL parsing at Hive2JDBC and
 KafkaTopic for ReportLineageToAtlas

- Hive2JDBC: Handle connection parameters and multiple host entries correctly
- KafkaTopic: Handle multiple host entries correctly
- Avoid potential "IllegalStateException: Duplicate key" exception when
  NiFiAtlasHook analyzes existing NiFiFlowPath input/output entries
- This closes #2435
---
 .../org/apache/nifi/atlas/NiFiAtlasHook.java       |   6 +-
 .../atlas/provenance/analyzer/Hive2JDBC.java       |  46 ++++--
 .../atlas/provenance/analyzer/KafkaTopic.java      |  10 +-
 .../provenance/analyzer/TestHive2JDBC.java         | 133 ++++++++++++++++++
 .../provenance/analyzer/TestKafkaTopic.java        |   3 +-
 5 files changed, 179 insertions(+), 19 deletions(-)
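Note (illustration only, not part of the patch): the NiFiAtlasHook hunk below
replaces the two-argument Collectors.toMap with the three-argument overload,
which takes a merge function instead of throwing "IllegalStateException:
Duplicate key" on a key collision. A minimal standalone sketch of the
difference; the class name, keys, and values here are made up:

    import java.util.Map;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class ToMapMergeDemo {
        public static void main(String[] args) {
            final Map<String, String> merged = Stream.of(
                    new String[]{"flowPath1", "oldRef"},
                    new String[]{"flowPath1", "newRef"})
                    // The two-argument toMap would throw here because the key repeats;
                    // the merge function keeps the new value, as the patched hook does.
                    .collect(Collectors.toMap(e -> e[0], e -> e[1], (oldValue, newValue) -> newValue));
            System.out.println(merged); // {flowPath1=newRef}
        }
    }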
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java
index a15c93572d..58945d5f57 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java
@@ -255,7 +255,11 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext {
                 }
                 return new Tuple<>(refQualifiedName, typedQualifiedNameToRef.get(toTypedQualifiedName(typeName, refQualifiedName)));
             }).filter(Objects::nonNull).filter(tuple -> tuple.getValue() != null)
-                    .collect(Collectors.toMap(Tuple::getKey, Tuple::getValue));
+                    // If duplication happens, use new value.
+                    .collect(Collectors.toMap(Tuple::getKey, Tuple::getValue, (oldValue, newValue) -> {
+                        logger.warn("Duplicated qualified name was found, use the new one. oldValue={}, newValue={}", new Object[]{oldValue, newValue});
+                        return newValue;
+                    }));
     }
 
     @SuppressWarnings("unchecked")
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java
index e393a09c53..ccbbc66951 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java
@@ -21,10 +21,14 @@ import org.apache.nifi.atlas.provenance.AnalysisContext;
 import org.apache.nifi.atlas.provenance.DataSetRefs;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.util.StringUtils;
 import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.net.URI;
 import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.ATTR_INPUT_TABLES;
 import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.ATTR_OUTPUT_TABLES;
@@ -49,17 +53,41 @@ import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.par
  */
 public class Hive2JDBC extends AbstractHiveAnalyzer {
 
+    private static final Logger logger = LoggerFactory.getLogger(Hive2JDBC.class);
+
+    // jdbc:hive2://<host1>:<port1>,<host2>:<port2>/dbName;initFile=<file>;sess_var_list?hive_conf_list#hive_var_list
+    // Group 1 = <host1>:<port1>,<host2>:<port2>
+    // Group 2 = dbName;initFile=<file>;sess_var_list?hive_conf_list#hive_var_list
+    private static final String URI_PATTERN_STR = "jdbc:hive2://([^/]+)/?(.*)$";
+    private static final Pattern URI_PATTERN = Pattern.compile(URI_PATTERN_STR);
+
     @Override
     public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) {
-        // Replace the colon so that the schema in the URI can be parsed correctly.
-        final String transitUri = event.getTransitUri().replaceFirst("^jdbc:hive2", "jdbc-hive2");
-        final URI uri = parseUri(transitUri);
-        final String clusterName = context.getClusterResolver().fromHostNames(uri.getHost());
-        // Remove the heading '/'
-        final String path = uri.getPath();
-        // If uri does not contain database name, then use 'default' as database name.
-        final String connectedDatabaseName = path == null || path.isEmpty() ? "default" : path.substring(1);
+        final String transitUri = event.getTransitUri();
+        if (transitUri == null) {
+            return null;
+        }
+
+        final Matcher uriMatcher = URI_PATTERN.matcher(transitUri);
+        if (!uriMatcher.matches()) {
+            logger.warn("Unexpected transit URI: {}", new Object[]{transitUri});
+            return null;
+        }
+
+        final String clusterName = context.getClusterResolver().fromHostNames(splitHostNames(uriMatcher.group(1)));
+        String connectedDatabaseName = null;
+        if (uriMatcher.groupCount() > 1) {
+            // Try to find connected database name from connection parameters.
+            final String[] connectionParams = uriMatcher.group(2).split(";");
+            connectedDatabaseName = connectionParams[0];
+        }
+
+        if (StringUtils.isEmpty(connectedDatabaseName)) {
+            // If not found, then use "default".
+            connectedDatabaseName = "default";
+        }
 
         final Set<Tuple<String, String>> inputTables = parseTableNames(connectedDatabaseName, event.getAttribute(ATTR_INPUT_TABLES));
         final Set<Tuple<String, String>> outputTables = parseTableNames(connectedDatabaseName, event.getAttribute(ATTR_OUTPUT_TABLES));
@@ -97,6 +125,6 @@ public class Hive2JDBC extends AbstractHiveAnalyzer {
 
     @Override
     public String targetTransitUriPattern() {
-        return "^jdbc:hive2://.+$";
+        return URI_PATTERN_STR;
     }
 }
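Note (illustration only, not part of the patch): a short sketch of what the new
URI_PATTERN is expected to extract from a ZooKeeper-discovery transit URI. The
demo class name is made up, and the expected values in the comments mirror the
new tests further down rather than output captured from the patched class:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class Hive2UriPatternDemo {
        public static void main(String[] args) {
            // Same pattern string as Hive2JDBC.URI_PATTERN_STR above.
            final Pattern uriPattern = Pattern.compile("jdbc:hive2://([^/]+)/?(.*)$");
            final Matcher m = uriPattern.matcher(
                    "jdbc:hive2://0.example.com:2181,1.example.com:2181/some_database;serviceDiscoveryMode=zooKeeper");
            if (m.matches()) {
                System.out.println(m.group(1)); // 0.example.com:2181,1.example.com:2181
                System.out.println(m.group(2)); // some_database;serviceDiscoveryMode=zooKeeper
                // The first semicolon-delimited token of group 2 is the database name;
                // an empty token falls back to "default".
                System.out.println(m.group(2).split(";")[0]); // some_database
            }
        }
    }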
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/KafkaTopic.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/KafkaTopic.java
index e3d4709203..ff86166d6d 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/KafkaTopic.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/KafkaTopic.java
@@ -62,14 +62,8 @@ public class KafkaTopic extends AbstractNiFiProvenanceEventAnalyzer {
             return null;
         }
 
-        String clusterName = null;
-        for (String broker : uriMatcher.group(1).split(",")) {
-            final String brokerHostname = broker.split(":")[0].trim();
-            clusterName = context.getClusterResolver().fromHostNames(brokerHostname);
-            if (clusterName != null && !clusterName.isEmpty()) {
-                break;
-            }
-        }
+        final String[] hostNames = splitHostNames(uriMatcher.group(1));
+        final String clusterName = context.getClusterResolver().fromHostNames(hostNames);
 
         final String topicName = uriMatcher.group(2);
 
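Note (illustration only, not part of the patch): both analyzers now resolve the
cluster name from the full host list via splitHostNames(...), inherited from
AbstractNiFiProvenanceEventAnalyzer and not shown in this patch. A hypothetical
stand-in, assuming it splits the comma-separated list and strips the port from
each entry; only the input/output shape is inferred from the tests below:

    import java.util.Arrays;

    // Hypothetical sketch of AbstractNiFiProvenanceEventAnalyzer.splitHostNames;
    // the real implementation is not part of this patch.
    static String[] splitHostNames(final String hostNames) {
        return Arrays.stream(hostNames.split(","))
                // "0.example.com:9092" -> "0.example.com"
                .map(entry -> entry.split(":")[0].trim())
                .toArray(String[]::new);
    }

    // splitHostNames("0.example.com:9092, 1.example.com:9092")
    //   -> {"0.example.com", "1.example.com"}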
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java
index 9e1a92ca74..f63d8ca112 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java
@@ -37,6 +37,7 @@ import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.ATT
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.matches;
 import static org.mockito.Mockito.when;
 
@@ -160,4 +161,136 @@
         assertEquals("tableB1", ref.get(ATTR_NAME));
         assertEquals("databaseB.tableB1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
     }
+
+    /**
+     * A Hive connection URL can have connection strings delimited by semicolons.
+     */
+    @Test
+    public void testTableLineageWithDefaultTableNameWithConnectionParams() {
+        final String processorName = "PutHiveQL";
+        final String transitUri = "jdbc:hive2://0.example.com:10000;transportMode=http;httpPath=cliservice";
+        final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
+        when(record.getComponentType()).thenReturn(processorName);
+        when(record.getTransitUri()).thenReturn(transitUri);
+        when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
+        // E.g. insert into databaseB.tableB1 select something from tableA1 a1 inner join tableA2 a2 where a1.id = a2.id
+        when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("tableA1, tableA2");
+        when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseB.tableB1");
+
+        final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
+        when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
+
+        final AnalysisContext context = Mockito.mock(AnalysisContext.class);
+        when(context.getClusterResolver()).thenReturn(clusterResolvers);
+
+        final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
+        assertNotNull(analyzer);
+
+        final DataSetRefs refs = analyzer.analyze(context, record);
+        assertEquals(2, refs.getInputs().size());
+        // QualifiedName : Name
+        final Map<String, String> expectedInputRefs = new HashMap<>();
+        expectedInputRefs.put("default.tableA1@cluster1", "tableA1");
+        expectedInputRefs.put("default.tableA2@cluster1", "tableA2");
+        for (Referenceable ref : refs.getInputs()) {
+            final String qName = (String) ref.get(ATTR_QUALIFIED_NAME);
+            assertTrue(expectedInputRefs.containsKey(qName));
+            assertEquals(expectedInputRefs.get(qName), ref.get(ATTR_NAME));
+        }
+
+        assertEquals(1, refs.getOutputs().size());
+        Referenceable ref = refs.getOutputs().iterator().next();
+        assertEquals("hive_table", ref.getTypeName());
+        assertEquals("tableB1", ref.get(ATTR_NAME));
+        assertEquals("databaseB.tableB1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+    }
+
+    /**
+     * Hive connection URL can have multiple zookeeper host ports
+     * and multiple parameters delimited with semicolons.
+     * Database name can be omitted.
+     */
+    @Test
+    public void testTableLineageWithZookeeperDiscovery() {
+        final String processorName = "PutHiveQL";
+        final String transitUri = "jdbc:hive2://0.example.com:2181,1.example.com:2181,2.example.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2";
+        final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
+        when(record.getComponentType()).thenReturn(processorName);
+        when(record.getTransitUri()).thenReturn(transitUri);
+        when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
+        // E.g. insert into databaseB.tableB1 select something from tableA1 a1 inner join tableA2 a2 where a1.id = a2.id
+        when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("tableA1, tableA2");
+        when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseB.tableB1");
+
+        final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
+        when(clusterResolvers.fromHostNames(eq("0.example.com"), eq("1.example.com"), eq("2.example.com"))).thenReturn("cluster1");
+
+        final AnalysisContext context = Mockito.mock(AnalysisContext.class);
+        when(context.getClusterResolver()).thenReturn(clusterResolvers);
+
+        final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
+        assertNotNull(analyzer);
+
+        final DataSetRefs refs = analyzer.analyze(context, record);
+        assertEquals(2, refs.getInputs().size());
+        // QualifiedName : Name
+        final Map<String, String> expectedInputRefs = new HashMap<>();
+        expectedInputRefs.put("default.tableA1@cluster1", "tableA1");
+        expectedInputRefs.put("default.tableA2@cluster1", "tableA2");
+        for (Referenceable ref : refs.getInputs()) {
+            final String qName = (String) ref.get(ATTR_QUALIFIED_NAME);
+            assertTrue(expectedInputRefs.containsKey(qName));
+            assertEquals(expectedInputRefs.get(qName), ref.get(ATTR_NAME));
+        }
+
+        assertEquals(1, refs.getOutputs().size());
+        Referenceable ref = refs.getOutputs().iterator().next();
+        assertEquals("hive_table", ref.getTypeName());
+        assertEquals("tableB1", ref.get(ATTR_NAME));
+        assertEquals("databaseB.tableB1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+    }
+
+    /**
+     * Hive connection URL using zookeeper and database name.
+     */
+    @Test
+    public void testTableLineageWithZookeeperDiscoverySpecificDatabase() {
+        final String processorName = "PutHiveQL";
+        final String transitUri = "jdbc:hive2://0.example.com:2181,1.example.com:2181/some_database;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2";
+        final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
+        when(record.getComponentType()).thenReturn(processorName);
+        when(record.getTransitUri()).thenReturn(transitUri);
+        when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
+        // E.g. insert into databaseB.tableB1 select something from tableA1 a1 inner join tableA2 a2 where a1.id = a2.id
+        when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("tableA1, tableA2");
+        when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseB.tableB1");
+
+        final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
+        when(clusterResolvers.fromHostNames(eq("0.example.com"), eq("1.example.com"))).thenReturn("cluster1");
+
+        final AnalysisContext context = Mockito.mock(AnalysisContext.class);
+        when(context.getClusterResolver()).thenReturn(clusterResolvers);
+
+        final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
+        assertNotNull(analyzer);
+
+        final DataSetRefs refs = analyzer.analyze(context, record);
+        assertEquals(2, refs.getInputs().size());
+        // QualifiedName : Name
+        final Map<String, String> expectedInputRefs = new HashMap<>();
+        expectedInputRefs.put("some_database.tableA1@cluster1", "tableA1");
+        expectedInputRefs.put("some_database.tableA2@cluster1", "tableA2");
+        for (Referenceable ref : refs.getInputs()) {
+            final String qName = (String) ref.get(ATTR_QUALIFIED_NAME);
+            assertTrue(expectedInputRefs.containsKey(qName));
+            assertEquals(expectedInputRefs.get(qName), ref.get(ATTR_NAME));
+        }
+
+        assertEquals(1, refs.getOutputs().size());
+        Referenceable ref = refs.getOutputs().iterator().next();
+        assertEquals("hive_table", ref.getTypeName());
+        assertEquals("tableB1", ref.get(ATTR_NAME));
+        assertEquals("databaseB.tableB1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+    }
 }
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestKafkaTopic.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestKafkaTopic.java
index 5c0fd0ed66..543ac89f55 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestKafkaTopic.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestKafkaTopic.java
@@ -31,6 +31,7 @@ import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
 import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.matches;
 import static org.mockito.Mockito.when;
 
@@ -73,7 +74,7 @@ public class TestKafkaTopic {
         when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
 
         final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
-        when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
+        when(clusterResolvers.fromHostNames(eq("0.example.com"), eq("1.example.com"))).thenReturn("cluster1");
 
         final AnalysisContext context = Mockito.mock(AnalysisContext.class);
         when(context.getClusterResolver()).thenReturn(clusterResolvers);