diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java index 560dc058b5..1781d1866d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java @@ -42,4 +42,12 @@ public interface FlowFileEventRepository extends Closeable { * @param cutoffEpochMilliseconds cutoff */ void purgeTransferEvents(long cutoffEpochMilliseconds); + + /** + * Causes any flow file events of the given component to be purged from the + * repository + * + * @param componentIdentifier Identifier of the component + */ + void purgeTransferEvents(String componentIdentifier); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 98b33ecc54..db18adaba1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -2651,7 +2651,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } public void onProcessorRemoved(final ProcessorNode procNode) { - allProcessors.remove(procNode.getIdentifier()); + String identifier = procNode.getIdentifier(); + flowFileEventRepository.purgeTransferEvents(identifier); + allProcessors.remove(identifier); } public ProcessorNode getProcessorNode(final String id) { @@ -2663,7 +2665,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } public void onConnectionRemoved(final Connection connection) { - allConnections.remove(connection.getIdentifier()); + String identifier = connection.getIdentifier(); + flowFileEventRepository.purgeTransferEvents(identifier); + allConnections.remove(identifier); } public Connection getConnection(final String id) { @@ -2675,7 +2679,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } public void onInputPortRemoved(final Port inputPort) { - allInputPorts.remove(inputPort.getIdentifier()); + String identifier = inputPort.getIdentifier(); + flowFileEventRepository.purgeTransferEvents(identifier); + allInputPorts.remove(identifier); } public Port getInputPort(final String id) { @@ -2687,7 +2693,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } public void onOutputPortRemoved(final Port outputPort) { - allOutputPorts.remove(outputPort.getIdentifier()); + String identifier = outputPort.getIdentifier(); + flowFileEventRepository.purgeTransferEvents(identifier); + allOutputPorts.remove(identifier); } public Port getOutputPort(final String id) { @@ -2699,7 +2707,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } public void onFunnelRemoved(final Funnel funnel) { - allFunnels.remove(funnel.getIdentifier()); + String identifier = funnel.getIdentifier(); + flowFileEventRepository.purgeTransferEvents(identifier); + allFunnels.remove(identifier); } public Funnel getFunnel(final String id) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java index b9a82edf8a..c60f98da5c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java @@ -64,4 +64,9 @@ public class RingBufferEventRepository implements FlowFileEventRepository { } } + @Override + public void purgeTransferEvents(String componentIdentifier) { + componentEventMap.remove(componentIdentifier); + } + } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java index cb5c3069c9..2bc158fdcd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import org.junit.Test; +import org.testng.Assert; public class TestRingBufferEventRepository { @@ -33,7 +34,7 @@ public class TestRingBufferEventRepository { final RingBufferEventRepository repo = new RingBufferEventRepository(5); long insertNanos = 0L; for (int i = 0; i < 1000000; i++) { - final FlowFileEvent event = generateEvent(); + final FlowFileEvent event = generateEvent("ABC"); final long insertStart = System.nanoTime(); repo.updateRepository(event); @@ -49,11 +50,39 @@ public class TestRingBufferEventRepository { repo.close(); } - private FlowFileEvent generateEvent() { + @Test + public void testPurge() throws IOException { + final FlowFileEventRepository repo = new RingBufferEventRepository(5); + String id1 = "component1"; + String id2 = "component2"; + repo.updateRepository(generateEvent(id1)); + repo.updateRepository(generateEvent(id2)); + RepositoryStatusReport report = repo.reportTransferEvents(System.currentTimeMillis() - 2 * 60000); + FlowFileEvent entry = report.getReportEntry(id1); + Assert.assertNotNull(entry); + entry = report.getReportEntry(id2); + Assert.assertNotNull(entry); + + repo.purgeTransferEvents(id1); + report = repo.reportTransferEvents(System.currentTimeMillis() - 2 * 60000); + entry = report.getReportEntry(id1); + Assert.assertNull(entry); + entry = report.getReportEntry(id2); + Assert.assertNotNull(entry); + + repo.purgeTransferEvents(id2); + report = repo.reportTransferEvents(System.currentTimeMillis() - 2 * 60000); + entry = report.getReportEntry(id2); + Assert.assertNull(entry); + + repo.close(); + } + + private FlowFileEvent generateEvent(final String id) { return new FlowFileEvent() { @Override public String getComponentIdentifier() { - return "ABC"; + return id; } @Override