diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/PutHiveStreaming.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/PutHiveStreaming.java index 78c37ea2f9..de705daa59 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/PutHiveStreaming.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/PutHiveStreaming.java @@ -37,6 +37,9 @@ public class PutHiveStreaming extends AbstractHiveAnalyzer { @Override public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) { + if (event.getTransitUri() == null) { + return null; + } final URI uri = parseUri(event.getTransitUri()); final String clusterName = context.getClusterResolver().fromHostNames(uri.getHost()); @@ -55,6 +58,6 @@ public class PutHiveStreaming extends AbstractHiveAnalyzer { @Override public String targetComponentTypePattern() { - return "^PutHiveStreaming$"; + return "^PutHive(3)?Streaming$"; } } diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java index 5184025020..606f6d5e7b 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java @@ -38,8 +38,16 @@ import static org.mockito.Mockito.when; public class TestPutHiveStreaming { @Test - public void testTableLineage() { - final String processorName = "PutHiveStreaming"; + public void testTableLineageHive1() { + testTableLineage("PutHiveStreaming"); + } + + @Test + public void testTableLineageHive3() { + testTableLineage("PutHive3Streaming"); + } + + private void testTableLineage(String processorName) { final String transitUri = "thrift://0.example.com:9083"; final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class); when(record.getComponentType()).thenReturn(processorName);