diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 0e0c74b7d3..d1822efebf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -772,6 +772,56 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } } + // Update scheduled state of Remote Group Ports + final List remoteProcessGroupList = getChildrenByTagName(processGroupElement, "remoteProcessGroup"); + for (final Element remoteGroupElement : remoteProcessGroupList) { + final RemoteProcessGroupDTO remoteGroupDto = FlowFromDOMFactory.getRemoteProcessGroup(remoteGroupElement, encryptor); + final RemoteProcessGroup rpg = processGroup.getRemoteProcessGroup(remoteGroupDto.getId()); + + // input ports + final List inputPortElements = getChildrenByTagName(remoteGroupElement, "inputPort"); + for (final Element inputPortElement : inputPortElements) { + final RemoteProcessGroupPortDescriptor portDescriptor = FlowFromDOMFactory.getRemoteProcessGroupPort(inputPortElement); + final String inputPortId = portDescriptor.getId(); + final RemoteGroupPort inputPort = rpg.getInputPort(inputPortId); + if (inputPort == null) { + continue; + } + + if (portDescriptor.isTransmitting()) { + if (inputPort.getScheduledState() != ScheduledState.RUNNING && inputPort.getScheduledState() != ScheduledState.STARTING) { + rpg.startTransmitting(inputPort); + } + } else { + if (inputPort.getScheduledState() != ScheduledState.STOPPED && inputPort.getScheduledState() != ScheduledState.STOPPING) { + rpg.stopTransmitting(inputPort); + } + } + } + + // output ports + final List outputPortElements = getChildrenByTagName(remoteGroupElement, "outputPort"); + for (final Element outputPortElement : outputPortElements) { + final RemoteProcessGroupPortDescriptor portDescriptor = FlowFromDOMFactory.getRemoteProcessGroupPort(outputPortElement); + final String outputPortId = portDescriptor.getId(); + final RemoteGroupPort outputPort = rpg.getOutputPort(outputPortId); + if (outputPort == null) { + continue; + } + + if (portDescriptor.isTransmitting()) { + if (outputPort.getScheduledState() != ScheduledState.RUNNING && outputPort.getScheduledState() != ScheduledState.STARTING) { + rpg.startTransmitting(outputPort); + } + } else { + if (outputPort.getScheduledState() != ScheduledState.STOPPED && outputPort.getScheduledState() != ScheduledState.STOPPING) { + rpg.stopTransmitting(outputPort); + } + } + } + } + + // add labels final List labelNodeList = getChildrenByTagName(processGroupElement, "label"); for (final Element labelElement : labelNodeList) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java index 26f83b5ffd..20bdb6046d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java @@ -778,7 +778,7 @@ public final class FingerprintFactory { } private StringBuilder addRemoteGroupPortFingerprint(final StringBuilder builder, final Element remoteGroupPortElement) { - for (final String childName : new String[]{"id", "scheduledState", "maxConcurrentTasks", "useCompression"}) { + for (final String childName : new String[] {"id", "maxConcurrentTasks", "useCompression"}) { appendFirstValue(builder, DomUtils.getChildNodesByTagName(remoteGroupPortElement, childName)); } @@ -787,7 +787,6 @@ public final class FingerprintFactory { private StringBuilder addRemoteGroupPortFingerprint(final StringBuilder builder, final RemoteProcessGroupPortDTO port) { builder.append(port.getId()); - builder.append(Boolean.TRUE.equals(port.isTransmitting()) ? "RUNNING" : "STOPPED"); builder.append(port.getConcurrentlySchedulableTaskCount()); builder.append(port.getUseCompression()); return builder;