NIFI-5936: Added DROP provenance event to MockProcessSession.remove() to match real impl

This closes #5302

Signed-off-by: Mike Thomsen <mthomsen@apache.org>
This commit is contained in:
Matthew Burgess 2021-08-10 16:11:20 -04:00 committed by Mike Thomsen
parent 9a56d23311
commit 5ff4974b1f
No known key found for this signature in database
GPG Key ID: 88511C3D4CAD246F
6 changed files with 43 additions and 21 deletions

View File

@ -690,6 +690,9 @@ public class MockProcessSession implements ProcessSession {
if (Objects.equals(ff.getId(), flowFile.getId())) {
penalizedItr.remove();
penalized.remove(ff);
if (originalVersions.get(ff.getId()) != null) {
provenanceReporter.drop(ff, ff.getAttribute(CoreAttributes.DISCARD_REASON.key()));
}
break;
}
}
@ -702,6 +705,9 @@ public class MockProcessSession implements ProcessSession {
beingProcessed.remove(ffId);
removedFlowFiles.add(flowFile.getId());
currentVersions.remove(ffId);
if (originalVersions.get(flowFile.getId()) != null) {
provenanceReporter.drop(flowFile, flowFile.getAttribute(CoreAttributes.DISCARD_REASON.key()));
}
return;
}
}

View File

