From d3f54994a6510b5a428f30e65b3d456225749937 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Wed, 14 Mar 2018 17:42:10 +0900 Subject: [PATCH] NIFI-4972 - SelectHiveQL to emit FETCH provenance event SelectHiveQL should emit FETCH instead of CONTENT_MODIFIED when it has incoming connections. Signed-off-by: Pierre Villard This closes #2543. --- .../additionalDetails.html | 2 +- .../nifi/processors/hive/SelectHiveQL.java | 7 +++---- .../nifi/processors/hive/TestSelectHiveQL.java | 17 +++++++++++++++++ 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html index 38ad684f43..91c39799ed 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html @@ -392,7 +392,7 @@ Processor 3 SEND
- RECEIVE
+ RECEIVE, FETCH
jdbc:hive2://hive.example.com:10000/default hive_table diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java index c15a9e1360..832636beb3 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java @@ -410,10 +410,9 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { new Object[]{flowfile, nrOfRows.get()}); if (context.hasIncomingConnection()) { - // If the flow file came from an incoming connection, issue a Modify Content provenance event - - session.getProvenanceReporter().modifyContent(flowfile, "Retrieved " + nrOfRows.get() + " rows", - stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + // If the flow file came from an incoming connection, issue a Fetch provenance event + session.getProvenanceReporter().fetch(flowfile, dbcpService.getConnectionURL(), + "Retrieved " + nrOfRows.get() + " rows", stopWatch.getElapsed(TimeUnit.MILLISECONDS)); } else { // If we created a flow file from rows received from Hive, issue a Receive provenance event session.getProvenanceReporter().receive(flowfile, dbcpService.getConnectionURL(), stopWatch.getElapsed(TimeUnit.MILLISECONDS)); diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java index 3c3b7f9ef4..bb919d83fc 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java @@ -25,6 +25,8 @@ import org.apache.nifi.dbcp.DBCPService; import org.apache.nifi.dbcp.hive.HiveDBCPService; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; @@ -118,11 +120,26 @@ public class TestSelectHiveQL { public void testNoIncomingConnection() throws ClassNotFoundException, SQLException, InitializationException, IOException { runner.setIncomingConnection(false); invokeOnTrigger(QUERY_WITHOUT_EL, false, "Avro"); + + final List provenanceEvents = runner.getProvenanceEvents(); + final ProvenanceEventRecord provenance0 = provenanceEvents.get(0); + assertEquals(ProvenanceEventType.RECEIVE, provenance0.getEventType()); + assertEquals("jdbc:derby:target/db;create=true", provenance0.getTransitUri()); } @Test public void testNoTimeLimit() throws InitializationException, ClassNotFoundException, SQLException, IOException { invokeOnTrigger(QUERY_WITH_EL, true, "Avro"); + + final List provenanceEvents = runner.getProvenanceEvents(); + assertEquals(2, provenanceEvents.size()); + + final ProvenanceEventRecord provenance0 = provenanceEvents.get(0); + assertEquals(ProvenanceEventType.FETCH, provenance0.getEventType()); + assertEquals("jdbc:derby:target/db;create=true", provenance0.getTransitUri()); + + final ProvenanceEventRecord provenance1 = provenanceEvents.get(1); + assertEquals(ProvenanceEventType.FORK, provenance1.getEventType()); }