From 6cea5ea520f48c5d05a30df93fa582cf42d3438e Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 25 Feb 2022 11:32:27 -0500 Subject: [PATCH] NIFI-9729: When restarting components in the VersionedFlowSynchronizer, first filter out any components that are intended to be stopped. Signed-off-by: Joe Gresock This closes #5806. --- .../nifi/groups/ProcessGroupSynchronizer.java | 6 + .../StandardProcessGroupSynchronizer.java | 55 +++++--- .../serialization/AffectedComponentSet.java | 10 ++ .../serialization/ComponentSetFilter.java | 81 ++++++++++++ .../RunningComponentSetFilter.java | 117 ++++++++++++++++++ .../VersionedFlowSynchronizer.java | 7 +- .../clustering/FlowSynchronizationIT.java | 37 ++++++ 7 files changed, 298 insertions(+), 15 deletions(-) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ComponentSetFilter.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/RunningComponentSetFilter.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizer.java index 4f7a309425..688ea1c31f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizer.java @@ -23,6 +23,12 @@ import org.apache.nifi.registry.flow.VersionedFlowSnapshot; public interface ProcessGroupSynchronizer { + /** + * Synchronize the given Process Group to match the proposed snaphsot + * @param group the Process Group to update + * @param proposedSnapshot the proposed/desired state for the process group + * @param synchronizationOptions options for how to synchronize the group + */ void synchronize(ProcessGroup group, VersionedFlowSnapshot proposedSnapshot, GroupSynchronizationOptions synchronizationOptions) throws ProcessorInstantiationException; void verifyCanSynchronize(ProcessGroup group, VersionedProcessGroup proposed, boolean verifyConnectionRemoval); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java index 39004a50b6..7ec6231ccc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java @@ -108,6 +108,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; @@ -1696,33 +1697,59 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize if (proposed.getInputPorts() != null) { for (final VersionedRemoteGroupPort port : proposed.getInputPorts()) { - if (port.getScheduledState() != org.apache.nifi.flow.ScheduledState.RUNNING) { - continue; - } - - final String portId = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), rpg.getIdentifier()); - final RemoteGroupPort remoteGroupPort = rpg.getInputPort(portId); + final RemoteGroupPort remoteGroupPort = getRpgInputPort(port, rpg, componentIdGenerator); if (remoteGroupPort != null) { - context.getComponentScheduler().startComponent(remoteGroupPort); + synchronizeTransmissionState(port, remoteGroupPort); } } } if (proposed.getOutputPorts() != null) { for (final VersionedRemoteGroupPort port : proposed.getOutputPorts()) { - if (port.getScheduledState() != org.apache.nifi.flow.ScheduledState.RUNNING) { - continue; - } - - final String portId = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), rpg.getIdentifier()); - final RemoteGroupPort remoteGroupPort = rpg.getOutputPort(portId); + final RemoteGroupPort remoteGroupPort = getRpgOutputPort(port, rpg, componentIdGenerator); if (remoteGroupPort != null) { - context.getComponentScheduler().startComponent(remoteGroupPort); + synchronizeTransmissionState(port, remoteGroupPort); } } } } + private RemoteGroupPort getRpgInputPort(final VersionedRemoteGroupPort port, final RemoteProcessGroup rpg, final ComponentIdGenerator componentIdGenerator) { + return getRpgPort(port, rpg, componentIdGenerator, RemoteProcessGroup::getInputPort); + } + + private RemoteGroupPort getRpgOutputPort(final VersionedRemoteGroupPort port, final RemoteProcessGroup rpg, final ComponentIdGenerator componentIdGenerator) { + return getRpgPort(port, rpg, componentIdGenerator, RemoteProcessGroup::getOutputPort); + } + + private RemoteGroupPort getRpgPort(final VersionedRemoteGroupPort port, final RemoteProcessGroup rpg, final ComponentIdGenerator componentIdGenerator, + final BiFunction portLookup) { + final String instanceId = port.getInstanceIdentifier(); + if (instanceId != null) { + final RemoteGroupPort remoteGroupPort = portLookup.apply(rpg, instanceId); + if (remoteGroupPort != null) { + return remoteGroupPort; + } + } + + final String portId = componentIdGenerator.generateUuid(port.getIdentifier(), port.getInstanceIdentifier(), rpg.getIdentifier()); + final RemoteGroupPort remoteGroupPort = portLookup.apply(rpg, portId); + return remoteGroupPort; + } + + private void synchronizeTransmissionState(final VersionedRemoteGroupPort versionedPort, final RemoteGroupPort remoteGroupPort) { + final ScheduledState portState = remoteGroupPort.getScheduledState(); + + if (versionedPort.getScheduledState() == org.apache.nifi.flow.ScheduledState.RUNNING) { + if (portState != ScheduledState.RUNNING) { + context.getComponentScheduler().startComponent(remoteGroupPort); + } + } else { + if (portState == ScheduledState.RUNNING) { + remoteGroupPort.getRemoteProcessGroup().stopTransmitting(remoteGroupPort); + } + } + } private RemoteProcessGroupPortDescriptor createPortDescriptor(final VersionedRemoteGroupPort proposed, final ComponentIdGenerator componentIdGenerator, final String rpgId) { final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java index b62881ef0b..d0b7970bca 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java @@ -482,6 +482,16 @@ public class AffectedComponentSet { reportingTasks.forEach(flowController::startReportingTask); } + public void removeComponents(final ComponentSetFilter filter) { + inputPorts.removeIf(filter::testInputPort); + outputPorts.removeIf(filter::testOutputPort); + remoteInputPorts.removeIf(filter::testRemoteInputPort); + remoteOutputPorts.removeIf(filter::testRemoteOutputPort); + processors.removeIf(filter::testProcessor); + controllerServices.removeIf(filter::testControllerService); + reportingTasks.removeIf(filter::testReportingTask); + } + /** * Returns a new AffectedComponentSet that represents only those components that currently exist within the NiFi instance. When a set of dataflow updates have occurred, it is very possible * that one or more components referred to by the AffectedComponentSet no longer exist (for example, there was a dataflow update that removed a Processor, so that Processor no longer exists). diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ComponentSetFilter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ComponentSetFilter.java new file mode 100644 index 0000000000..6edc1559fb --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ComponentSetFilter.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.serialization; + +import org.apache.nifi.connectable.Port; +import org.apache.nifi.controller.ProcessorNode; +import org.apache.nifi.controller.ReportingTaskNode; +import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.remote.RemoteGroupPort; + +public interface ComponentSetFilter { + boolean testProcessor(ProcessorNode processor); + + boolean testReportingTask(ReportingTaskNode reportingTask); + + boolean testControllerService(ControllerServiceNode controllerService); + + boolean testInputPort(Port port); + + boolean testOutputPort(Port port); + + boolean testRemoteInputPort(RemoteGroupPort port); + + boolean testRemoteOutputPort(RemoteGroupPort port); + + default ComponentSetFilter reverse() { + final ComponentSetFilter original = this; + + return new ComponentSetFilter() { + @Override + public boolean testProcessor(final ProcessorNode processor) { + return !original.testProcessor(processor); + } + + @Override + public boolean testReportingTask(final ReportingTaskNode reportingTask) { + return !original.testReportingTask(reportingTask); + } + + @Override + public boolean testControllerService(final ControllerServiceNode controllerService) { + return !original.testControllerService(controllerService); + } + + @Override + public boolean testInputPort(final Port port) { + return !original.testInputPort(port); + } + + @Override + public boolean testOutputPort(final Port port) { + return !original.testOutputPort(port); + } + + @Override + public boolean testRemoteInputPort(final RemoteGroupPort port) { + return !original.testRemoteInputPort(port); + } + + @Override + public boolean testRemoteOutputPort(final RemoteGroupPort port) { + return !original.testRemoteOutputPort(port); + } + }; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/RunningComponentSetFilter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/RunningComponentSetFilter.java new file mode 100644 index 0000000000..c141144b1c --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/RunningComponentSetFilter.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.serialization; + +import org.apache.nifi.connectable.Port; +import org.apache.nifi.controller.ProcessorNode; +import org.apache.nifi.controller.ReportingTaskNode; +import org.apache.nifi.controller.flow.VersionedDataflow; +import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.flow.ScheduledState; +import org.apache.nifi.flow.VersionedControllerService; +import org.apache.nifi.flow.VersionedPort; +import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.flow.VersionedProcessor; +import org.apache.nifi.flow.VersionedRemoteGroupPort; +import org.apache.nifi.flow.VersionedRemoteProcessGroup; +import org.apache.nifi.flow.VersionedReportingTask; +import org.apache.nifi.remote.RemoteGroupPort; + +import java.util.HashMap; +import java.util.Map; + +public class RunningComponentSetFilter implements ComponentSetFilter { + private final Map controllerServices = new HashMap<>(); + private final Map processors = new HashMap<>(); + private final Map reportingTasks = new HashMap<>(); + private final Map inputPorts = new HashMap<>(); + private final Map outputPorts = new HashMap<>(); + private final Map remoteInputPorts = new HashMap<>(); + private final Map remoteOutputPorts = new HashMap<>(); + + public RunningComponentSetFilter(final VersionedDataflow dataflow) { + dataflow.getControllerServices().forEach(service -> controllerServices.put(service.getInstanceIdentifier(), service)); + dataflow.getReportingTasks().forEach(task -> reportingTasks.put(task.getInstanceIdentifier(), task)); + flatten(dataflow.getRootGroup()); + } + + private void flatten(final VersionedProcessGroup group) { + group.getInputPorts().forEach(port -> inputPorts.put(port.getInstanceIdentifier(), port)); + group.getOutputPorts().forEach(port -> outputPorts.put(port.getInstanceIdentifier(), port)); + group.getControllerServices().forEach(service -> controllerServices.put(service.getInstanceIdentifier(), service)); + group.getProcessors().forEach(processor -> processors.put(processor.getInstanceIdentifier(), processor)); + + for (final VersionedRemoteProcessGroup rpg : group.getRemoteProcessGroups()) { + rpg.getInputPorts().forEach(port -> { + if (port.getInstanceIdentifier() != null) { + remoteInputPorts.put(port.getInstanceIdentifier(), port); + } + }); + + rpg.getOutputPorts().forEach(port -> { + if (port.getInstanceIdentifier() != null) { + remoteOutputPorts.put(port.getInstanceIdentifier(), port); + } + }); + } + + group.getProcessGroups().forEach(this::flatten); + } + + @Override + public boolean testProcessor(final ProcessorNode processor) { + final VersionedProcessor versionedProcessor = processors.get(processor.getIdentifier()); + return versionedProcessor != null && versionedProcessor.getScheduledState() == ScheduledState.RUNNING; + } + + @Override + public boolean testReportingTask(final ReportingTaskNode reportingTask) { + final VersionedReportingTask versionedReportingTask = reportingTasks.get(reportingTask.getIdentifier()); + return versionedReportingTask != null && versionedReportingTask.getScheduledState() == ScheduledState.RUNNING; + } + + @Override + public boolean testControllerService(final ControllerServiceNode controllerService) { + final VersionedControllerService versionedService = controllerServices.get(controllerService.getIdentifier()); + return versionedService != null && versionedService.getScheduledState() == ScheduledState.ENABLED; + } + + @Override + public boolean testInputPort(final Port port) { + final VersionedPort versionedPort = inputPorts.get(port.getIdentifier()); + return versionedPort != null && versionedPort.getScheduledState() == ScheduledState.RUNNING; + } + + @Override + public boolean testOutputPort(final Port port) { + final VersionedPort versionedPort = outputPorts.get(port.getIdentifier()); + return versionedPort != null && versionedPort.getScheduledState() == ScheduledState.RUNNING; + } + + @Override + public boolean testRemoteInputPort(final RemoteGroupPort port) { + final VersionedRemoteGroupPort versionedPort = remoteInputPorts.get(port.getIdentifier()); + return versionedPort != null && versionedPort.getScheduledState() == ScheduledState.RUNNING; + } + + @Override + public boolean testRemoteOutputPort(final RemoteGroupPort port) { + final VersionedRemoteGroupPort versionedPort = remoteOutputPorts.get(port.getIdentifier()); + return versionedPort != null && versionedPort.getScheduledState() == ScheduledState.RUNNING; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java index cd2fef089d..226210344d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java @@ -186,7 +186,12 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer { } finally { // We have to call toExistingSet() here because some of the components that existed in the active set may no longer exist, // so attempting to start them will fail. - activeSet.toExistingSet().toStartableSet().start(); + final AffectedComponentSet startable = activeSet.toExistingSet().toStartableSet(); + + final ComponentSetFilter runningComponentFilter = new RunningComponentSetFilter(proposedFlow.getVersionedDataflow()); + final ComponentSetFilter stoppedComponentFilter = runningComponentFilter.reverse(); + startable.removeComponents(stoppedComponentFilter); + startable.start(); } final long millis = System.currentTimeMillis() - start; diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java index b7dcad12dd..bba606171f 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java @@ -437,6 +437,43 @@ public class FlowSynchronizationIT extends NiFiSystemIT { waitFor(() -> isNodeDisconnectedDueToMissingConnection(5672, connection.getId())); } + @Test + public void testComponentStatesRestoredOnReconnect() throws NiFiClientException, IOException, InterruptedException { + final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile"); + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile"); + final ConnectionEntity connection = getClientUtil().createConnection(generate, terminate, "success"); + + getClientUtil().startProcessor(generate); + waitForQueueCount(connection.getId(), 2); + + // Shut down node 2 + disconnectNode(2); + + getClientUtil().stopProcessor(generate); + getClientUtil().startProcessor(terminate); + + waitForQueueCount(connection.getId(), 0); + + reconnectNode(2); + waitForAllNodesConnected(); + + getClientUtil().waitForStoppedProcessor(generate.getId()); + waitForQueueCount(connection.getId(), 0); + + switchClientToNode(2); + + // Ensure that Node 2 has the correct state for each processor. + waitFor(() -> { + final ProcessorEntity latestTerminate = getNifiClient().getProcessorClient(DO_NOT_REPLICATE).getProcessor(terminate.getId()); + return "RUNNING".equalsIgnoreCase(latestTerminate.getComponent().getState()); + }); + + waitFor(() -> { + final ProcessorEntity latestGenerate = getNifiClient().getProcessorClient(DO_NOT_REPLICATE).getProcessor(generate.getId()); + return "STOPPED".equalsIgnoreCase(latestGenerate.getComponent().getState()); + }); + } + private boolean isNodeDisconnectedDueToMissingConnection(final int nodeApiPort, final String connectionId) throws NiFiClientException, IOException { final NodeDTO node2Dto = getNifiClient().getControllerClient().getNodes().getCluster().getNodes().stream() .filter(dto -> dto.getApiPort() == nodeApiPort)