From 84968e70d293e49888addc3870c6a9cf222103b0 Mon Sep 17 00:00:00 2001 From: Matthew Burgess Date: Wed, 12 Feb 2020 16:27:56 -0500 Subject: [PATCH] NIFI-7087: Use FlowManager.findAllConnections() when available Signed-off-by: Matthew Burgess This closes #4026 --- .../org/apache/nifi/controller/FlowController.java | 8 ++++---- .../status/analytics/ConnectionStatusAnalytics.java | 13 ++++++++----- .../nifi/provenance/ComponentIdentifierLookup.java | 11 ++++++----- .../analytics/TestConnectionStatusAnalytics.java | 7 +++---- 4 files changed, 21 insertions(+), 18 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 72c3416108..33df9bed66 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -875,7 +875,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node writeLock.lock(); try { // get all connections/queues and recover from swap files. - final List connections = flowManager.getRootGroup().findAllConnections(); + final Set connections = flowManager.findAllConnections(); flowFileRepository.loadFlowFiles(queueProvider); @@ -1086,7 +1086,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node startRemoteGroupPortsAfterInitialization.clear(); } - for (final Connection connection : flowManager.getRootGroup().findAllConnections()) { + for (final Connection connection : flowManager.findAllConnections()) { connection.getFlowFileQueue().startLoadBalancing(); } } finally { @@ -2645,7 +2645,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node return "Cannot replay data from Provenance Event because the event does not specify the Source FlowFile Queue"; } - final List connections = flowManager.getRootGroup().findAllConnections(); + final Set connections = flowManager.findAllConnections(); FlowFileQueue queue = null; for (final Connection connection : connections) { if (event.getSourceQueueIdentifier().equals(connection.getIdentifier())) { @@ -2696,7 +2696,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node throw new IllegalArgumentException("Cannot replay data from Provenance Event because the event does not specify the Source FlowFile Queue"); } - final List connections = flowManager.getRootGroup().findAllConnections(); + final Set connections = flowManager.findAllConnections(); FlowFileQueue queue = null; for (final Connection connection : connections) { if (event.getSourceQueueIdentifier().equals(connection.getIdentifier())) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java index a5245660c5..5500fbb369 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java @@ -21,7 +21,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; -import java.util.Optional; import java.util.stream.Stream; import org.apache.commons.collections4.MapUtils; @@ -33,7 +32,6 @@ import org.apache.nifi.controller.repository.FlowFileEventRepository; import org.apache.nifi.controller.repository.RepositoryStatusReport; import org.apache.nifi.controller.status.history.ComponentStatusRepository; import org.apache.nifi.controller.status.history.StatusHistory; -import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.util.Tuple; import org.slf4j.Logger; @@ -336,9 +334,14 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { } private Connection getConnection() { - final ProcessGroup rootGroup = flowManager.getRootGroup(); - Optional connection = rootGroup.findAllConnections().stream().filter(c -> c.getIdentifier().equals(this.connectionIdentifier)).findFirst(); - return connection.orElse(null); + Connection connection = null; + for (Connection c : flowManager.findAllConnections()) { + if (c.getIdentifier().equals(this.connectionIdentifier)) { + connection = c; + break; + } + } + return connection; } private FlowFileEvent getStatusReport() { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/provenance/ComponentIdentifierLookup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/provenance/ComponentIdentifierLookup.java index 8dc1a9d8a8..f5da0fc294 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/provenance/ComponentIdentifierLookup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/provenance/ComponentIdentifierLookup.java @@ -24,7 +24,6 @@ import org.apache.nifi.processor.Processor; import java.util.ArrayList; import java.util.List; import java.util.Set; -import java.util.stream.Collectors; public class ComponentIdentifierLookup implements IdentifierLookup { private final FlowController flowController; @@ -62,10 +61,12 @@ public class ComponentIdentifierLookup implements IdentifierLookup { @Override public List getQueueIdentifiers() { - final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup(); + Set connectionSet = flowController.getFlowManager().findAllConnections(); + List identifiers = new ArrayList<>(connectionSet.size()); - return rootGroup.findAllConnections().stream() - .map(Connection::getIdentifier) - .collect(Collectors.toList()); + for (Connection c : connectionSet) { + identifiers.add(c.getIdentifier()); + } + return identifiers; } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java index ffa86a87b8..f406d1267b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java @@ -26,12 +26,11 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.when; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashMap; -import java.util.List; +import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -88,7 +87,7 @@ public class TestConnectionStatusAnalytics { final FlowFileEvent flowFileEvent = Mockito.mock(FlowFileEvent.class); - final List connections = new ArrayList<>(); + final Set connections = new HashSet<>(); final String connectionIdentifier = "1"; connections.add(connection); @@ -96,7 +95,7 @@ public class TestConnectionStatusAnalytics { when(flowFileQueue.getBackPressureObjectThreshold()).thenReturn(100L); when(connection.getIdentifier()).thenReturn(connectionIdentifier); when(connection.getFlowFileQueue()).thenReturn(flowFileQueue); - when(processGroup.findAllConnections()).thenReturn(connections); + when(flowManager.findAllConnections()).thenReturn(connections); when(flowManager.getRootGroup()).thenReturn(processGroup); when(flowFileEvent.getContentSizeIn()).thenReturn(10L); when(flowFileEvent.getContentSizeOut()).thenReturn(10L);