diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java index ac56e61f4a..a4a967bce8 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java @@ -690,6 +690,9 @@ public class MockProcessSession implements ProcessSession { if (Objects.equals(ff.getId(), flowFile.getId())) { penalizedItr.remove(); penalized.remove(ff); + if (originalVersions.get(ff.getId()) != null) { + provenanceReporter.drop(ff, ff.getAttribute(CoreAttributes.DISCARD_REASON.key())); + } break; } } @@ -702,6 +705,9 @@ public class MockProcessSession implements ProcessSession { beingProcessed.remove(ffId); removedFlowFiles.add(flowFile.getId()); currentVersions.remove(ffId); + if (originalVersions.get(flowFile.getId()) != null) { + provenanceReporter.drop(flowFile, flowFile.getAttribute(CoreAttributes.DISCARD_REASON.key())); + } return; } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java index 15637da1a9..f3ea7cd515 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java @@ -166,7 +166,7 @@ public class TestStandardRemoteGroupPort { // Assert provenance. final List provenanceEvents = sessionState.getProvenanceEvents(); - assertEquals(1, provenanceEvents.size()); + assertEquals(2, provenanceEvents.size()); final ProvenanceEventRecord provenanceEvent = provenanceEvents.get(0); assertEquals(ProvenanceEventType.SEND, provenanceEvent.getEventType()); assertEquals("nifi://node1.example.com:9090/flowfile-uuid", provenanceEvent.getTransitUri()); @@ -246,7 +246,7 @@ public class TestStandardRemoteGroupPort { // Assert provenance. final List provenanceEvents = sessionState.getProvenanceEvents(); - assertEquals(1, provenanceEvents.size()); + assertEquals(2, provenanceEvents.size()); final ProvenanceEventRecord provenanceEvent = provenanceEvents.get(0); assertEquals(ProvenanceEventType.SEND, provenanceEvent.getEventType()); assertEquals(flowFileEndpointUri, provenanceEvent.getTransitUri()); @@ -369,7 +369,8 @@ public class TestStandardRemoteGroupPort { assertEquals(flowFiles.size(), totalPacketsSent.get()); assertEquals("The number of transactions should match as expected.", expectedNumberOfPackets.length, sentPackets.size()); final List provenanceEvents = sessionState.getProvenanceEvents(); - assertEquals(flowFiles.size(), provenanceEvents.size()); + // SEND and DROP events for each flowfile + assertEquals(flowFiles.size() * 2, provenanceEvents.size()); int f = 0; for (int i = 0; i < expectedNumberOfPackets.length; i++) { @@ -383,11 +384,12 @@ public class TestStandardRemoteGroupPort { final DataPacket dataPacket = dataPackets.get(p); assertEquals(flowFile.getSize(), dataPacket.getSize()); - // Assert provenance event - final ProvenanceEventRecord provenanceEvent = provenanceEvents.get(f); + // Assert provenance events (SEND and DROP) + ProvenanceEventRecord provenanceEvent = provenanceEvents.get(f * 2); assertEquals(ProvenanceEventType.SEND, provenanceEvent.getEventType()); assertEquals(flowFileEndpointUri, provenanceEvent.getTransitUri()); - + provenanceEvent = provenanceEvents.get(f * 2 + 1); + assertEquals(ProvenanceEventType.DROP, provenanceEvent.getEventType()); f++; } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java index e63e901f8e..e524528ad3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java @@ -314,7 +314,8 @@ public class TestHttpFlowFileServerProtocol { // Assert provenance final List provenanceEvents = sessionState.getProvenanceEvents(); - assertEquals(1, provenanceEvents.size()); + // Assert provenance (SEND and DROP) + assertEquals(2, provenanceEvents.size()); final ProvenanceEventRecord provenanceEvent = provenanceEvents.get(0); assertEquals(ProvenanceEventType.SEND, provenanceEvent.getEventType()); assertEquals(endpointUri, provenanceEvent.getTransitUri()); @@ -410,13 +411,16 @@ public class TestHttpFlowFileServerProtocol { final int flowFileSent = serverProtocol.commitTransferTransaction(peer, "3058746557"); assertEquals(2, flowFileSent); - // Assert provenance + // Assert provenance (SEND and DROP) final List provenanceEvents = sessionState.getProvenanceEvents(); - assertEquals(2, provenanceEvents.size()); - for (final ProvenanceEventRecord provenanceEvent : provenanceEvents) { + assertEquals(4, provenanceEvents.size()); + for (int i = 0; i < provenanceEvents.size(); i += 2) { + ProvenanceEventRecord provenanceEvent = provenanceEvents.get(i); assertEquals(ProvenanceEventType.SEND, provenanceEvent.getEventType()); assertEquals(endpointUri, provenanceEvent.getTransitUri()); assertEquals("Remote Host=peer-host, Remote DN=unit-test", provenanceEvent.getDetails()); + provenanceEvent = provenanceEvents.get(i + 1); + assertEquals(ProvenanceEventType.DROP, provenanceEvent.getEventType()); } } diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java index e0924fb730..c6ba7193a2 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java @@ -134,7 +134,7 @@ public class TestSelectHiveQL { invokeOnTrigger(QUERY_WITH_EL, true, "Avro"); final List provenanceEvents = runner.getProvenanceEvents(); - assertEquals(3, provenanceEvents.size()); + assertEquals(4, provenanceEvents.size()); final ProvenanceEventRecord provenance0 = provenanceEvents.get(0); assertEquals(ProvenanceEventType.FORK, provenance0.getEventType()); @@ -145,6 +145,10 @@ public class TestSelectHiveQL { final ProvenanceEventRecord provenance2 = provenanceEvents.get(2); assertEquals(ProvenanceEventType.FORK, provenance2.getEventType()); + + // The last one was removed as empty + final ProvenanceEventRecord provenance3 = provenanceEvents.get(3); + assertEquals(ProvenanceEventType.DROP, provenance3.getEventType()); } @@ -215,7 +219,7 @@ public class TestSelectHiveQL { } @Test - public void invokeOnTriggerExceptionInPreQieriesNoIncomingFlows() + public void invokeOnTriggerExceptionInPreQueriesNoIncomingFlows() throws InitializationException, ClassNotFoundException, SQLException, IOException { doOnTrigger(QUERY_WITHOUT_EL, false, CSV, @@ -226,7 +230,7 @@ public class TestSelectHiveQL { } @Test - public void invokeOnTriggerExceptionInPreQieriesWithIncomingFlows() + public void invokeOnTriggerExceptionInPreQueriesWithIncomingFlows() throws InitializationException, ClassNotFoundException, SQLException, IOException { doOnTrigger(QUERY_WITHOUT_EL, true, CSV, @@ -237,7 +241,7 @@ public class TestSelectHiveQL { } @Test - public void invokeOnTriggerExceptionInPostQieriesNoIncomingFlows() + public void invokeOnTriggerExceptionInPostQueriesNoIncomingFlows() throws InitializationException, ClassNotFoundException, SQLException, IOException { doOnTrigger(QUERY_WITHOUT_EL, false, CSV, @@ -248,7 +252,7 @@ public class TestSelectHiveQL { } @Test - public void invokeOnTriggerExceptionInPostQieriesWithIncomingFlows() + public void invokeOnTriggerExceptionInPostQueriesWithIncomingFlows() throws InitializationException, ClassNotFoundException, SQLException, IOException { doOnTrigger(QUERY_WITHOUT_EL, true, CSV, diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java index eb1b065b40..356106fcaa 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java @@ -134,7 +134,7 @@ public class TestSelectHive3QL { invokeOnTrigger(QUERY_WITH_EL, true, "Avro"); final List provenanceEvents = runner.getProvenanceEvents(); - assertEquals(3, provenanceEvents.size()); + assertEquals(4, provenanceEvents.size()); final ProvenanceEventRecord provenance0 = provenanceEvents.get(0); assertEquals(ProvenanceEventType.FORK, provenance0.getEventType()); @@ -145,6 +145,9 @@ public class TestSelectHive3QL { final ProvenanceEventRecord provenance2 = provenanceEvents.get(2); assertEquals(ProvenanceEventType.FORK, provenance2.getEventType()); + + final ProvenanceEventRecord provenance3 = provenanceEvents.get(3); + assertEquals(ProvenanceEventType.DROP, provenance3.getEventType()); } diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive_1_1QL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive_1_1QL.java index eba231baaa..86b9acb1e8 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive_1_1QL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive_1_1QL.java @@ -134,7 +134,7 @@ public class TestSelectHive_1_1QL { invokeOnTrigger(QUERY_WITH_EL, true, "Avro"); final List provenanceEvents = runner.getProvenanceEvents(); - assertEquals(3, provenanceEvents.size()); + assertEquals(4, provenanceEvents.size()); final ProvenanceEventRecord provenance0 = provenanceEvents.get(0); assertEquals(ProvenanceEventType.FORK, provenance0.getEventType()); @@ -145,6 +145,9 @@ public class TestSelectHive_1_1QL { final ProvenanceEventRecord provenance2 = provenanceEvents.get(2); assertEquals(ProvenanceEventType.FORK, provenance2.getEventType()); + + final ProvenanceEventRecord provenance3 = provenanceEvents.get(3); + assertEquals(ProvenanceEventType.DROP, provenance3.getEventType()); } @@ -204,7 +207,7 @@ public class TestSelectHive_1_1QL { } @Test - public void invokeOnTriggerExceptionInPreQieriesNoIncomingFlows() + public void invokeOnTriggerExceptionInPreQueriesNoIncomingFlows() throws InitializationException, ClassNotFoundException, SQLException, IOException { doOnTrigger(QUERY_WITHOUT_EL, false, CSV, @@ -215,7 +218,7 @@ public class TestSelectHive_1_1QL { } @Test - public void invokeOnTriggerExceptionInPreQieriesWithIncomingFlows() + public void invokeOnTriggerExceptionInPreQueriesWithIncomingFlows() throws InitializationException, ClassNotFoundException, SQLException, IOException { doOnTrigger(QUERY_WITHOUT_EL, true, CSV, @@ -226,7 +229,7 @@ public class TestSelectHive_1_1QL { } @Test - public void invokeOnTriggerExceptionInPostQieriesNoIncomingFlows() + public void invokeOnTriggerExceptionInPostQueriesNoIncomingFlows() throws InitializationException, ClassNotFoundException, SQLException, IOException { doOnTrigger(QUERY_WITHOUT_EL, false, CSV, @@ -237,7 +240,7 @@ public class TestSelectHive_1_1QL { } @Test - public void invokeOnTriggerExceptionInPostQieriesWithIncomingFlows() + public void invokeOnTriggerExceptionInPostQueriesWithIncomingFlows() throws InitializationException, ClassNotFoundException, SQLException, IOException { doOnTrigger(QUERY_WITHOUT_EL, true, CSV,