NIFI-7087: Use FlowManager.findAllConnections() when available

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #4026
This commit is contained in:
Matthew Burgess 2020-02-12 16:27:56 -05:00
parent 683b1d9952
commit 84968e70d2
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
4 changed files with 21 additions and 18 deletions

View File

@ -875,7 +875,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
writeLock.lock(); writeLock.lock();
try { try {
// get all connections/queues and recover from swap files. // get all connections/queues and recover from swap files.
final List<Connection> connections = flowManager.getRootGroup().findAllConnections(); final Set<Connection> connections = flowManager.findAllConnections();
flowFileRepository.loadFlowFiles(queueProvider); flowFileRepository.loadFlowFiles(queueProvider);
@ -1086,7 +1086,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
startRemoteGroupPortsAfterInitialization.clear(); startRemoteGroupPortsAfterInitialization.clear();
} }
for (final Connection connection : flowManager.getRootGroup().findAllConnections()) { for (final Connection connection : flowManager.findAllConnections()) {
connection.getFlowFileQueue().startLoadBalancing(); connection.getFlowFileQueue().startLoadBalancing();
} }
} finally { } finally {
@ -2645,7 +2645,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
return "Cannot replay data from Provenance Event because the event does not specify the Source FlowFile Queue"; return "Cannot replay data from Provenance Event because the event does not specify the Source FlowFile Queue";
} }
final List<Connection> connections = flowManager.getRootGroup().findAllConnections(); final Set<Connection> connections = flowManager.findAllConnections();
FlowFileQueue queue = null; FlowFileQueue queue = null;
for (final Connection connection : connections) { for (final Connection connection : connections) {
if (event.getSourceQueueIdentifier().equals(connection.getIdentifier())) { if (event.getSourceQueueIdentifier().equals(connection.getIdentifier())) {
@ -2696,7 +2696,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
throw new IllegalArgumentException("Cannot replay data from Provenance Event because the event does not specify the Source FlowFile Queue"); throw new IllegalArgumentException("Cannot replay data from Provenance Event because the event does not specify the Source FlowFile Queue");
} }
final List<Connection> connections = flowManager.getRootGroup().findAllConnections(); final Set<Connection> connections = flowManager.findAllConnections();
FlowFileQueue queue = null; FlowFileQueue queue = null;
for (final Connection connection : connections) { for (final Connection connection : connections) {
if (event.getSourceQueueIdentifier().equals(connection.getIdentifier())) { if (event.getSourceQueueIdentifier().equals(connection.getIdentifier())) {

View File

@ -21,7 +21,6 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.commons.collections4.MapUtils; import org.apache.commons.collections4.MapUtils;
@ -33,7 +32,6 @@ import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.RepositoryStatusReport; import org.apache.nifi.controller.repository.RepositoryStatusReport;
import org.apache.nifi.controller.status.history.ComponentStatusRepository; import org.apache.nifi.controller.status.history.ComponentStatusRepository;
import org.apache.nifi.controller.status.history.StatusHistory; import org.apache.nifi.controller.status.history.StatusHistory;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.util.Tuple; import org.apache.nifi.util.Tuple;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -336,9 +334,14 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
} }
private Connection getConnection() { private Connection getConnection() {
final ProcessGroup rootGroup = flowManager.getRootGroup(); Connection connection = null;
Optional<Connection> connection = rootGroup.findAllConnections().stream().filter(c -> c.getIdentifier().equals(this.connectionIdentifier)).findFirst(); for (Connection c : flowManager.findAllConnections()) {
return connection.orElse(null); if (c.getIdentifier().equals(this.connectionIdentifier)) {
connection = c;
break;
}
}
return connection;
} }
private FlowFileEvent getStatusReport() { private FlowFileEvent getStatusReport() {

View File

@ -24,7 +24,6 @@ import org.apache.nifi.processor.Processor;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors;
public class ComponentIdentifierLookup implements IdentifierLookup { public class ComponentIdentifierLookup implements IdentifierLookup {
private final FlowController flowController; private final FlowController flowController;
@ -62,10 +61,12 @@ public class ComponentIdentifierLookup implements IdentifierLookup {
@Override @Override
public List<String> getQueueIdentifiers() { public List<String> getQueueIdentifiers() {
final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup(); Set<Connection> connectionSet = flowController.getFlowManager().findAllConnections();
List<String> identifiers = new ArrayList<>(connectionSet.size());
return rootGroup.findAllConnections().stream() for (Connection c : connectionSet) {
.map(Connection::getIdentifier) identifiers.add(c.getIdentifier());
.collect(Collectors.toList()); }
return identifiers;
} }
} }

View File

@ -26,12 +26,11 @@ import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -88,7 +87,7 @@ public class TestConnectionStatusAnalytics {
final FlowFileEvent flowFileEvent = Mockito.mock(FlowFileEvent.class); final FlowFileEvent flowFileEvent = Mockito.mock(FlowFileEvent.class);
final List<Connection> connections = new ArrayList<>(); final Set<Connection> connections = new HashSet<>();
final String connectionIdentifier = "1"; final String connectionIdentifier = "1";
connections.add(connection); connections.add(connection);
@ -96,7 +95,7 @@ public class TestConnectionStatusAnalytics {
when(flowFileQueue.getBackPressureObjectThreshold()).thenReturn(100L); when(flowFileQueue.getBackPressureObjectThreshold()).thenReturn(100L);
when(connection.getIdentifier()).thenReturn(connectionIdentifier); when(connection.getIdentifier()).thenReturn(connectionIdentifier);
when(connection.getFlowFileQueue()).thenReturn(flowFileQueue); when(connection.getFlowFileQueue()).thenReturn(flowFileQueue);
when(processGroup.findAllConnections()).thenReturn(connections); when(flowManager.findAllConnections()).thenReturn(connections);
when(flowManager.getRootGroup()).thenReturn(processGroup); when(flowManager.getRootGroup()).thenReturn(processGroup);
when(flowFileEvent.getContentSizeIn()).thenReturn(10L); when(flowFileEvent.getContentSizeIn()).thenReturn(10L);
when(flowFileEvent.getContentSizeOut()).thenReturn(10L); when(flowFileEvent.getContentSizeOut()).thenReturn(10L);