diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 5b2ea41976..3ca28e52cf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -138,6 +138,7 @@ import org.apache.nifi.controller.serialization.FlowSerializationException; import org.apache.nifi.controller.serialization.FlowSerializer; import org.apache.nifi.controller.serialization.FlowSynchronizationException; import org.apache.nifi.controller.serialization.FlowSynchronizer; +import org.apache.nifi.controller.serialization.ScheduledStateLookup; import org.apache.nifi.controller.service.ControllerServiceInvocationHandler; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; @@ -1509,7 +1510,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R public void serialize(final FlowSerializer serializer, final OutputStream os) throws FlowSerializationException { readLock.lock(); try { - serializer.serialize(this, os); + final ScheduledStateLookup scheduledStateLookup = procNode -> startConnectablesAfterInitialization.contains(procNode) ? ScheduledState.RUNNING : procNode.getScheduledState(); + serializer.serialize(this, os, scheduledStateLookup); } finally { readLock.unlock(); } @@ -2932,6 +2934,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R throw new IllegalStateException("Cannot find ProcessorNode with ID " + processorId + " within ProcessGroup with ID " + parentGroupId); } group.stopProcessor(node); + // If we are ready to start the processor upon initialization of the controller, don't. + startConnectablesAfterInitialization.remove(node); } public void stopAllProcessors() { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 1e229a24dc..a9372a4a34 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -1006,6 +1006,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { controller.startProcessor(processGroup.getIdentifier(), procNode.getIdentifier()); } else if (ScheduledState.DISABLED.equals(scheduledState)) { processGroup.disableProcessor(procNode); + } else if (ScheduledState.STOPPED.equals(scheduledState)) { + controller.stopProcessor(processGroup.getIdentifier(), procNode.getIdentifier()); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSerializer.java index dae7566ba9..b01f6e3631 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSerializer.java @@ -34,9 +34,10 @@ public interface FlowSerializer { * * @param controller a controller * @param os an output stream to write the configuration to + * @param stateLookup a lookup that can be used to determine the ScheduledState of a Processor * * @throws FlowSerializationException if serialization failed */ - void serialize(FlowController controller, OutputStream os) throws FlowSerializationException; + void serialize(FlowController controller, OutputStream os, ScheduledStateLookup stateLookup) throws FlowSerializationException; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java new file mode 100644 index 0000000000..07f6017b5c --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.serialization; + +import org.apache.nifi.controller.ProcessorNode; +import org.apache.nifi.controller.ScheduledState; + +public interface ScheduledStateLookup { + + ScheduledState getScheduledState(ProcessorNode procNode); + + public static final ScheduledStateLookup IDENTITY_LOOKUP = ProcessorNode::getScheduledState; +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java index cd0276958f..fea1ecb641 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java @@ -79,7 +79,7 @@ public class StandardFlowSerializer implements FlowSerializer { } @Override - public void serialize(final FlowController controller, final OutputStream os) throws FlowSerializationException { + public void serialize(final FlowController controller, final OutputStream os, final ScheduledStateLookup scheduledStateLookup) throws FlowSerializationException { try { // create a new, empty document final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance(); @@ -94,7 +94,7 @@ public class StandardFlowSerializer implements FlowSerializer { doc.appendChild(rootNode); addTextElement(rootNode, "maxTimerDrivenThreadCount", controller.getMaxTimerDrivenThreadCount()); addTextElement(rootNode, "maxEventDrivenThreadCount", controller.getMaxEventDrivenThreadCount()); - addProcessGroup(rootNode, controller.getGroup(controller.getRootGroupId()), "rootGroup"); + addProcessGroup(rootNode, controller.getGroup(controller.getRootGroupId()), "rootGroup", scheduledStateLookup); // Add root-level controller services final Element controllerServicesNode = doc.createElement("controllerServices"); @@ -144,7 +144,7 @@ public class StandardFlowSerializer implements FlowSerializer { parentElement.appendChild(element); } - private void addProcessGroup(final Element parentElement, final ProcessGroup group, final String elementName) { + private void addProcessGroup(final Element parentElement, final ProcessGroup group, final String elementName, final ScheduledStateLookup scheduledStateLookup) { final Document doc = parentElement.getOwnerDocument(); final Element element = doc.createElement(elementName); parentElement.appendChild(element); @@ -154,7 +154,7 @@ public class StandardFlowSerializer implements FlowSerializer { addTextElement(element, "comment", group.getComments()); for (final ProcessorNode processor : group.getProcessors()) { - addProcessor(element, processor); + addProcessor(element, processor, scheduledStateLookup); } if (group.isRootGroup()) { @@ -184,7 +184,7 @@ public class StandardFlowSerializer implements FlowSerializer { } for (final ProcessGroup childGroup : group.getProcessGroups()) { - addProcessGroup(element, childGroup, "processGroup"); + addProcessGroup(element, childGroup, "processGroup", scheduledStateLookup); } for (final RemoteProcessGroup remoteRef : group.getRemoteProcessGroups()) { @@ -363,7 +363,7 @@ public class StandardFlowSerializer implements FlowSerializer { parentElement.appendChild(element); } - private void addProcessor(final Element parentElement, final ProcessorNode processor) { + private void addProcessor(final Element parentElement, final ProcessorNode processor, final ScheduledStateLookup scheduledStateLookup) { final Document doc = parentElement.getOwnerDocument(); final Element element = doc.createElement("processor"); parentElement.appendChild(element); @@ -384,7 +384,7 @@ public class StandardFlowSerializer implements FlowSerializer { addTextElement(element, "yieldPeriod", processor.getYieldPeriod()); addTextElement(element, "bulletinLevel", processor.getBulletinLevel().toString()); addTextElement(element, "lossTolerant", String.valueOf(processor.isLossTolerant())); - addTextElement(element, "scheduledState", processor.getScheduledState().name()); + addTextElement(element, "scheduledState", scheduledStateLookup.getScheduledState(processor).name()); addTextElement(element, "schedulingStrategy", processor.getSchedulingStrategy().name()); addTextElement(element, "executionNode", processor.getExecutionNode().name()); addTextElement(element, "runDurationNanos", processor.getRunDuration(TimeUnit.NANOSECONDS)); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java index 0bc216f59a..1957cf4733 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java @@ -23,6 +23,7 @@ import org.apache.nifi.cluster.protocol.StandardDataFlow; import org.apache.nifi.controller.repository.FlowFileEventRepository; import org.apache.nifi.controller.serialization.FlowSerializationException; import org.apache.nifi.controller.serialization.FlowSerializer; +import org.apache.nifi.controller.serialization.ScheduledStateLookup; import org.apache.nifi.controller.serialization.StandardFlowSerializer; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.events.VolatileBulletinRepository; @@ -96,7 +97,7 @@ public class StandardFlowServiceTest { FlowSerializer serializer = new StandardFlowSerializer(mockEncryptor); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - serializer.serialize(flowController, baos); + serializer.serialize(flowController, baos, ScheduledStateLookup.IDENTITY_LOOKUP); String expectedFlow = new String(flowBytes).trim(); String actualFlow = new String(baos.toByteArray()).trim(); @@ -120,7 +121,7 @@ public class StandardFlowServiceTest { FlowSerializer serializer = new StandardFlowSerializer(mockEncryptor); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - serializer.serialize(flowController, baos); + serializer.serialize(flowController, baos, ScheduledStateLookup.IDENTITY_LOOKUP); String expectedFlow = new String(flowBytes).trim(); String actualFlow = new String(baos.toByteArray()).trim(); @@ -140,7 +141,7 @@ public class StandardFlowServiceTest { FlowSerializer serializer = new StandardFlowSerializer(mockEncryptor); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - serializer.serialize(flowController, baos); + serializer.serialize(flowController, baos, ScheduledStateLookup.IDENTITY_LOOKUP); String expectedFlow = new String(originalBytes).trim(); String actualFlow = new String(baos.toByteArray()).trim(); @@ -162,7 +163,7 @@ public class StandardFlowServiceTest { FlowSerializer serializer = new StandardFlowSerializer(mockEncryptor); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - serializer.serialize(flowController, baos); + serializer.serialize(flowController, baos, ScheduledStateLookup.IDENTITY_LOOKUP); String expectedFlow = new String(originalBytes).trim(); String actualFlow = new String(baos.toByteArray()).trim(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java index e85edb06b1..f61dcc2ce2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java @@ -92,7 +92,7 @@ public class StandardFlowSerializerTest { // serialize the controller final ByteArrayOutputStream os = new ByteArrayOutputStream(); - serializer.serialize(controller, os); + serializer.serialize(controller, os, ScheduledStateLookup.IDENTITY_LOOKUP); // verify the results contain the serialized string final String serializedFlow = os.toString(StandardCharsets.UTF_8.name());