NIFI-5225: Purge event data from event repository when Connectable is removed

This closes #2732.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Frederik Petersen 2018-05-22 12:55:59 +02:00 committed by Mark Payne
parent cca520aab7
commit d75ba167cd
4 changed files with 60 additions and 8 deletions

View File

@ -42,4 +42,12 @@ public interface FlowFileEventRepository extends Closeable {
* @param cutoffEpochMilliseconds cutoff
*/
void purgeTransferEvents(long cutoffEpochMilliseconds);
/**
* Causes any flow file events of the given component to be purged from the
* repository
*
* @param componentIdentifier Identifier of the component
*/
void purgeTransferEvents(String componentIdentifier);
}

View File

@ -2651,7 +2651,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
public void onProcessorRemoved(final ProcessorNode procNode) {
allProcessors.remove(procNode.getIdentifier());
String identifier = procNode.getIdentifier();
flowFileEventRepository.purgeTransferEvents(identifier);
allProcessors.remove(identifier);
}
public ProcessorNode getProcessorNode(final String id) {
@ -2663,7 +2665,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
public void onConnectionRemoved(final Connection connection) {
allConnections.remove(connection.getIdentifier());
String identifier = connection.getIdentifier();
flowFileEventRepository.purgeTransferEvents(identifier);
allConnections.remove(identifier);
}
public Connection getConnection(final String id) {
@ -2675,7 +2679,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
public void onInputPortRemoved(final Port inputPort) {
allInputPorts.remove(inputPort.getIdentifier());
String identifier = inputPort.getIdentifier();
flowFileEventRepository.purgeTransferEvents(identifier);
allInputPorts.remove(identifier);
}
public Port getInputPort(final String id) {
@ -2687,7 +2693,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
public void onOutputPortRemoved(final Port outputPort) {
allOutputPorts.remove(outputPort.getIdentifier());
String identifier = outputPort.getIdentifier();
flowFileEventRepository.purgeTransferEvents(identifier);
allOutputPorts.remove(identifier);
}
public Port getOutputPort(final String id) {
@ -2699,7 +2707,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
public void onFunnelRemoved(final Funnel funnel) {
allFunnels.remove(funnel.getIdentifier());
String identifier = funnel.getIdentifier();
flowFileEventRepository.purgeTransferEvents(identifier);
allFunnels.remove(identifier);
}
public Funnel getFunnel(final String id) {

View File

@ -64,4 +64,9 @@ public class RingBufferEventRepository implements FlowFileEventRepository {
}
}
@Override
public void purgeTransferEvents(String componentIdentifier) {
componentEventMap.remove(componentIdentifier);
}
}

View File

@ -25,6 +25,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.testng.Assert;
public class TestRingBufferEventRepository {
@ -33,7 +34,7 @@ public class TestRingBufferEventRepository {
final RingBufferEventRepository repo = new RingBufferEventRepository(5);
long insertNanos = 0L;
for (int i = 0; i < 1000000; i++) {
final FlowFileEvent event = generateEvent();
final FlowFileEvent event = generateEvent("ABC");
final long insertStart = System.nanoTime();
repo.updateRepository(event);
@ -49,11 +50,39 @@ public class TestRingBufferEventRepository {
repo.close();
}
private FlowFileEvent generateEvent() {
@Test
public void testPurge() throws IOException {
final FlowFileEventRepository repo = new RingBufferEventRepository(5);
String id1 = "component1";
String id2 = "component2";
repo.updateRepository(generateEvent(id1));
repo.updateRepository(generateEvent(id2));
RepositoryStatusReport report = repo.reportTransferEvents(System.currentTimeMillis() - 2 * 60000);
FlowFileEvent entry = report.getReportEntry(id1);
Assert.assertNotNull(entry);
entry = report.getReportEntry(id2);
Assert.assertNotNull(entry);
repo.purgeTransferEvents(id1);
report = repo.reportTransferEvents(System.currentTimeMillis() - 2 * 60000);
entry = report.getReportEntry(id1);
Assert.assertNull(entry);
entry = report.getReportEntry(id2);
Assert.assertNotNull(entry);
repo.purgeTransferEvents(id2);
report = repo.reportTransferEvents(System.currentTimeMillis() - 2 * 60000);
entry = report.getReportEntry(id2);
Assert.assertNull(entry);
repo.close();
}
private FlowFileEvent generateEvent(final String id) {
return new FlowFileEvent() {
@Override
public String getComponentIdentifier() {
return "ABC";
return id;
}
@Override