From 5cb928131ca418c67827a536b02a046b34d14c7f Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 25 May 2017 15:31:43 -0400 Subject: [PATCH] NIFI-3981: When serializing flow to cluster, use the Scheduled State of ports as they are configured to be, not the current state, since the current state may change as soon as the FlowController has finished initializing This closes #1861. Signed-off-by: Bryan Bende --- .../nifi/controller/FlowController.java | 51 ++++++++++++++++++- .../controller/StandardFlowSynchronizer.java | 12 ++--- .../serialization/ScheduledStateLookup.java | 15 +++++- .../serialization/StandardFlowSerializer.java | 16 +++--- 4 files changed, 78 insertions(+), 16 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 34ea266d7a..aef6d46d40 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -1518,7 +1518,29 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R public void serialize(final FlowSerializer serializer, final OutputStream os) throws FlowSerializationException { readLock.lock(); try { - final ScheduledStateLookup scheduledStateLookup = procNode -> startConnectablesAfterInitialization.contains(procNode) ? ScheduledState.RUNNING : procNode.getScheduledState(); + final ScheduledStateLookup scheduledStateLookup = new ScheduledStateLookup() { + @Override + public ScheduledState getScheduledState(final ProcessorNode procNode) { + if (startConnectablesAfterInitialization.contains(procNode)) { + return ScheduledState.RUNNING; + } + + return procNode.getScheduledState(); + } + + @Override + public ScheduledState getScheduledState(final Port port) { + if (startConnectablesAfterInitialization.contains(port)) { + return ScheduledState.RUNNING; + } + if (startRemoteGroupPortsAfterInitialization.contains(port)) { + return ScheduledState.RUNNING; + } + + return port.getScheduledState(); + } + }; + serializer.serialize(this, os, scheduledStateLookup); } finally { readLock.unlock(); @@ -2922,6 +2944,33 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } + public void stopConnectable(final Connectable connectable) { + final ProcessGroup group = requireNonNull(connectable).getProcessGroup(); + + writeLock.lock(); + try { + switch (requireNonNull(connectable).getConnectableType()) { + case FUNNEL: + // Ignore. We don't support stopping funnels. + break; + case INPUT_PORT: + case REMOTE_INPUT_PORT: + startConnectablesAfterInitialization.remove(connectable); + group.stopInputPort((Port) connectable); + break; + case OUTPUT_PORT: + case REMOTE_OUTPUT_PORT: + startConnectablesAfterInitialization.remove(connectable); + group.stopOutputPort((Port) connectable); + break; + default: + throw new IllegalArgumentException(); + } + } finally { + writeLock.unlock(); + } + } + public void startTransmitting(final RemoteGroupPort remoteGroupPort) { writeLock.lock(); try { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 975f95425d..6f1e8e1055 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -774,19 +774,19 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { case DISABLED: // switch processor do disabled. This means we have to stop it (if it's already stopped, this method does nothing), // and then we have to disable it. - port.getProcessGroup().stopInputPort(port); + controller.stopConnectable(port); port.getProcessGroup().disableInputPort(port); break; case RUNNING: // we want to run now. Make sure processor is not disabled and then start it. port.getProcessGroup().enableInputPort(port); - port.getProcessGroup().startInputPort(port); + controller.startConnectable(port); break; case STOPPED: if (port.getScheduledState() == ScheduledState.DISABLED) { port.getProcessGroup().enableInputPort(port); } else if (port.getScheduledState() == ScheduledState.RUNNING) { - port.getProcessGroup().stopInputPort(port); + controller.stopConnectable(port); } break; } @@ -803,19 +803,19 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { case DISABLED: // switch processor do disabled. This means we have to stop it (if it's already stopped, this method does nothing), // and then we have to disable it. - port.getProcessGroup().stopOutputPort(port); + controller.stopConnectable(port); port.getProcessGroup().disableOutputPort(port); break; case RUNNING: // we want to run now. Make sure processor is not disabled and then start it. port.getProcessGroup().enableOutputPort(port); - port.getProcessGroup().startOutputPort(port); + controller.startConnectable(port); break; case STOPPED: if (port.getScheduledState() == ScheduledState.DISABLED) { port.getProcessGroup().enableOutputPort(port); } else if (port.getScheduledState() == ScheduledState.RUNNING) { - port.getProcessGroup().stopOutputPort(port); + controller.stopConnectable(port); } break; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java index 07f6017b5c..39693b8987 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java @@ -17,6 +17,7 @@ package org.apache.nifi.controller.serialization; +import org.apache.nifi.connectable.Port; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ScheduledState; @@ -24,5 +25,17 @@ public interface ScheduledStateLookup { ScheduledState getScheduledState(ProcessorNode procNode); - public static final ScheduledStateLookup IDENTITY_LOOKUP = ProcessorNode::getScheduledState; + ScheduledState getScheduledState(Port port); + + public static final ScheduledStateLookup IDENTITY_LOOKUP = new ScheduledStateLookup() { + @Override + public ScheduledState getScheduledState(final ProcessorNode procNode) { + return procNode.getScheduledState(); + } + + @Override + public ScheduledState getScheduledState(final Port port) { + return port.getScheduledState(); + } + }; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java index fea1ecb641..702932cce6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java @@ -159,19 +159,19 @@ public class StandardFlowSerializer implements FlowSerializer { if (group.isRootGroup()) { for (final Port port : group.getInputPorts()) { - addRootGroupPort(element, (RootGroupPort) port, "inputPort"); + addRootGroupPort(element, (RootGroupPort) port, "inputPort", scheduledStateLookup); } for (final Port port : group.getOutputPorts()) { - addRootGroupPort(element, (RootGroupPort) port, "outputPort"); + addRootGroupPort(element, (RootGroupPort) port, "outputPort", scheduledStateLookup); } } else { for (final Port port : group.getInputPorts()) { - addPort(element, port, "inputPort"); + addPort(element, port, "inputPort", scheduledStateLookup); } for (final Port port : group.getOutputPorts()) { - addPort(element, port, "outputPort"); + addPort(element, port, "outputPort", scheduledStateLookup); } } @@ -330,7 +330,7 @@ public class StandardFlowSerializer implements FlowSerializer { parentElement.appendChild(element); } - private void addPort(final Element parentElement, final Port port, final String elementName) { + private void addPort(final Element parentElement, final Port port, final String elementName, final ScheduledStateLookup scheduledStateLookup) { final Document doc = parentElement.getOwnerDocument(); final Element element = doc.createElement(elementName); parentElement.appendChild(element); @@ -338,12 +338,12 @@ public class StandardFlowSerializer implements FlowSerializer { addTextElement(element, "name", port.getName()); addPosition(element, port.getPosition()); addTextElement(element, "comments", port.getComments()); - addTextElement(element, "scheduledState", port.getScheduledState().name()); + addTextElement(element, "scheduledState", scheduledStateLookup.getScheduledState(port).name()); parentElement.appendChild(element); } - private void addRootGroupPort(final Element parentElement, final RootGroupPort port, final String elementName) { + private void addRootGroupPort(final Element parentElement, final RootGroupPort port, final String elementName, final ScheduledStateLookup scheduledStateLookup) { final Document doc = parentElement.getOwnerDocument(); final Element element = doc.createElement(elementName); parentElement.appendChild(element); @@ -351,7 +351,7 @@ public class StandardFlowSerializer implements FlowSerializer { addTextElement(element, "name", port.getName()); addPosition(element, port.getPosition()); addTextElement(element, "comments", port.getComments()); - addTextElement(element, "scheduledState", port.getScheduledState().name()); + addTextElement(element, "scheduledState", scheduledStateLookup.getScheduledState(port).name()); addTextElement(element, "maxConcurrentTasks", String.valueOf(port.getMaxConcurrentTasks())); for (final String user : port.getUserAccessControl()) { addTextElement(element, "userAccessControl", user);