@ -166,7 +166,7 @@ public class TestStandardRemoteGroupPort {
// Assert provenance.
final List<ProvenanceEventRecord> provenanceEvents = sessionState.getProvenanceEvents();
assertEquals(1, provenanceEvents.size());
assertEquals(2, provenanceEvents.size());
final ProvenanceEventRecord provenanceEvent = provenanceEvents.get(0);
assertEquals(ProvenanceEventType.SEND, provenanceEvent.getEventType());
assertEquals("nifi://node1.example.com:9090/flowfile-uuid", provenanceEvent.getTransitUri());
@ -246,7 +246,7 @@ public class TestStandardRemoteGroupPort {
// Assert provenance.
final List<ProvenanceEventRecord> provenanceEvents = sessionState.getProvenanceEvents();
assertEquals(1, provenanceEvents.size());
assertEquals(2, provenanceEvents.size());
final ProvenanceEventRecord provenanceEvent = provenanceEvents.get(0);
assertEquals(ProvenanceEventType.SEND, provenanceEvent.getEventType());
assertEquals(flowFileEndpointUri, provenanceEvent.getTransitUri());
@ -369,7 +369,8 @@ public class TestStandardRemoteGroupPort {
assertEquals(flowFiles.size(), totalPacketsSent.get());
assertEquals("The number of transactions should match as expected.", expectedNumberOfPackets.length, sentPackets.size());
final List<ProvenanceEventRecord> provenanceEvents = sessionState.getProvenanceEvents();
assertEquals(flowFiles.size(), provenanceEvents.size());
// SEND and DROP events for each flowfile
assertEquals(flowFiles.size() * 2, provenanceEvents.size());
int f = 0;
for (int i = 0; i < expectedNumberOfPackets.length; i++) {
@ -383,11 +384,12 @@ public class TestStandardRemoteGroupPort {
final DataPacket dataPacket = dataPackets.get(p);
assertEquals(flowFile.getSize(), dataPacket.getSize());
// Assert provenance event
final ProvenanceEventRecord provenanceEvent = provenanceEvents.get(f);
// Assert provenance events (SEND and DROP)
ProvenanceEventRecord provenanceEvent = provenanceEvents.get(f * 2);
assertEquals(ProvenanceEventType.SEND, provenanceEvent.getEventType());
assertEquals(flowFileEndpointUri, provenanceEvent.getTransitUri());
provenanceEvent = provenanceEvents.get(f * 2 + 1);
assertEquals(ProvenanceEventType.DROP, provenanceEvent.getEventType());
f++;
}
}

View File

@ -314,7 +314,8 @@ public class TestHttpFlowFileServerProtocol {
// Assert provenance
final List<ProvenanceEventRecord> provenanceEvents = sessionState.getProvenanceEvents();
assertEquals(1, provenanceEvents.size());
// Assert provenance (SEND and DROP)
assertEquals(2, provenanceEvents.size());
final ProvenanceEventRecord provenanceEvent = provenanceEvents.get(0);
assertEquals(ProvenanceEventType.SEND, provenanceEvent.getEventType());
assertEquals(endpointUri, provenanceEvent.getTransitUri());
@ -410,13 +411,16 @@ public class TestHttpFlowFileServerProtocol {
final int flowFileSent = serverProtocol.commitTransferTransaction(peer, "3058746557");
assertEquals(2, flowFileSent);
// Assert provenance
// Assert provenance (SEND and DROP)
final List<ProvenanceEventRecord> provenanceEvents = sessionState.getProvenanceEvents();
assertEquals(2, provenanceEvents.size());
for (final ProvenanceEventRecord provenanceEvent : provenanceEvents) {
assertEquals(4, provenanceEvents.size());
for (int i = 0; i < provenanceEvents.size(); i += 2) {
ProvenanceEventRecord provenanceEvent = provenanceEvents.get(i);
assertEquals(ProvenanceEventType.SEND, provenanceEvent.getEventType());
assertEquals(endpointUri, provenanceEvent.getTransitUri());
assertEquals("Remote Host=peer-host, Remote DN=unit-test", provenanceEvent.getDetails());
provenanceEvent = provenanceEvents.get(i + 1);
assertEquals(ProvenanceEventType.DROP, provenanceEvent.getEventType());
}
}

View File

@ -134,7 +134,7 @@ public class TestSelectHiveQL {
invokeOnTrigger(QUERY_WITH_EL, true, "Avro");
final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
assertEquals(3, provenanceEvents.size());
assertEquals(4, provenanceEvents.size());
final ProvenanceEventRecord provenance0 = provenanceEvents.get(0);
assertEquals(ProvenanceEventType.FORK, provenance0.getEventType());
@ -145,6 +145,10 @@ public class TestSelectHiveQL {
final ProvenanceEventRecord provenance2 = provenanceEvents.get(2);
assertEquals(ProvenanceEventType.FORK, provenance2.getEventType());
// The last one was removed as empty
final ProvenanceEventRecord provenance3 = provenanceEvents.get(3);
assertEquals(ProvenanceEventType.DROP, provenance3.getEventType());
}
@ -215,7 +219,7 @@ public class TestSelectHiveQL {
}
@Test
public void invokeOnTriggerExceptionInPreQieriesNoIncomingFlows()
public void invokeOnTriggerExceptionInPreQueriesNoIncomingFlows()
throws InitializationException, ClassNotFoundException, SQLException, IOException {
doOnTrigger(QUERY_WITHOUT_EL, false, CSV,
@ -226,7 +230,7 @@ public class TestSelectHiveQL {
}
@Test
public void invokeOnTriggerExceptionInPreQieriesWithIncomingFlows()
public void invokeOnTriggerExceptionInPreQueriesWithIncomingFlows()
throws InitializationException, ClassNotFoundException, SQLException, IOException {
doOnTrigger(QUERY_WITHOUT_EL, true, CSV,
@ -237,7 +241,7 @@ public class TestSelectHiveQL {
}
@Test
public void invokeOnTriggerExceptionInPostQieriesNoIncomingFlows()
public void invokeOnTriggerExceptionInPostQueriesNoIncomingFlows()
throws InitializationException, ClassNotFoundException, SQLException, IOException {
doOnTrigger(QUERY_WITHOUT_EL, false, CSV,
@ -248,7 +252,7 @@ public class TestSelectHiveQL {
}
@Test
public void invokeOnTriggerExceptionInPostQieriesWithIncomingFlows()
public void invokeOnTriggerExceptionInPostQueriesWithIncomingFlows()
throws InitializationException, ClassNotFoundException, SQLException, IOException {
doOnTrigger(QUERY_WITHOUT_EL, true, CSV,

View File

@ -134,7 +134,7 @@ public class TestSelectHive3QL {
invokeOnTrigger(QUERY_WITH_EL, true, "Avro");
final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
assertEquals(3, provenanceEvents.size());
assertEquals(4, provenanceEvents.size());
final ProvenanceEventRecord provenance0 = provenanceEvents.get(0);
assertEquals(ProvenanceEventType.FORK, provenance0.getEventType());
@ -145,6 +145,9 @@ public class TestSelectHive3QL {
final ProvenanceEventRecord provenance2 = provenanceEvents.get(2);
assertEquals(ProvenanceEventType.FORK, provenance2.getEventType());
final ProvenanceEventRecord provenance3 = provenanceEvents.get(3);
assertEquals(ProvenanceEventType.DROP, provenance3.getEventType());
}

View File

@ -134,7 +134,7 @@ public class TestSelectHive_1_1QL {
invokeOnTrigger(QUERY_WITH_EL, true, "Avro");
final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
assertEquals(3, provenanceEvents.size());
assertEquals(4, provenanceEvents.size());
final ProvenanceEventRecord provenance0 = provenanceEvents.get(0);
assertEquals(ProvenanceEventType.FORK, provenance0.getEventType());
@ -145,6 +145,9 @@ public class TestSelectHive_1_1QL {
final ProvenanceEventRecord provenance2 = provenanceEvents.get(2);
assertEquals(ProvenanceEventType.FORK, provenance2.getEventType());
final ProvenanceEventRecord provenance3 = provenanceEvents.get(3);
assertEquals(ProvenanceEventType.DROP, provenance3.getEventType());
}
@ -204,7 +207,7 @@ public class TestSelectHive_1_1QL {
}
@Test
public void invokeOnTriggerExceptionInPreQieriesNoIncomingFlows()
public void invokeOnTriggerExceptionInPreQueriesNoIncomingFlows()
throws InitializationException, ClassNotFoundException, SQLException, IOException {
doOnTrigger(QUERY_WITHOUT_EL, false, CSV,
@ -215,7 +218,7 @@ public class TestSelectHive_1_1QL {
}
@Test
public void invokeOnTriggerExceptionInPreQieriesWithIncomingFlows()
public void invokeOnTriggerExceptionInPreQueriesWithIncomingFlows()
throws InitializationException, ClassNotFoundException, SQLException, IOException {
doOnTrigger(QUERY_WITHOUT_EL, true, CSV,
@ -226,7 +229,7 @@ public class TestSelectHive_1_1QL {
}
@Test
public void invokeOnTriggerExceptionInPostQieriesNoIncomingFlows()
public void invokeOnTriggerExceptionInPostQueriesNoIncomingFlows()
throws InitializationException, ClassNotFoundException, SQLException, IOException {
doOnTrigger(QUERY_WITHOUT_EL, false, CSV,
@ -237,7 +240,7 @@ public class TestSelectHive_1_1QL {
}
@Test
public void invokeOnTriggerExceptionInPostQieriesWithIncomingFlows()
public void invokeOnTriggerExceptionInPostQueriesWithIncomingFlows()
throws InitializationException, ClassNotFoundException, SQLException, IOException {
doOnTrigger(QUERY_WITHOUT_EL, true, CSV,