NIFI-9729: When restarting components in the VersionedFlowSynchronizer, first filter out any components that are intended to be stopped.

Signed-off-by: Joe Gresock <jgresock@gmail.com>

This closes #5806.
This commit is contained in:
Mark Payne 2022-02-25 11:32:27 -05:00 committed by Joe Gresock
parent e74991e705
commit 6cea5ea520
No known key found for this signature in database
GPG Key ID: 37F5B9B6E258C8B7
7 changed files with 298 additions and 15 deletions

View File

@ -23,6 +23,12 @@ import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
public interface ProcessGroupSynchronizer {
/**
* Synchronize the given Process Group to match the proposed snaphsot
* @param group the Process Group to update
* @param proposedSnapshot the proposed/desired state for the process group
* @param synchronizationOptions options for how to synchronize the group
*/
void synchronize(ProcessGroup group, VersionedFlowSnapshot proposedSnapshot, GroupSynchronizationOptions synchronizationOptions) throws ProcessorInstantiationException;
void verifyCanSynchronize(ProcessGroup group, VersionedProcessGroup proposed, boolean verifyConnectionRemoval);

View File

@ -108,6 +108,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -1696,33 +1697,59 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
if (proposed.getInputPorts() != null) {
for (final VersionedRemoteGroupPort port : proposed.getInputPorts()) {
if (port.getScheduledState() != org.apache.nifi.flow.ScheduledState.RUNNING) {
continue;
}
final String portId = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), rpg.getIdentifier());
final RemoteGroupPort remoteGroupPort = rpg.getInputPort(portId);
final RemoteGroupPort remoteGroupPort = getRpgInputPort(port, rpg, componentIdGenerator);
if (remoteGroupPort != null) {
context.getComponentScheduler().startComponent(remoteGroupPort);
synchronizeTransmissionState(port, remoteGroupPort);
}
}
}
if (proposed.getOutputPorts() != null) {
for (final VersionedRemoteGroupPort port : proposed.getOutputPorts()) {
if (port.getScheduledState() != org.apache.nifi.flow.ScheduledState.RUNNING) {
continue;
}
final String portId = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), rpg.getIdentifier());
final RemoteGroupPort remoteGroupPort = rpg.getOutputPort(portId);
final RemoteGroupPort remoteGroupPort = getRpgOutputPort(port, rpg, componentIdGenerator);
if (remoteGroupPort != null) {
context.getComponentScheduler().startComponent(remoteGroupPort);
synchronizeTransmissionState(port, remoteGroupPort);
}
}
}
}
private RemoteGroupPort getRpgInputPort(final VersionedRemoteGroupPort port, final RemoteProcessGroup rpg, final ComponentIdGenerator componentIdGenerator) {
return getRpgPort(port, rpg, componentIdGenerator, RemoteProcessGroup::getInputPort);
}
private RemoteGroupPort getRpgOutputPort(final VersionedRemoteGroupPort port, final RemoteProcessGroup rpg, final ComponentIdGenerator componentIdGenerator) {
return getRpgPort(port, rpg, componentIdGenerator, RemoteProcessGroup::getOutputPort);
}
private RemoteGroupPort getRpgPort(final VersionedRemoteGroupPort port, final RemoteProcessGroup rpg, final ComponentIdGenerator componentIdGenerator,
final BiFunction<RemoteProcessGroup, String, RemoteGroupPort> portLookup) {
final String instanceId = port.getInstanceIdentifier();
if (instanceId != null) {
final RemoteGroupPort remoteGroupPort = portLookup.apply(rpg, instanceId);
if (remoteGroupPort != null) {
return remoteGroupPort;
}
}
final String portId = componentIdGenerator.generateUuid(port.getIdentifier(), port.getInstanceIdentifier(), rpg.getIdentifier());
final RemoteGroupPort remoteGroupPort = portLookup.apply(rpg, portId);
return remoteGroupPort;
}
private void synchronizeTransmissionState(final VersionedRemoteGroupPort versionedPort, final RemoteGroupPort remoteGroupPort) {
final ScheduledState portState = remoteGroupPort.getScheduledState();
if (versionedPort.getScheduledState() == org.apache.nifi.flow.ScheduledState.RUNNING) {
if (portState != ScheduledState.RUNNING) {
context.getComponentScheduler().startComponent(remoteGroupPort);
}
} else {
if (portState == ScheduledState.RUNNING) {
remoteGroupPort.getRemoteProcessGroup().stopTransmitting(remoteGroupPort);
}
}
}
private RemoteProcessGroupPortDescriptor createPortDescriptor(final VersionedRemoteGroupPort proposed, final ComponentIdGenerator componentIdGenerator, final String rpgId) {
final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor();

View File

@ -482,6 +482,16 @@ public class AffectedComponentSet {
reportingTasks.forEach(flowController::startReportingTask);
}
public void removeComponents(final ComponentSetFilter filter) {
inputPorts.removeIf(filter::testInputPort);
outputPorts.removeIf(filter::testOutputPort);
remoteInputPorts.removeIf(filter::testRemoteInputPort);
remoteOutputPorts.removeIf(filter::testRemoteOutputPort);
processors.removeIf(filter::testProcessor);
controllerServices.removeIf(filter::testControllerService);
reportingTasks.removeIf(filter::testReportingTask);
}
/**
* Returns a new AffectedComponentSet that represents only those components that currently exist within the NiFi instance. When a set of dataflow updates have occurred, it is very possible
* that one or more components referred to by the AffectedComponentSet no longer exist (for example, there was a dataflow update that removed a Processor, so that Processor no longer exists).

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.serialization;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.remote.RemoteGroupPort;
public interface ComponentSetFilter {
boolean testProcessor(ProcessorNode processor);
boolean testReportingTask(ReportingTaskNode reportingTask);
boolean testControllerService(ControllerServiceNode controllerService);
boolean testInputPort(Port port);
boolean testOutputPort(Port port);
boolean testRemoteInputPort(RemoteGroupPort port);
boolean testRemoteOutputPort(RemoteGroupPort port);
default ComponentSetFilter reverse() {
final ComponentSetFilter original = this;
return new ComponentSetFilter() {
@Override
public boolean testProcessor(final ProcessorNode processor) {
return !original.testProcessor(processor);
}
@Override
public boolean testReportingTask(final ReportingTaskNode reportingTask) {
return !original.testReportingTask(reportingTask);
}
@Override
public boolean testControllerService(final ControllerServiceNode controllerService) {
return !original.testControllerService(controllerService);
}
@Override
public boolean testInputPort(final Port port) {
return !original.testInputPort(port);
}
@Override
public boolean testOutputPort(final Port port) {
return !original.testOutputPort(port);
}
@Override
public boolean testRemoteInputPort(final RemoteGroupPort port) {
return !original.testRemoteInputPort(port);
}
@Override
public boolean testRemoteOutputPort(final RemoteGroupPort port) {
return !original.testRemoteOutputPort(port);
}
};
}
}

View File

@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.serialization;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.flow.VersionedDataflow;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.flow.ScheduledState;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.flow.VersionedRemoteGroupPort;
import org.apache.nifi.flow.VersionedRemoteProcessGroup;
import org.apache.nifi.flow.VersionedReportingTask;
import org.apache.nifi.remote.RemoteGroupPort;
import java.util.HashMap;
import java.util.Map;
public class RunningComponentSetFilter implements ComponentSetFilter {
private final Map<String, VersionedControllerService> controllerServices = new HashMap<>();
private final Map<String, VersionedProcessor> processors = new HashMap<>();
private final Map<String, VersionedReportingTask> reportingTasks = new HashMap<>();
private final Map<String, VersionedPort> inputPorts = new HashMap<>();
private final Map<String, VersionedPort> outputPorts = new HashMap<>();
private final Map<String, VersionedRemoteGroupPort> remoteInputPorts = new HashMap<>();
private final Map<String, VersionedRemoteGroupPort> remoteOutputPorts = new HashMap<>();
public RunningComponentSetFilter(final VersionedDataflow dataflow) {
dataflow.getControllerServices().forEach(service -> controllerServices.put(service.getInstanceIdentifier(), service));
dataflow.getReportingTasks().forEach(task -> reportingTasks.put(task.getInstanceIdentifier(), task));
flatten(dataflow.getRootGroup());
}
private void flatten(final VersionedProcessGroup group) {
group.getInputPorts().forEach(port -> inputPorts.put(port.getInstanceIdentifier(), port));
group.getOutputPorts().forEach(port -> outputPorts.put(port.getInstanceIdentifier(), port));
group.getControllerServices().forEach(service -> controllerServices.put(service.getInstanceIdentifier(), service));
group.getProcessors().forEach(processor -> processors.put(processor.getInstanceIdentifier(), processor));
for (final VersionedRemoteProcessGroup rpg : group.getRemoteProcessGroups()) {
rpg.getInputPorts().forEach(port -> {
if (port.getInstanceIdentifier() != null) {
remoteInputPorts.put(port.getInstanceIdentifier(), port);
}
});
rpg.getOutputPorts().forEach(port -> {
if (port.getInstanceIdentifier() != null) {
remoteOutputPorts.put(port.getInstanceIdentifier(), port);
}
});
}
group.getProcessGroups().forEach(this::flatten);
}
@Override
public boolean testProcessor(final ProcessorNode processor) {
final VersionedProcessor versionedProcessor = processors.get(processor.getIdentifier());
return versionedProcessor != null && versionedProcessor.getScheduledState() == ScheduledState.RUNNING;
}
@Override
public boolean testReportingTask(final ReportingTaskNode reportingTask) {
final VersionedReportingTask versionedReportingTask = reportingTasks.get(reportingTask.getIdentifier());
return versionedReportingTask != null && versionedReportingTask.getScheduledState() == ScheduledState.RUNNING;
}
@Override
public boolean testControllerService(final ControllerServiceNode controllerService) {
final VersionedControllerService versionedService = controllerServices.get(controllerService.getIdentifier());
return versionedService != null && versionedService.getScheduledState() == ScheduledState.ENABLED;
}
@Override
public boolean testInputPort(final Port port) {
final VersionedPort versionedPort = inputPorts.get(port.getIdentifier());
return versionedPort != null && versionedPort.getScheduledState() == ScheduledState.RUNNING;
}
@Override
public boolean testOutputPort(final Port port) {
final VersionedPort versionedPort = outputPorts.get(port.getIdentifier());
return versionedPort != null && versionedPort.getScheduledState() == ScheduledState.RUNNING;
}
@Override
public boolean testRemoteInputPort(final RemoteGroupPort port) {
final VersionedRemoteGroupPort versionedPort = remoteInputPorts.get(port.getIdentifier());
return versionedPort != null && versionedPort.getScheduledState() == ScheduledState.RUNNING;
}
@Override
public boolean testRemoteOutputPort(final RemoteGroupPort port) {
final VersionedRemoteGroupPort versionedPort = remoteOutputPorts.get(port.getIdentifier());
return versionedPort != null && versionedPort.getScheduledState() == ScheduledState.RUNNING;
}
}

View File

@ -186,7 +186,12 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
} finally {
// We have to call toExistingSet() here because some of the components that existed in the active set may no longer exist,
// so attempting to start them will fail.
activeSet.toExistingSet().toStartableSet().start();
final AffectedComponentSet startable = activeSet.toExistingSet().toStartableSet();
final ComponentSetFilter runningComponentFilter = new RunningComponentSetFilter(proposedFlow.getVersionedDataflow());
final ComponentSetFilter stoppedComponentFilter = runningComponentFilter.reverse();
startable.removeComponents(stoppedComponentFilter);
startable.start();
}
final long millis = System.currentTimeMillis() - start;

View File

@ -437,6 +437,43 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
waitFor(() -> isNodeDisconnectedDueToMissingConnection(5672, connection.getId()));
}
@Test
public void testComponentStatesRestoredOnReconnect() throws NiFiClientException, IOException, InterruptedException {
final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile");
final ConnectionEntity connection = getClientUtil().createConnection(generate, terminate, "success");
getClientUtil().startProcessor(generate);
waitForQueueCount(connection.getId(), 2);
// Shut down node 2
disconnectNode(2);
getClientUtil().stopProcessor(generate);
getClientUtil().startProcessor(terminate);
waitForQueueCount(connection.getId(), 0);
reconnectNode(2);
waitForAllNodesConnected();
getClientUtil().waitForStoppedProcessor(generate.getId());
waitForQueueCount(connection.getId(), 0);
switchClientToNode(2);
// Ensure that Node 2 has the correct state for each processor.
waitFor(() -> {
final ProcessorEntity latestTerminate = getNifiClient().getProcessorClient(DO_NOT_REPLICATE).getProcessor(terminate.getId());
return "RUNNING".equalsIgnoreCase(latestTerminate.getComponent().getState());
});
waitFor(() -> {
final ProcessorEntity latestGenerate = getNifiClient().getProcessorClient(DO_NOT_REPLICATE).getProcessor(generate.getId());
return "STOPPED".equalsIgnoreCase(latestGenerate.getComponent().getState());
});
}
private boolean isNodeDisconnectedDueToMissingConnection(final int nodeApiPort, final String connectionId) throws NiFiClientException, IOException {
final NodeDTO node2Dto = getNifiClient().getControllerClient().getNodes().getCluster().getNodes().stream()
.filter(dto -> dto.getApiPort() == nodeApiPort)