diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 34ea266d7a..aef6d46d40 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -1518,7 +1518,29 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R public void serialize(final FlowSerializer serializer, final OutputStream os) throws FlowSerializationException { readLock.lock(); try { - final ScheduledStateLookup scheduledStateLookup = procNode -> startConnectablesAfterInitialization.contains(procNode) ? ScheduledState.RUNNING : procNode.getScheduledState(); + final ScheduledStateLookup scheduledStateLookup = new ScheduledStateLookup() { + @Override + public ScheduledState getScheduledState(final ProcessorNode procNode) { + if (startConnectablesAfterInitialization.contains(procNode)) { + return ScheduledState.RUNNING; + } + + return procNode.getScheduledState(); + } + + @Override + public ScheduledState getScheduledState(final Port port) { + if (startConnectablesAfterInitialization.contains(port)) { + return ScheduledState.RUNNING; + } + if (startRemoteGroupPortsAfterInitialization.contains(port)) { + return ScheduledState.RUNNING; + } + + return port.getScheduledState(); + } + }; + serializer.serialize(this, os, scheduledStateLookup); } finally { readLock.unlock(); @@ -2922,6 +2944,33 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } + public void stopConnectable(final Connectable connectable) { + final ProcessGroup group = requireNonNull(connectable).getProcessGroup(); + + writeLock.lock(); + try { + switch (requireNonNull(connectable).getConnectableType()) { + case FUNNEL: + // Ignore. We don't support stopping funnels. + break; + case INPUT_PORT: + case REMOTE_INPUT_PORT: + startConnectablesAfterInitialization.remove(connectable); + group.stopInputPort((Port) connectable); + break; + case OUTPUT_PORT: + case REMOTE_OUTPUT_PORT: + startConnectablesAfterInitialization.remove(connectable); + group.stopOutputPort((Port) connectable); + break; + default: + throw new IllegalArgumentException(); + } + } finally { + writeLock.unlock(); + } + } + public void startTransmitting(final RemoteGroupPort remoteGroupPort) { writeLock.lock(); try { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 975f95425d..6f1e8e1055 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -774,19 +774,19 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { case DISABLED: // switch processor do disabled. This means we have to stop it (if it's already stopped, this method does nothing), // and then we have to disable it. - port.getProcessGroup().stopInputPort(port); + controller.stopConnectable(port); port.getProcessGroup().disableInputPort(port); break; case RUNNING: // we want to run now. Make sure processor is not disabled and then start it. port.getProcessGroup().enableInputPort(port); - port.getProcessGroup().startInputPort(port); + controller.startConnectable(port); break; case STOPPED: if (port.getScheduledState() == ScheduledState.DISABLED) { port.getProcessGroup().enableInputPort(port); } else if (port.getScheduledState() == ScheduledState.RUNNING) { - port.getProcessGroup().stopInputPort(port); + controller.stopConnectable(port); } break; } @@ -803,19 +803,19 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { case DISABLED: // switch processor do disabled. This means we have to stop it (if it's already stopped, this method does nothing), // and then we have to disable it. - port.getProcessGroup().stopOutputPort(port); + controller.stopConnectable(port); port.getProcessGroup().disableOutputPort(port); break; case RUNNING: // we want to run now. Make sure processor is not disabled and then start it. port.getProcessGroup().enableOutputPort(port); - port.getProcessGroup().startOutputPort(port); + controller.startConnectable(port); break; case STOPPED: if (port.getScheduledState() == ScheduledState.DISABLED) { port.getProcessGroup().enableOutputPort(port); } else if (port.getScheduledState() == ScheduledState.RUNNING) { - port.getProcessGroup().stopOutputPort(port); + controller.stopConnectable(port); } break; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java index 07f6017b5c..39693b8987 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java @@ -17,6 +17,7 @@ package org.apache.nifi.controller.serialization; +import org.apache.nifi.connectable.Port; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ScheduledState; @@ -24,5 +25,17 @@ public interface ScheduledStateLookup { ScheduledState getScheduledState(ProcessorNode procNode); - public static final ScheduledStateLookup IDENTITY_LOOKUP = ProcessorNode::getScheduledState; + ScheduledState getScheduledState(Port port); + + public static final ScheduledStateLookup IDENTITY_LOOKUP = new ScheduledStateLookup() { + @Override + public ScheduledState getScheduledState(final ProcessorNode procNode) { + return procNode.getScheduledState(); + } + + @Override + public ScheduledState getScheduledState(final Port port) { + return port.getScheduledState(); + } + }; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java index fea1ecb641..702932cce6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java @@ -159,19 +159,19 @@ public class StandardFlowSerializer implements FlowSerializer { if (group.isRootGroup()) { for (final Port port : group.getInputPorts()) { - addRootGroupPort(element, (RootGroupPort) port, "inputPort"); + addRootGroupPort(element, (RootGroupPort) port, "inputPort", scheduledStateLookup); } for (final Port port : group.getOutputPorts()) { - addRootGroupPort(element, (RootGroupPort) port, "outputPort"); + addRootGroupPort(element, (RootGroupPort) port, "outputPort", scheduledStateLookup); } } else { for (final Port port : group.getInputPorts()) { - addPort(element, port, "inputPort"); + addPort(element, port, "inputPort", scheduledStateLookup); } for (final Port port : group.getOutputPorts()) { - addPort(element, port, "outputPort"); + addPort(element, port, "outputPort", scheduledStateLookup); } } @@ -330,7 +330,7 @@ public class StandardFlowSerializer implements FlowSerializer { parentElement.appendChild(element); } - private void addPort(final Element parentElement, final Port port, final String elementName) { + private void addPort(final Element parentElement, final Port port, final String elementName, final ScheduledStateLookup scheduledStateLookup) { final Document doc = parentElement.getOwnerDocument(); final Element element = doc.createElement(elementName); parentElement.appendChild(element); @@ -338,12 +338,12 @@ public class StandardFlowSerializer implements FlowSerializer { addTextElement(element, "name", port.getName()); addPosition(element, port.getPosition()); addTextElement(element, "comments", port.getComments()); - addTextElement(element, "scheduledState", port.getScheduledState().name()); + addTextElement(element, "scheduledState", scheduledStateLookup.getScheduledState(port).name()); parentElement.appendChild(element); } - private void addRootGroupPort(final Element parentElement, final RootGroupPort port, final String elementName) { + private void addRootGroupPort(final Element parentElement, final RootGroupPort port, final String elementName, final ScheduledStateLookup scheduledStateLookup) { final Document doc = parentElement.getOwnerDocument(); final Element element = doc.createElement(elementName); parentElement.appendChild(element); @@ -351,7 +351,7 @@ public class StandardFlowSerializer implements FlowSerializer { addTextElement(element, "name", port.getName()); addPosition(element, port.getPosition()); addTextElement(element, "comments", port.getComments()); - addTextElement(element, "scheduledState", port.getScheduledState().name()); + addTextElement(element, "scheduledState", scheduledStateLookup.getScheduledState(port).name()); addTextElement(element, "maxConcurrentTasks", String.valueOf(port.getMaxConcurrentTasks())); for (final String user : port.getUserAccessControl()) { addTextElement(element, "userAccessControl", user);