diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 425110c527..8a79acfe65 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -73,6 +73,7 @@ import org.apache.nifi.controller.serialization.FlowSynchronizer; import org.apache.nifi.controller.serialization.StandardFlowSerializer; import org.apache.nifi.controller.service.ControllerServiceLoader; import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.fingerprint.FingerprintException; @@ -749,6 +750,19 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } } + private ControllerServiceState getFinalTransitionState(final ControllerServiceState state) { + switch (state) { + case DISABLED: + case DISABLING: + return ControllerServiceState.DISABLED; + case ENABLED: + case ENABLING: + return ControllerServiceState.ENABLED; + default: + throw new AssertionError(); + } + } + private ProcessGroup updateProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement, final StringEncryptor encryptor, final FlowEncodingVersion encodingVersion) throws ProcessorInstantiationException { @@ -779,6 +793,36 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { // get the real process group and ID final ProcessGroup processGroup = controller.getGroup(processGroupDto.getId()); + // determine the scheduled state of all of the Controller Service + final List controllerServiceNodeList = getChildrenByTagName(processGroupElement, "controllerService"); + final Set toDisable = new HashSet<>(); + final Set toEnable = new HashSet<>(); + + for (final Element serviceElement : controllerServiceNodeList) { + final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(serviceElement, encryptor); + final ControllerServiceNode serviceNode = processGroup.getControllerService(dto.getId()); + + // Check if the controller service is in the correct state. We consider it the correct state if + // we are in a transitional state and heading in the right direction or already in the correct state. + // E.g., it is the correct state if it should be 'DISABLED' and it is either DISABLED or DISABLING. + final ControllerServiceState serviceState = getFinalTransitionState(serviceNode.getState()); + final ControllerServiceState clusterState = getFinalTransitionState(ControllerServiceState.valueOf(dto.getState())); + + if (serviceState != clusterState) { + switch (clusterState) { + case DISABLED: + toDisable.add(serviceNode); + break; + case ENABLED: + toEnable.add(serviceNode); + break; + } + } + } + + controller.disableControllerServicesAsync(toDisable); + controller.enableControllerServices(toEnable); + // processors & ports cannot be updated - they must be the same. Except for the scheduled state. final List processorNodeList = getChildrenByTagName(processGroupElement, "processor"); for (final Element processorElement : processorNodeList) {