diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java index c31a957921..944b10acb7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java @@ -34,6 +34,8 @@ public class VersionControlInformationDTO { private Integer version; private Boolean modified; private Boolean current; + private String state; + private String stateExplanation; @ApiModelProperty("The ID of the Process Group that is under version control") public String getGroupId() { @@ -135,4 +137,24 @@ public class VersionControlInformationDTO { public void setCurrent(Boolean current) { this.current = current; } + + @ApiModelProperty(readOnly = true, + value = "The current state of the Process Group, as it relates to the Versioned Flow", + allowableValues = "LOCALLY_MODIFIED_DESCENDANT, LOCALLY_MODIFIED, STALE, LOCALLY_MODIFIED_AND_STALE, UP_TO_DATE") + public String getState() { + return state; + } + + public void setState(final String state) { + this.state = state; + } + + @ApiModelProperty(readOnly = true, value = "Explanation of why the group is in the specified state") + public String getStateExplanation() { + return stateExplanation; + } + + public void setStateExplanation(String explanation) { + this.stateExplanation = explanation; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java index 2f28963eab..2219d6dcd8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.controller.service; +import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.components.VersionedComponent; import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ControllerService; @@ -27,7 +28,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; -public interface ControllerServiceNode extends ConfiguredComponent, VersionedComponent { +public interface ControllerServiceNode extends ConfiguredComponent, ConfigurableComponent, VersionedComponent { /** * @return the Process Group that this Controller Service belongs to, or null if the Controller Service diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java index d81b7d3a6c..17131dd1b1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java @@ -462,11 +462,11 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi /** * @param id of the Controller Service - * @return the Controller Service with the given ID, if it exists as a child or - * descendant of this ProcessGroup. This performs a recursive search of all - * descendant ProcessGroups + * @param includeDescendantGroups whether or not to include descendant process groups + * @param includeAncestorGroups whether or not to include ancestor process groups + * @return the Controller Service with the given ID */ - ControllerServiceNode findControllerService(String id); + ControllerServiceNode findControllerService(String id, boolean includeDescendantGroups, boolean includeAncestorGroups); /** * @return a List of all Controller Services contained within this ProcessGroup and any child Process Groups @@ -976,4 +976,9 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi * @param flowRegistry the Flow Registry to synchronize with */ void synchronizeWithFlowRegistry(FlowRegistryClient flowRegistry); + + /** + * Called whenever a component within this group or the group itself is modified + */ + void onComponentModified(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java index 0dd60704bc..7d92246f40 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java @@ -61,9 +61,9 @@ public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable, void setName(String name); - void setInputPorts(Set ports); + void setInputPorts(Set ports, boolean pruneUnusedPorts); - void setOutputPorts(Set ports); + void setOutputPorts(Set ports, boolean pruneUnusedPorts); Set getInputPorts(); @@ -215,11 +215,6 @@ public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable, */ void reinitialize(boolean isClustered); - /** - * Removes all non existent ports from this RemoteProcessGroup. - */ - void removeAllNonExistentPorts(); - /** * Removes a port that no longer exists on the remote instance from this * RemoteProcessGroup diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java index b54a1c99a3..1f65a19ea6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java @@ -76,6 +76,11 @@ public interface VersionControlInformation { */ boolean isCurrent(); + /** + * @return the current status of the Process Group as it relates to the associated Versioned Flow. + */ + VersionedFlowStatus getStatus(); + /** * @return the snapshot of the flow that was synchronized with the Flow Registry */ diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowState.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowState.java new file mode 100644 index 0000000000..d20a13f052 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowState.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow; + +public enum VersionedFlowState { + + /** + * We are unable to communicate with the Flow Registry in order to determine the appropriate state + */ + SYNC_FAILURE, + + /** + * This Process Group (or a child/descendant Process Group that is not itself under Version Control) + * is on the latest version of the Versioned Flow, but is different than the Versioned Flow that is + * stored in the Flow Registry. + */ + LOCALLY_MODIFIED, + + /** + * This Process Group has not been modified since it was last synchronized with the Flow Registry, but + * the Flow Registry has a newer version of the flow than what is contained in this Process Group. + */ + STALE, + + /** + * This Process Group (or a child/descendant Process Group that is not itself under Version Control) + * has been modified since it was last synchronized with the Flow Registry, and the Flow Registry has + * a newer version of the flow than what is contained in this Process Group. + */ + LOCALLY_MODIFIED_AND_STALE, + + /** + * This Process Group and all child/descendant Process Groups are on the latest version of the flow in + * the Flow Registry and have no local modifications. + */ + UP_TO_DATE; +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowStatus.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowStatus.java new file mode 100644 index 0000000000..9b58d9ae17 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowStatus.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow; + +public interface VersionedFlowStatus { + + /** + * @return the current state of the versioned process group + */ + VersionedFlowState getState(); + + /** + * @return an explanation of why the process group is in the state that it is in. + */ + String getStateExplanation(); +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 3909387688..2afa9dcfb7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -16,6 +16,39 @@ */ package org.apache.nifi.controller; +import static java.util.Objects.requireNonNull; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +import javax.net.ssl.SSLContext; + import org.apache.commons.collections4.Predicate; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; @@ -169,7 +202,6 @@ import org.apache.nifi.registry.flow.StandardVersionControlInformation; import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.VersionedConnection; import org.apache.nifi.registry.flow.VersionedProcessGroup; -import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper; import org.apache.nifi.registry.variable.MutableVariableRegistry; import org.apache.nifi.registry.variable.StandardComponentVariableRegistry; import org.apache.nifi.remote.HttpRemoteSiteListener; @@ -225,38 +257,6 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.SSLContext; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.URL; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; - -import static java.util.Objects.requireNonNull; - public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, QueueProvider, Authorizable, ProvenanceAuthorizableFactory, NodeTypeProvider, IdentifierLookup, ReloadComponent { @@ -1983,14 +1983,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R if (remoteGroupDTO.getContents() != null) { final RemoteProcessGroupContentsDTO contents = remoteGroupDTO.getContents(); - // ensure there input ports + // ensure there are input ports if (contents.getInputPorts() != null) { - remoteGroup.setInputPorts(convertRemotePort(contents.getInputPorts())); + remoteGroup.setInputPorts(convertRemotePort(contents.getInputPorts()), false); } // ensure there are output ports if (contents.getOutputPorts() != null) { - remoteGroup.setOutputPorts(convertRemotePort(contents.getOutputPorts())); + remoteGroup.setOutputPorts(convertRemotePort(contents.getOutputPorts()), false); } } @@ -2035,12 +2035,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R instantiateSnippet(childGroup, childTemplateDTO, false); if (groupDTO.getVersionControlInformation() != null) { - final NiFiRegistryFlowMapper flowMapper = new NiFiRegistryFlowMapper(); - final VersionedProcessGroup versionedGroup = flowMapper.mapProcessGroup(childGroup, getFlowRegistryClient(), false); - final VersionControlInformation vci = StandardVersionControlInformation.Builder .fromDto(groupDTO.getVersionControlInformation()) - .flowSnapshot(versionedGroup) .build(); childGroup.setVersionControlInformation(vci, Collections.emptyMap()); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 71a587c171..28d9b791b8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -931,6 +931,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { final Label label = controller.createLabel(labelDTO.getId(), labelDTO.getLabel()); label.setStyle(labelDTO.getStyle()); label.setPosition(new Position(labelDTO.getPosition().getX(), labelDTO.getPosition().getY())); + label.setVersionedComponentId(labelDTO.getVersionedComponentId()); if (labelDTO.getWidth() != null && labelDTO.getHeight() != null) { label.setSize(new Size(labelDTO.getWidth(), labelDTO.getHeight())); } @@ -1327,13 +1328,13 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { for (final Element portElement : getChildrenByTagName(remoteProcessGroupElement, "inputPort")) { inputPorts.add(FlowFromDOMFactory.getRemoteProcessGroupPort(portElement)); } - remoteGroup.setInputPorts(inputPorts); + remoteGroup.setInputPorts(inputPorts, false); final Set outputPorts = new HashSet<>(); for (final Element portElement : getChildrenByTagName(remoteProcessGroupElement, "outputPort")) { outputPorts.add(FlowFromDOMFactory.getRemoteProcessGroupPort(portElement)); } - remoteGroup.setOutputPorts(outputPorts); + remoteGroup.setOutputPorts(outputPorts, false); processGroup.addRemoteProcessGroup(remoteGroup); for (final RemoteProcessGroupPortDescriptor remoteGroupPortDTO : outputPorts) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 9a14464cd8..4b186a9513 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -83,11 +83,14 @@ import org.apache.nifi.registry.flow.VersionedControllerService; import org.apache.nifi.registry.flow.VersionedFlow; import org.apache.nifi.registry.flow.VersionedFlowCoordinates; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; +import org.apache.nifi.registry.flow.VersionedFlowState; +import org.apache.nifi.registry.flow.VersionedFlowStatus; import org.apache.nifi.registry.flow.VersionedFunnel; import org.apache.nifi.registry.flow.VersionedLabel; import org.apache.nifi.registry.flow.VersionedPort; import org.apache.nifi.registry.flow.VersionedProcessGroup; import org.apache.nifi.registry.flow.VersionedProcessor; +import org.apache.nifi.registry.flow.VersionedPropertyDescriptor; import org.apache.nifi.registry.flow.VersionedRemoteGroupPort; import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup; import org.apache.nifi.registry.flow.diff.ComparableDataFlow; @@ -166,6 +169,8 @@ public final class StandardProcessGroup implements ProcessGroup { private final Map templates = new HashMap<>(); private final StringEncryptor encryptor; private final MutableVariableRegistry variableRegistry; + private final AtomicReference flowStatus = new AtomicReference<>( + new StandardVersionedFlowStatus(null, "Not yet synchronized with Flow Registry", null)); private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock readLock = rwLock.readLock(); @@ -494,6 +499,7 @@ public final class StandardProcessGroup implements ProcessGroup { port.setProcessGroup(this); inputPorts.put(requireNonNull(port).getIdentifier(), port); flowController.onInputPortAdded(port); + onComponentModified(); } finally { writeLock.unlock(); } @@ -528,6 +534,8 @@ public final class StandardProcessGroup implements ProcessGroup { throw new IllegalStateException(port.getIdentifier() + " is not an Input Port of this Process Group"); } + onComponentModified(); + flowController.onInputPortRemoved(port); LOG.info("Input Port {} removed from flow", port); } finally { @@ -575,6 +583,7 @@ public final class StandardProcessGroup implements ProcessGroup { port.setProcessGroup(this); outputPorts.put(port.getIdentifier(), port); flowController.onOutputPortAdded(port); + onComponentModified(); } finally { writeLock.unlock(); } @@ -600,6 +609,8 @@ public final class StandardProcessGroup implements ProcessGroup { throw new IllegalStateException(port.getIdentifier() + " is not an Output Port of this Process Group"); } + onComponentModified(); + flowController.onOutputPortRemoved(port); LOG.info("Output Port {} removed from flow", port); } finally { @@ -640,6 +651,7 @@ public final class StandardProcessGroup implements ProcessGroup { processGroups.put(Objects.requireNonNull(group).getIdentifier(), group); flowController.onProcessGroupAdded(group); + onComponentModified(); } finally { writeLock.unlock(); } @@ -679,6 +691,8 @@ public final class StandardProcessGroup implements ProcessGroup { removeComponents(group); processGroups.remove(group.getIdentifier()); + onComponentModified(); + flowController.onProcessGroupRemoved(group); LOG.info("{} removed from flow", group); } finally { @@ -734,6 +748,7 @@ public final class StandardProcessGroup implements ProcessGroup { remoteGroup.setProcessGroup(this); remoteGroups.put(Objects.requireNonNull(remoteGroup).getIdentifier(), remoteGroup); + onComponentModified(); } finally { writeLock.unlock(); } @@ -767,6 +782,8 @@ public final class StandardProcessGroup implements ProcessGroup { } } + onComponentModified(); + for (final RemoteGroupPort port : remoteGroup.getOutputPorts()) { // must copy to avoid a concurrent modification final Set copy = new HashSet<>(port.getConnections()); @@ -802,6 +819,7 @@ public final class StandardProcessGroup implements ProcessGroup { processor.getVariableRegistry().setParent(getVariableRegistry()); processors.put(processorId, processor); flowController.onProcessorAdded(processor); + onComponentModified(); } finally { writeLock.unlock(); } @@ -843,6 +861,8 @@ public final class StandardProcessGroup implements ProcessGroup { } processors.remove(id); + onComponentModified(); + flowController.onProcessorRemoved(processor); LogRepositoryFactory.getRepository(processor.getIdentifier()).removeAllObservers(); @@ -912,6 +932,7 @@ public final class StandardProcessGroup implements ProcessGroup { writeLock.lock(); try { connections.put(connection.getIdentifier(), connection); + onComponentModified(); connection.setProcessGroup(this); } finally { writeLock.unlock(); @@ -983,6 +1004,7 @@ public final class StandardProcessGroup implements ProcessGroup { } connections.put(connection.getIdentifier(), connection); flowController.onConnectionAdded(connection); + onComponentModified(); } finally { writeLock.unlock(); } @@ -1042,6 +1064,8 @@ public final class StandardProcessGroup implements ProcessGroup { // remove the connection from our map connections.remove(connection.getIdentifier()); LOG.info("{} removed from flow", connection); + onComponentModified(); + flowController.onConnectionRemoved(connection); } finally { writeLock.unlock(); @@ -1109,6 +1133,7 @@ public final class StandardProcessGroup implements ProcessGroup { label.setProcessGroup(this); labels.put(label.getIdentifier(), label); + onComponentModified(); } finally { writeLock.unlock(); } @@ -1123,6 +1148,7 @@ public final class StandardProcessGroup implements ProcessGroup { throw new IllegalStateException(label + " is not a member of this Process Group."); } + onComponentModified(); LOG.info("Label with ID {} removed from flow", label.getIdentifier()); } finally { writeLock.unlock(); @@ -1828,6 +1854,8 @@ public final class StandardProcessGroup implements ProcessGroup { if (autoStart) { startFunnel(funnel); } + + onComponentModified(); } finally { writeLock.unlock(); } @@ -1859,18 +1887,43 @@ public final class StandardProcessGroup implements ProcessGroup { @Override - public ControllerServiceNode findControllerService(final String id) { - return findControllerService(id, this); + public ControllerServiceNode findControllerService(final String id, final boolean includeDescendants, final boolean includeAncestors) { + ControllerServiceNode serviceNode; + if (includeDescendants) { + serviceNode = findDescendantControllerService(id, this); + } else { + serviceNode = getControllerService(id); + } + + if (serviceNode == null && includeAncestors) { + serviceNode = findAncestorControllerService(id, getParent()); + } + + return serviceNode; } - private ControllerServiceNode findControllerService(final String id, final ProcessGroup start) { + private ControllerServiceNode findAncestorControllerService(final String id, final ProcessGroup start) { + if (start == null) { + return null; + } + + final ControllerServiceNode serviceNode = start.getControllerService(id); + if (serviceNode != null) { + return serviceNode; + } + + final ProcessGroup parent = start.getParent(); + return findAncestorControllerService(id, parent); + } + + private ControllerServiceNode findDescendantControllerService(final String id, final ProcessGroup start) { ControllerServiceNode service = start.getControllerService(id); if (service != null) { return service; } for (final ProcessGroup group : start.getProcessGroups()) { - service = findControllerService(id, group); + service = findDescendantControllerService(id, group); if (service != null) { return service; } @@ -1916,6 +1969,8 @@ public final class StandardProcessGroup implements ProcessGroup { } funnels.remove(funnel.getIdentifier()); + onComponentModified(); + flowController.onFunnelRemoved(funnel); LOG.info("{} removed from flow", funnel); } finally { @@ -1947,6 +2002,7 @@ public final class StandardProcessGroup implements ProcessGroup { service.getVariableRegistry().setParent(getVariableRegistry()); this.controllerServices.put(service.getIdentifier(), service); LOG.info("{} added to {}", service, this); + onComponentModified(); } finally { writeLock.unlock(); } @@ -2010,6 +2066,21 @@ public final class StandardProcessGroup implements ProcessGroup { } controllerServices.remove(service.getIdentifier()); + onComponentModified(); + + // For any component that references this Controller Service, find the component's Process Group + // and notify the Process Group that a component has been modified. This way, we know to re-calculate + // whether or not the Process Group has local modifications. + service.getReferences().getReferencingComponents().stream() + .map(ConfiguredComponent::getProcessGroupIdentifier) + .filter(id -> !id.equals(getIdentifier())) + .forEach(groupId -> { + final ProcessGroup descendant = findProcessGroup(groupId); + if (descendant != null) { + descendant.onComponentModified(); + } + }); + flowController.getStateManagerProvider().onComponentRemoved(service.getIdentifier()); removed = true; @@ -2043,6 +2114,7 @@ public final class StandardProcessGroup implements ProcessGroup { templates.put(id, template); template.setProcessGroup(this); LOG.info("{} added to {}", template, this); + onComponentModified(); } finally { writeLock.unlock(); } @@ -2112,6 +2184,8 @@ public final class StandardProcessGroup implements ProcessGroup { } templates.remove(template.getIdentifier()); + onComponentModified(); + LOG.info("{} removed from flow", template); } finally { writeLock.unlock(); @@ -2172,6 +2246,8 @@ public final class StandardProcessGroup implements ProcessGroup { toRemove.verifyCanDelete(true); } + onComponentModified(); + for (final String id : connectionIdsToRemove) { removeConnection(connections.get(id)); } @@ -2224,6 +2300,8 @@ public final class StandardProcessGroup implements ProcessGroup { throw new IllegalStateException("Cannot move Ports into the root group"); } + onComponentModified(); + for (final String id : getKeys(snippet.getInputPorts())) { destination.addInputPort(inputPorts.remove(id)); } @@ -2844,6 +2922,34 @@ public final class StandardProcessGroup implements ProcessGroup { return versionControlInfo.get(); } + @Override + public void onComponentModified() { + // We no longer know if or how the Process Group has changed, so the next time that we + // get the local modifications, we must re-calculate it. We cannot simply assume that + // the flow was modified now, because if a Processor Property changed from 'A' to 'B', + // then back to 'A', then we have to know that it was not modified. So we set it to null + // to indicate that we must calculate the local modifications. + final StandardVersionControlInformation svci = this.versionControlInfo.get(); + if (svci == null) { + // This group is not under version control directly. Notify parent. + final ProcessGroup parentGroup = parent.get(); + if (parentGroup != null) { + parentGroup.onComponentModified(); + } + } + + clearFlowDifferences(); + } + + private void clearFlowDifferences() { + boolean updated = false; + while (!updated) { + final StandardVersionedFlowStatus status = flowStatus.get(); + final StandardVersionedFlowStatus updatedStatus = new StandardVersionedFlowStatus(status.getState(), status.getStateExplanation(), null); + updated = flowStatus.compareAndSet(status, updatedStatus); + } + } + @Override public void setVersionControlInformation(final VersionControlInformation versionControlInformation, final Map versionedComponentIds) { final StandardVersionControlInformation svci = new StandardVersionControlInformation( @@ -2854,16 +2960,63 @@ public final class StandardProcessGroup implements ProcessGroup { versionControlInformation.getVersion(), stripContentsFromRemoteDescendantGroups(versionControlInformation.getFlowSnapshot(), true), versionControlInformation.isModified(), - versionControlInformation.isCurrent()) { + versionControlInformation.isCurrent(), + versionControlInformation.getStatus()) { @Override public boolean isModified() { - final Set differences = StandardProcessGroup.this.getModifications(); - if (differences == null) { - return false; + boolean updated = false; + while (true) { + final StandardVersionedFlowStatus status = flowStatus.get(); + Set differences = status.getCurrentDifferences(); + if (differences == null) { + differences = getModifications(); + if (differences == null) { + return false; + } + + final StandardVersionedFlowStatus updatedStatus = new StandardVersionedFlowStatus(status.getState(), status.getStateExplanation(), differences); + updated = flowStatus.compareAndSet(status, updatedStatus); + + if (updated) { + return !differences.isEmpty(); + } + + continue; + } + + return !differences.isEmpty(); + } + } + + @Override + public VersionedFlowStatus getStatus() { + // If current state is a sync failure, then + final StandardVersionedFlowStatus status = flowStatus.get(); + final VersionedFlowState state = status.getState(); + if (state == VersionedFlowState.SYNC_FAILURE) { + return status; } - return !differences.isEmpty(); + final boolean modified = isModified(); + if (!modified) { + final VersionControlInformation vci = StandardProcessGroup.this.versionControlInfo.get(); + if (vci.getFlowSnapshot() == null) { + return new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, "Process Group has not yet been synchronized with Flow Registry", null); + } + } + + final boolean stale = !isCurrent(); + + if (modified && stale) { + return new StandardVersionedFlowStatus(VersionedFlowState.LOCALLY_MODIFIED_AND_STALE, "Local changes have been made and a newer version of this flow is available", null); + } else if (modified) { + return new StandardVersionedFlowStatus(VersionedFlowState.LOCALLY_MODIFIED, "Local changes have been made", null); + } else if (stale) { + return new StandardVersionedFlowStatus(VersionedFlowState.STALE, "A newer version of this flow is available", null); + } else { + return new StandardVersionedFlowStatus(VersionedFlowState.UP_TO_DATE, "Flow version is current", null); + } } }; @@ -2875,6 +3028,7 @@ public final class StandardProcessGroup implements ProcessGroup { try { updateVersionedComponentIds(this, versionedComponentIds); this.versionControlInfo.set(svci); + clearFlowDifferences(); } finally { writeLock.unlock(); } @@ -2901,6 +3055,7 @@ public final class StandardProcessGroup implements ProcessGroup { copy.setProcessors(processGroup.getProcessors()); copy.setRemoteProcessGroups(processGroup.getRemoteProcessGroups()); copy.setVariables(processGroup.getVariables()); + copy.setLabels(processGroup.getLabels()); final Set copyChildren = new HashSet<>(); @@ -2944,8 +3099,22 @@ public final class StandardProcessGroup implements ProcessGroup { } applyVersionedComponentIds(processGroup, versionedComponentIds::get); + + // If we versioned any parent groups' Controller Services, set their versioned component id's too. + final ProcessGroup parent = processGroup.getParent(); + if (parent != null) { + for (final ControllerServiceNode service : parent.getControllerServices(true)) { + if (!service.getVersionedComponentId().isPresent()) { + final String versionedId = versionedComponentIds.get(service.getIdentifier()); + if (versionedId != null) { + service.setVersionedComponentId(versionedId); + } + } + } + } } + private void applyVersionedComponentIds(final ProcessGroup processGroup, final Function lookup) { processGroup.setVersionedComponentId(lookup.apply(processGroup.getIdentifier())); @@ -2980,6 +3149,14 @@ public final class StandardProcessGroup implements ProcessGroup { .forEach(childGroup -> applyVersionedComponentIds(childGroup, lookup)); } + private void setSyncFailedState(final String explanation) { + boolean updated = false; + while (!updated) { + final StandardVersionedFlowStatus status = flowStatus.get(); + final StandardVersionedFlowStatus updatedStatus = new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, explanation, status.getCurrentDifferences()); + updated = flowStatus.compareAndSet(status, updatedStatus); + } + } @Override public void synchronizeWithFlowRegistry(final FlowRegistryClient flowRegistryClient) { @@ -2991,6 +3168,10 @@ public final class StandardProcessGroup implements ProcessGroup { final String registryId = vci.getRegistryIdentifier(); final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId); if (flowRegistry == null) { + final String message = String.format("Unable to synchronize Process Group with Flow Registry because Process Group was placed under Version Control using Flow Registry " + + "with identifier %s but cannot find any Flow Registry with this identifier", registryId); + setSyncFailedState(message); + LOG.error("Unable to synchronize {} with Flow Registry because Process Group was placed under Version Control using Flow Registry " + "with identifier {} but cannot find any Flow Registry with this identifier", this, registryId); return; @@ -3005,8 +3186,12 @@ public final class StandardProcessGroup implements ProcessGroup { final VersionedProcessGroup registryFlow = registrySnapshot.getFlowContents(); vci.setFlowSnapshot(registryFlow); } catch (final IOException | NiFiRegistryException e) { + final String message = String.format("Failed to synchronize Process Group with Flow Registry because could not retrieve version %s of flow with identifier %s in bucket %s", + vci.getVersion(), vci.getFlowIdentifier(), vci.getBucketIdentifier()); + setSyncFailedState(message); + LOG.error("Failed to synchronize {} with Flow Registry because could not retrieve version {} of flow with identifier {} in bucket {}", - new Object[] {this, vci.getVersion(), vci.getFlowIdentifier(), vci.getBucketIdentifier()}, e); + this, vci.getVersion(), vci.getFlowIdentifier(), vci.getBucketIdentifier(), e); return; } } @@ -3027,7 +3212,17 @@ public final class StandardProcessGroup implements ProcessGroup { LOG.info("{} is not the most recent version of the flow that is under Version Control; current version is {}; most recent version is {}", new Object[] {this, vci.getVersion(), latestVersion}); } + + boolean updated = false; + while (!updated) { + final StandardVersionedFlowStatus status = flowStatus.get(); + final StandardVersionedFlowStatus updatedStatus = new StandardVersionedFlowStatus(null, null, status.getCurrentDifferences()); + updated = flowStatus.compareAndSet(status, updatedStatus); + } } catch (final IOException | NiFiRegistryException e) { + final String message = String.format("Failed to synchronize Process Group with Flow Registry because could not determine the most recent version of the Flow in the Flow Registry"); + setSyncFailedState(message); + LOG.error("Failed to synchronize {} with Flow Registry because could not determine the most recent version of the Flow in the Flow Registry", this, e); } } @@ -3041,12 +3236,12 @@ public final class StandardProcessGroup implements ProcessGroup { verifyCanUpdate(proposedSnapshot, true, verifyNotDirty); final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); - final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient(), true); + final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, controllerServiceProvider, flowController.getFlowRegistryClient(), true); final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", versionedGroup); final ComparableDataFlow remoteFlow = new StandardComparableDataFlow("Remote Flow", proposedSnapshot.getFlowContents()); - final FlowComparator flowComparator = new StandardFlowComparator(localFlow, remoteFlow, new StaticDifferenceDescriptor()); + final FlowComparator flowComparator = new StandardFlowComparator(localFlow, remoteFlow, getAncestorGroupServiceIds(), new StaticDifferenceDescriptor()); final FlowComparison flowComparison = flowComparator.compare(); final Set updatedVersionedComponentIds = new HashSet<>(); @@ -3055,6 +3250,25 @@ public final class StandardProcessGroup implements ProcessGroup { continue; } + // If this update adds a new Controller Service, then we need to check if the service already exists at a higher level + // and if so compare our VersionedControllerService to the existing service. + if (diff.getDifferenceType() == DifferenceType.COMPONENT_ADDED) { + final VersionedComponent component = diff.getComponentA() == null ? diff.getComponentB() : diff.getComponentA(); + if (ComponentType.CONTROLLER_SERVICE == component.getComponentType()) { + final ControllerServiceNode serviceNode = getVersionedControllerService(this, component.getIdentifier()); + if (serviceNode != null) { + final VersionedControllerService versionedService = mapper.mapControllerService(serviceNode, controllerServiceProvider); + final Set differences = flowComparator.compareControllerServices(versionedService, (VersionedControllerService) component); + + if (!differences.isEmpty()) { + updatedVersionedComponentIds.add(component.getIdentifier()); + } + + continue; + } + } + } + final VersionedComponent component = diff.getComponentA() == null ? diff.getComponentB() : diff.getComponentA(); updatedVersionedComponentIds.add(component.getIdentifier()); @@ -3081,6 +3295,35 @@ public final class StandardProcessGroup implements ProcessGroup { } } + private Set getAncestorGroupServiceIds() { + final Set ancestorServiceIds; + ProcessGroup parentGroup = getParent(); + + if (parentGroup == null) { + ancestorServiceIds = Collections.emptySet(); + } else { + ancestorServiceIds = parentGroup.getControllerServices(true).stream() + .map(ControllerServiceNode::getIdentifier) + .collect(Collectors.toSet()); + } + + return ancestorServiceIds; + } + + private ControllerServiceNode getVersionedControllerService(final ProcessGroup group, final String versionedComponentId) { + if (group == null) { + return null; + } + + for (final ControllerServiceNode serviceNode : group.getControllerServices(false)) { + if (serviceNode.getVersionedComponentId().isPresent() && serviceNode.getVersionedComponentId().get().equals(versionedComponentId)) { + return serviceNode; + } + } + + return getVersionedControllerService(group.getParent(), versionedComponentId); + } + private Set getKnownVariableNames() { final Set variableNames = new HashSet<>(); populateKnownVariableNames(this, variableNames); @@ -3159,6 +3402,44 @@ public final class StandardProcessGroup implements ProcessGroup { group.setVersionControlInformation(vci, Collections.emptyMap()); } + + // Controller Services + // Controller Services have to be handled a bit differently than other components. This is because Processors and Controller + // Services may reference other Controller Services. Since we may be adding Service A, which depends on Service B, before adding + // Service B, we need to ensure that we create all Controller Services first and then call updateControllerService for each + // Controller Service. This way, we ensure that all services have been created before setting the properties. This allows us to + // properly obtain the correct mapping of Controller Service VersionedComponentID to Controller Service instance id. + final Map servicesByVersionedId = group.getControllerServices(false).stream() + .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity())); + + final Set controllerServicesRemoved = new HashSet<>(servicesByVersionedId.keySet()); + + final Map services = new HashMap<>(); + + // Add any Controller Service that does not yet exist. + for (final VersionedControllerService proposedService : proposed.getControllerServices()) { + ControllerServiceNode service = servicesByVersionedId.get(proposedService.getIdentifier()); + if (service == null) { + service = addControllerService(group, proposedService, componentIdSeed); + LOG.info("Added {} to {}", service, this); + } + + services.put(service, proposedService); + } + + // Update all of the Controller Services to match the VersionedControllerService + for (final Map.Entry entry : services.entrySet()) { + final ControllerServiceNode service = entry.getKey(); + final VersionedControllerService proposedService = entry.getValue(); + + if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) { + updateControllerService(service, proposedService); + LOG.info("Updated {}", service); + } + + controllerServicesRemoved.remove(proposedService.getIdentifier()); + } + // Child groups final Map childGroupsByVersionedId = group.getProcessGroups().stream() .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity())); @@ -3179,26 +3460,6 @@ public final class StandardProcessGroup implements ProcessGroup { childGroupsRemoved.remove(proposedChildGroup.getIdentifier()); } - - // Controller Services - final Map servicesByVersionedId = group.getControllerServices(false).stream() - .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity())); - final Set controllerServicesRemoved = new HashSet<>(servicesByVersionedId.keySet()); - - for (final VersionedControllerService proposedService : proposed.getControllerServices()) { - final ControllerServiceNode service = servicesByVersionedId.get(proposedService.getIdentifier()); - if (service == null) { - final ControllerServiceNode added = addControllerService(group, proposedService, componentIdSeed); - LOG.info("Added {} to {}", added, this); - } else if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) { - updateControllerService(service, proposedService); - LOG.info("Updated {}", service); - } - - controllerServicesRemoved.remove(proposedService.getIdentifier()); - } - - // Funnels final Map funnelsByVersionedId = group.getFunnels().stream() .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity())); @@ -3608,7 +3869,7 @@ public final class StandardProcessGroup implements ProcessGroup { service.setAnnotationData(proposed.getAnnotationData()); service.setComments(proposed.getComments()); service.setName(proposed.getName()); - service.setProperties(populatePropertiesMap(service.getProperties(), proposed.getProperties())); + service.setProperties(populatePropertiesMap(service.getProperties(), proposed.getProperties(), proposed.getPropertyDescriptors(), service.getProcessGroup())); if (!isEqual(service.getBundleCoordinate(), proposed.getBundle())) { final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle()); @@ -3728,7 +3989,7 @@ public final class StandardProcessGroup implements ProcessGroup { processor.setExecutionNode(ExecutionNode.valueOf(proposed.getExecutionNode())); processor.setName(proposed.getName()); processor.setPenalizationPeriod(proposed.getPenaltyDuration()); - processor.setProperties(populatePropertiesMap(processor.getProperties(), proposed.getProperties())); + processor.setProperties(populatePropertiesMap(processor.getProperties(), proposed.getProperties(), proposed.getPropertyDescriptors(), processor.getProcessGroup())); processor.setRunDuration(proposed.getRunDurationMillis(), TimeUnit.MILLISECONDS); processor.setScheduldingPeriod(proposed.getSchedulingPeriod()); processor.setSchedulingStrategy(SchedulingStrategy.valueOf(proposed.getSchedulingStrategy())); @@ -3745,19 +4006,60 @@ public final class StandardProcessGroup implements ProcessGroup { } - private Map populatePropertiesMap(final Map currentProperties, final Map proposedProperties) { + private Map populatePropertiesMap(final Map currentProperties, final Map proposedProperties, + final Map proposedDescriptors, final ProcessGroup group) { + final Map fullPropertyMap = new HashMap<>(); for (final PropertyDescriptor property : currentProperties.keySet()) { fullPropertyMap.put(property.getName(), null); } if (proposedProperties != null) { - fullPropertyMap.putAll(proposedProperties); + for (final Map.Entry entry : proposedProperties.entrySet()) { + final String propertyName = entry.getKey(); + final VersionedPropertyDescriptor descriptor = proposedDescriptors.get(propertyName); + + String value; + if (descriptor != null && descriptor.getIdentifiesControllerService()) { + // Property identifies a Controller Service. So the value that we want to assign is not the value given. + // The value given is instead the Versioned Component ID of the Controller Service. We want to resolve this + // to the instance ID of the Controller Service. + final String serviceVersionedComponentId = entry.getValue(); + final String instanceId = getServiceInstanceId(serviceVersionedComponentId, group); + value = instanceId == null ? serviceVersionedComponentId : instanceId; + } else { + value = entry.getValue(); + } + + fullPropertyMap.put(propertyName, value); + } } return fullPropertyMap; } + private String getServiceInstanceId(final String serviceVersionedComponentId, final ProcessGroup group) { + for (final ControllerServiceNode serviceNode : group.getControllerServices(false)) { + final Optional optionalVersionedId = serviceNode.getVersionedComponentId(); + if (!optionalVersionedId.isPresent()) { + continue; + } + + final String versionedId = optionalVersionedId.get(); + if (versionedId.equals(serviceVersionedComponentId)) { + return serviceNode.getIdentifier(); + } + } + + final ProcessGroup parent = group.getParent(); + if (parent == null) { + return null; + } + + return getServiceInstanceId(serviceVersionedComponentId, parent); + + } + private RemoteProcessGroup addRemoteProcessGroup(final ProcessGroup destination, final VersionedRemoteProcessGroup proposed, final String componentIdSeed) { final RemoteProcessGroup rpg = flowController.createRemoteProcessGroup(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getTargetUris()); rpg.setVersionedComponentId(proposed.getIdentifier()); @@ -3773,12 +4075,12 @@ public final class StandardProcessGroup implements ProcessGroup { rpg.setCommunicationsTimeout(proposed.getCommunicationsTimeout()); rpg.setInputPorts(proposed.getInputPorts() == null ? Collections.emptySet() : proposed.getInputPorts().stream() .map(port -> createPortDescriptor(port, componentIdSeed, rpg.getIdentifier())) - .collect(Collectors.toSet())); + .collect(Collectors.toSet()), false); rpg.setName(proposed.getName()); rpg.setNetworkInterface(proposed.getLocalNetworkInterface()); rpg.setOutputPorts(proposed.getOutputPorts() == null ? Collections.emptySet() : proposed.getOutputPorts().stream() .map(port -> createPortDescriptor(port, componentIdSeed, rpg.getIdentifier())) - .collect(Collectors.toSet())); + .collect(Collectors.toSet()), false); rpg.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY())); rpg.setProxyHost(proposed.getProxyHost()); rpg.setProxyPort(proposed.getProxyPort()); @@ -3831,12 +4133,12 @@ public final class StandardProcessGroup implements ProcessGroup { } final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); - final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient(), false); + final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, controllerServiceProvider, flowController.getFlowRegistryClient(), false); final ComparableDataFlow currentFlow = new StandardComparableDataFlow("Local Flow", versionedGroup); final ComparableDataFlow snapshotFlow = new StandardComparableDataFlow("Versioned Flow", vci.getFlowSnapshot()); - final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, new EvolvingDifferenceDescriptor()); + final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, getAncestorGroupServiceIds(), new EvolvingDifferenceDescriptor()); final FlowComparison comparison = flowComparator.compare(); final Set differences = comparison.getDifferences(); final Set functionalDifferences = differences.stream() diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardVersionedFlowStatus.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardVersionedFlowStatus.java new file mode 100644 index 0000000000..f362c1e25d --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardVersionedFlowStatus.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.groups; + +import java.util.Set; + +import org.apache.nifi.registry.flow.VersionedFlowState; +import org.apache.nifi.registry.flow.VersionedFlowStatus; +import org.apache.nifi.registry.flow.diff.FlowDifference; + +class StandardVersionedFlowStatus implements VersionedFlowStatus { + private final VersionedFlowState state; + private final String explanation; + private final Set currentDifferences; + + StandardVersionedFlowStatus(final VersionedFlowState state, final String explanation, final Set differences) { + this.state = state; + this.explanation = explanation; + this.currentDifferences = differences; + } + + @Override + public VersionedFlowState getState() { + return state; + } + + @Override + public String getStateExplanation() { + return explanation; + } + + Set getCurrentDifferences() { + return currentDifferences; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java index 92a4166d71..106d19a342 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java @@ -34,6 +34,7 @@ public class StandardVersionControlInformation implements VersionControlInformat private volatile VersionedProcessGroup flowSnapshot; private volatile boolean modified; private volatile boolean current; + private final VersionedFlowStatus status; public static class Builder { private String registryIdentifier; @@ -47,6 +48,7 @@ public class StandardVersionControlInformation implements VersionControlInformat private VersionedProcessGroup flowSnapshot; private Boolean modified = null; private Boolean current = null; + private VersionedFlowStatus status; public Builder registryId(String registryId) { this.registryIdentifier = registryId; @@ -103,6 +105,11 @@ public class StandardVersionControlInformation implements VersionControlInformat return this; } + public Builder status(final VersionedFlowStatus status) { + this.status = status; + return this; + } + public static Builder fromDto(VersionControlInformationDTO dto) { Builder builder = new Builder(); builder.registryId(dto.getRegistryId()) @@ -126,7 +133,7 @@ public class StandardVersionControlInformation implements VersionControlInformat Objects.requireNonNull(version, "Version must be specified"); final StandardVersionControlInformation svci = new StandardVersionControlInformation(registryIdentifier, registryName, - bucketIdentifier, flowIdentifier, version, flowSnapshot, modified, current); + bucketIdentifier, flowIdentifier, version, flowSnapshot, modified, current, status); svci.setBucketName(bucketName); svci.setFlowName(flowName); @@ -138,7 +145,7 @@ public class StandardVersionControlInformation implements VersionControlInformat public StandardVersionControlInformation(final String registryId, final String registryName, final String bucketId, final String flowId, final int version, - final VersionedProcessGroup snapshot, final boolean modified, final boolean current) { + final VersionedProcessGroup snapshot, final boolean modified, final boolean current, final VersionedFlowStatus status) { this.registryIdentifier = registryId; this.registryName = registryName; this.bucketIdentifier = bucketId; @@ -147,6 +154,7 @@ public class StandardVersionControlInformation implements VersionControlInformat this.flowSnapshot = snapshot; this.modified = modified; this.current = current; + this.status = status; } @@ -232,4 +240,9 @@ public class StandardVersionControlInformation implements VersionControlInformat public void setFlowSnapshot(final VersionedProcessGroup flowSnapshot) { this.flowSnapshot = flowSnapshot; } + + @Override + public VersionedFlowStatus getStatus() { + return status; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryDtoMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryDtoMapper.java deleted file mode 100644 index 193bde8454..0000000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryDtoMapper.java +++ /dev/null @@ -1,328 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.registry.flow.mapping; - -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.UUID; -import java.util.stream.Collectors; - -import org.apache.nifi.registry.flow.BatchSize; -import org.apache.nifi.registry.flow.Bundle; -import org.apache.nifi.registry.flow.ComponentType; -import org.apache.nifi.registry.flow.ConnectableComponent; -import org.apache.nifi.registry.flow.ConnectableComponentType; -import org.apache.nifi.registry.flow.ControllerServiceAPI; -import org.apache.nifi.registry.flow.PortType; -import org.apache.nifi.registry.flow.Position; -import org.apache.nifi.registry.flow.VersionedConnection; -import org.apache.nifi.registry.flow.VersionedControllerService; -import org.apache.nifi.registry.flow.VersionedFunnel; -import org.apache.nifi.registry.flow.VersionedLabel; -import org.apache.nifi.registry.flow.VersionedPort; -import org.apache.nifi.registry.flow.VersionedProcessGroup; -import org.apache.nifi.registry.flow.VersionedProcessor; -import org.apache.nifi.registry.flow.VersionedRemoteGroupPort; -import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup; -import org.apache.nifi.web.api.dto.BatchSettingsDTO; -import org.apache.nifi.web.api.dto.BundleDTO; -import org.apache.nifi.web.api.dto.ConnectableDTO; -import org.apache.nifi.web.api.dto.ConnectionDTO; -import org.apache.nifi.web.api.dto.ControllerServiceApiDTO; -import org.apache.nifi.web.api.dto.ControllerServiceDTO; -import org.apache.nifi.web.api.dto.FlowSnippetDTO; -import org.apache.nifi.web.api.dto.FunnelDTO; -import org.apache.nifi.web.api.dto.LabelDTO; -import org.apache.nifi.web.api.dto.PortDTO; -import org.apache.nifi.web.api.dto.PositionDTO; -import org.apache.nifi.web.api.dto.ProcessGroupDTO; -import org.apache.nifi.web.api.dto.ProcessorConfigDTO; -import org.apache.nifi.web.api.dto.ProcessorDTO; -import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; -import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; - - -public class NiFiRegistryDtoMapper { - // We need to keep a mapping of component id to versionedComponentId as we transform these objects. This way, when - // we call #mapConnectable, instead of generating a new UUID for the ConnectableComponent, we can lookup the 'versioned' - // identifier based on the comopnent's actual id. We do connections last, so that all components will already have been - // created before attempting to create the connection, where the ConnectableDTO is converted. - private Map versionedComponentIds = new HashMap<>(); - - public VersionedProcessGroup mapProcessGroup(final ProcessGroupDTO dto) { - versionedComponentIds.clear(); - return mapGroup(dto); - } - - private VersionedProcessGroup mapGroup(final ProcessGroupDTO dto) { - final VersionedProcessGroup versionedGroup = new VersionedProcessGroup(); - versionedGroup.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId())); - versionedGroup.setGroupIdentifier(getGroupId(dto.getParentGroupId())); - versionedGroup.setName(dto.getName()); - versionedGroup.setComments(dto.getComments()); - versionedGroup.setPosition(mapPosition(dto.getPosition())); - - final FlowSnippetDTO contents = dto.getContents(); - - versionedGroup.setControllerServices(contents.getControllerServices().stream() - .map(this::mapControllerService) - .collect(Collectors.toCollection(LinkedHashSet::new))); - - versionedGroup.setFunnels(contents.getFunnels().stream() - .map(this::mapFunnel) - .collect(Collectors.toCollection(LinkedHashSet::new))); - - versionedGroup.setInputPorts(contents.getInputPorts().stream() - .map(this::mapPort) - .collect(Collectors.toCollection(LinkedHashSet::new))); - - versionedGroup.setOutputPorts(contents.getOutputPorts().stream() - .map(this::mapPort) - .collect(Collectors.toCollection(LinkedHashSet::new))); - - versionedGroup.setLabels(contents.getLabels().stream() - .map(this::mapLabel) - .collect(Collectors.toCollection(LinkedHashSet::new))); - - versionedGroup.setProcessors(contents.getProcessors().stream() - .map(this::mapProcessor) - .collect(Collectors.toCollection(LinkedHashSet::new))); - - versionedGroup.setRemoteProcessGroups(contents.getRemoteProcessGroups().stream() - .map(this::mapRemoteProcessGroup) - .collect(Collectors.toCollection(LinkedHashSet::new))); - - versionedGroup.setProcessGroups(contents.getProcessGroups().stream() - .map(this::mapGroup) - .collect(Collectors.toCollection(LinkedHashSet::new))); - - versionedGroup.setConnections(contents.getConnections().stream() - .map(this::mapConnection) - .collect(Collectors.toCollection(LinkedHashSet::new))); - - return versionedGroup; - } - - private String getId(final String currentVersionedId, final String componentId) { - final String versionedId; - if (currentVersionedId == null) { - versionedId = UUID.nameUUIDFromBytes(componentId.getBytes(StandardCharsets.UTF_8)).toString(); - } else { - versionedId = currentVersionedId; - } - - versionedComponentIds.put(componentId, versionedId); - return versionedId; - } - - private String getGroupId(final String groupId) { - return versionedComponentIds.get(groupId); - } - - public VersionedConnection mapConnection(final ConnectionDTO dto) { - final VersionedConnection connection = new VersionedConnection(); - connection.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId())); - connection.setGroupIdentifier(getGroupId(dto.getParentGroupId())); - connection.setName(dto.getName()); - connection.setBackPressureDataSizeThreshold(dto.getBackPressureDataSizeThreshold()); - connection.setBackPressureObjectThreshold(dto.getBackPressureObjectThreshold()); - connection.setFlowFileExpiration(dto.getFlowFileExpiration()); - connection.setLabelIndex(dto.getLabelIndex()); - connection.setPosition(mapPosition(dto.getPosition())); - connection.setPrioritizers(dto.getPrioritizers()); - connection.setSelectedRelationships(dto.getSelectedRelationships()); - connection.setzIndex(dto.getzIndex()); - - connection.setBends(dto.getBends().stream() - .map(this::mapPosition) - .collect(Collectors.toList())); - - connection.setSource(mapConnectable(dto.getSource())); - connection.setDestination(mapConnectable(dto.getDestination())); - - return connection; - } - - public ConnectableComponent mapConnectable(final ConnectableDTO dto) { - final ConnectableComponent component = new ConnectableComponent(); - - final String versionedId = dto.getVersionedComponentId(); - if (versionedId == null) { - final String resolved = versionedComponentIds.get(dto.getId()); - if (resolved == null) { - throw new IllegalArgumentException("Unable to map Connectable Component with identifier " + dto.getId() + " to any version-controlled component"); - } - - component.setId(resolved); - } else { - component.setId(versionedId); - } - - component.setComments(dto.getComments()); - component.setGroupId(dto.getGroupId()); - component.setName(dto.getName()); - component.setType(ConnectableComponentType.valueOf(dto.getType())); - return component; - } - - public VersionedControllerService mapControllerService(final ControllerServiceDTO dto) { - final VersionedControllerService service = new VersionedControllerService(); - service.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId())); - service.setGroupIdentifier(getGroupId(dto.getParentGroupId())); - service.setName(dto.getName()); - service.setAnnotationData(dto.getAnnotationData()); - service.setBundle(mapBundle(dto.getBundle())); - service.setComments(dto.getComments()); - service.setControllerServiceApis(dto.getControllerServiceApis().stream() - .map(this::mapControllerServiceApi) - .collect(Collectors.toList())); - service.setProperties(dto.getProperties()); - service.setType(dto.getType()); - return null; - } - - private Bundle mapBundle(final BundleDTO dto) { - final Bundle bundle = new Bundle(); - bundle.setGroup(dto.getGroup()); - bundle.setArtifact(dto.getArtifact()); - bundle.setVersion(dto.getVersion()); - return bundle; - } - - private ControllerServiceAPI mapControllerServiceApi(final ControllerServiceApiDTO dto) { - final ControllerServiceAPI api = new ControllerServiceAPI(); - api.setBundle(mapBundle(dto.getBundle())); - api.setType(dto.getType()); - return api; - } - - public VersionedFunnel mapFunnel(final FunnelDTO dto) { - final VersionedFunnel funnel = new VersionedFunnel(); - funnel.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId())); - funnel.setGroupIdentifier(getGroupId(dto.getParentGroupId())); - funnel.setPosition(mapPosition(dto.getPosition())); - return funnel; - } - - public VersionedLabel mapLabel(final LabelDTO dto) { - final VersionedLabel label = new VersionedLabel(); - label.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId())); - label.setGroupIdentifier(getGroupId(dto.getParentGroupId())); - label.setHeight(dto.getHeight()); - label.setWidth(dto.getWidth()); - label.setLabel(dto.getLabel()); - label.setPosition(mapPosition(dto.getPosition())); - label.setStyle(dto.getStyle()); - return label; - } - - public VersionedPort mapPort(final PortDTO dto) { - final VersionedPort port = new VersionedPort(); - port.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId())); - port.setGroupIdentifier(getGroupId(dto.getParentGroupId())); - port.setComments(dto.getComments()); - port.setConcurrentlySchedulableTaskCount(dto.getConcurrentlySchedulableTaskCount()); - port.setName(dto.getName()); - port.setPosition(mapPosition(dto.getPosition())); - port.setType(PortType.valueOf(dto.getType())); - return port; - } - - public Position mapPosition(final PositionDTO dto) { - final Position position = new Position(); - position.setX(dto.getX()); - position.setY(dto.getY()); - return position; - } - - public VersionedProcessor mapProcessor(final ProcessorDTO dto) { - final ProcessorConfigDTO config = dto.getConfig(); - - final VersionedProcessor processor = new VersionedProcessor(); - processor.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId())); - processor.setGroupIdentifier(getGroupId(dto.getParentGroupId())); - processor.setType(dto.getType()); - processor.setAnnotationData(config.getAnnotationData()); - processor.setAutoTerminatedRelationships(config.getAutoTerminatedRelationships()); - processor.setBulletinLevel(config.getBulletinLevel()); - processor.setBundle(mapBundle(dto.getBundle())); - processor.setComments(config.getComments()); - processor.setConcurrentlySchedulableTaskCount(config.getConcurrentlySchedulableTaskCount()); - processor.setExecutionNode(config.getExecutionNode()); - processor.setName(dto.getName()); - processor.setPenaltyDuration(config.getPenaltyDuration()); - processor.setPosition(mapPosition(dto.getPosition())); - processor.setProperties(config.getProperties()); - processor.setRunDurationMillis(config.getRunDurationMillis()); - processor.setSchedulingPeriod(config.getSchedulingPeriod()); - processor.setSchedulingStrategy(config.getSchedulingStrategy()); - processor.setStyle(dto.getStyle()); - processor.setYieldDuration(config.getYieldDuration()); - return processor; - } - - public VersionedRemoteProcessGroup mapRemoteProcessGroup(final RemoteProcessGroupDTO dto) { - final VersionedRemoteProcessGroup rpg = new VersionedRemoteProcessGroup(); - rpg.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId())); - rpg.setGroupIdentifier(getGroupId(dto.getParentGroupId())); - rpg.setComments(dto.getComments()); - rpg.setCommunicationsTimeout(dto.getCommunicationsTimeout()); - rpg.setLocalNetworkInterface(dto.getLocalNetworkInterface()); - rpg.setName(dto.getName()); - rpg.setInputPorts(dto.getContents().getInputPorts().stream() - .map(port -> mapRemotePort(port, ComponentType.REMOTE_INPUT_PORT)) - .collect(Collectors.toSet())); - rpg.setOutputPorts(dto.getContents().getOutputPorts().stream() - .map(port -> mapRemotePort(port, ComponentType.REMOTE_OUTPUT_PORT)) - .collect(Collectors.toSet())); - rpg.setPosition(mapPosition(dto.getPosition())); - rpg.setProxyHost(dto.getProxyHost()); - rpg.setProxyPort(dto.getProxyPort()); - rpg.setProxyUser(dto.getProxyUser()); - rpg.setTargetUri(dto.getTargetUri()); - rpg.setTargetUris(dto.getTargetUris()); - rpg.setTransportProtocol(dto.getTransportProtocol()); - rpg.setYieldDuration(dto.getYieldDuration()); - return rpg; - } - - public VersionedRemoteGroupPort mapRemotePort(final RemoteProcessGroupPortDTO dto, final ComponentType componentType) { - final VersionedRemoteGroupPort port = new VersionedRemoteGroupPort(); - port.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId())); - port.setGroupIdentifier(getGroupId(dto.getGroupId())); - port.setComments(dto.getComments()); - port.setConcurrentlySchedulableTaskCount(dto.getConcurrentlySchedulableTaskCount()); - port.setRemoteGroupId(dto.getGroupId()); - port.setName(dto.getName()); - port.setUseCompression(dto.getUseCompression()); - port.setBatchSize(mapBatchSettings(dto.getBatchSettings())); - port.setTargetId(dto.getTargetId()); - port.setComponentType(componentType); - return port; - } - - private BatchSize mapBatchSettings(final BatchSettingsDTO dto) { - final BatchSize batchSize = new BatchSize(); - batchSize.setCount(dto.getCount()); - batchSize.setDuration(dto.getDuration()); - batchSize.setSize(dto.getSize()); - return batchSize; - } -} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java index 7bab76d815..bdd328c94d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java @@ -28,7 +28,6 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; -import java.util.function.Function; import java.util.stream.Collectors; import org.apache.commons.lang3.ClassUtils; @@ -44,6 +43,7 @@ import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.nar.ExtensionManager; @@ -59,15 +59,16 @@ import org.apache.nifi.registry.flow.FlowRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.flow.PortType; import org.apache.nifi.registry.flow.Position; -import org.apache.nifi.registry.flow.VersionedFlowCoordinates; import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.VersionedConnection; import org.apache.nifi.registry.flow.VersionedControllerService; +import org.apache.nifi.registry.flow.VersionedFlowCoordinates; import org.apache.nifi.registry.flow.VersionedFunnel; import org.apache.nifi.registry.flow.VersionedLabel; import org.apache.nifi.registry.flow.VersionedPort; import org.apache.nifi.registry.flow.VersionedProcessGroup; import org.apache.nifi.registry.flow.VersionedProcessor; +import org.apache.nifi.registry.flow.VersionedPropertyDescriptor; import org.apache.nifi.registry.flow.VersionedRemoteGroupPort; import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup; import org.apache.nifi.remote.RemoteGroupPort; @@ -80,56 +81,16 @@ public class NiFiRegistryFlowMapper { // created before attempting to create the connection, where the ConnectableDTO is converted. private Map versionedComponentIds = new HashMap<>(); - public InstantiatedVersionedProcessGroup mapProcessGroup(final ProcessGroup group, final FlowRegistryClient registryClient, final boolean mapDescendantVersionedFlows) { + public InstantiatedVersionedProcessGroup mapProcessGroup(final ProcessGroup group, final ControllerServiceProvider serviceProvider, final FlowRegistryClient registryClient, + final boolean mapDescendantVersionedFlows) { versionedComponentIds.clear(); - final InstantiatedVersionedProcessGroup mapped = mapGroup(group, registryClient, true, mapDescendantVersionedFlows); + final InstantiatedVersionedProcessGroup mapped = mapGroup(group, serviceProvider, registryClient, true, mapDescendantVersionedFlows); - populateReferencedAncestorServices(group, mapped); populateReferencedAncestorVariables(group, mapped); return mapped; } - private void populateReferencedAncestorServices(final ProcessGroup group, final VersionedProcessGroup versionedGroup) { - final Set ancestorControllerServices = group.getControllerServices(true); - ancestorControllerServices.remove(group.getControllerServices(false)); - final Map ancestorServicesById = ancestorControllerServices.stream() - .collect(Collectors.toMap(ControllerServiceNode::getIdentifier, Function.identity())); - - final Set referenced = new HashSet<>(); - - for (final ProcessorNode processor : group.findAllProcessors()) { - findReferencedServices(processor, ancestorServicesById, referenced); - } - - for (final ControllerServiceNode service : group.findAllControllerServices()) { - findReferencedServices(service, ancestorServicesById, referenced); - } - - final Set versionedServices = referenced.stream().map(this::mapControllerService) - .collect(Collectors.toCollection(LinkedHashSet::new)); - - versionedGroup.getControllerServices().addAll(versionedServices); - } - - private Set findReferencedServices(final ConfiguredComponent component, final Map ancestorServicesById, - final Set referenced) { - - for (final Map.Entry entry : component.getProperties().entrySet()) { - final PropertyDescriptor descriptor = entry.getKey(); - if (descriptor.getControllerServiceDefinition() != null) { - final String serviceId = entry.getValue(); - final ControllerServiceNode serviceNode = ancestorServicesById.get(serviceId); - if (serviceNode != null) { - referenced.add(serviceNode); - referenced.addAll(findReferencedServices(serviceNode, ancestorServicesById, referenced)); - } - } - } - - return referenced; - } - private void populateReferencedAncestorVariables(final ProcessGroup group, final VersionedProcessGroup versionedGroup) { final Set ancestorVariableNames = new HashSet<>(); populateVariableNames(group.getParent(), ancestorVariableNames); @@ -167,7 +128,9 @@ public class NiFiRegistryFlowMapper { } - private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup group, final FlowRegistryClient registryClient, final boolean topLevel, final boolean mapDescendantVersionedFlows) { + private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup group, final ControllerServiceProvider serviceLookup, final FlowRegistryClient registryClient, + final boolean topLevel, final boolean mapDescendantVersionedFlows) { + final InstantiatedVersionedProcessGroup versionedGroup = new InstantiatedVersionedProcessGroup(group.getIdentifier(), group.getProcessGroupIdentifier()); versionedGroup.setIdentifier(getId(group.getVersionedComponentId(), group.getIdentifier())); versionedGroup.setGroupIdentifier(getGroupId(group.getProcessGroupIdentifier())); @@ -212,7 +175,7 @@ public class NiFiRegistryFlowMapper { } versionedGroup.setControllerServices(group.getControllerServices(false).stream() - .map(this::mapControllerService) + .map(service -> mapControllerService(service, serviceLookup)) .collect(Collectors.toCollection(LinkedHashSet::new))); versionedGroup.setFunnels(group.getFunnels().stream() @@ -232,7 +195,7 @@ public class NiFiRegistryFlowMapper { .collect(Collectors.toCollection(LinkedHashSet::new))); versionedGroup.setProcessors(group.getProcessors().stream() - .map(this::mapProcessor) + .map(processor -> mapProcessor(processor, serviceLookup)) .collect(Collectors.toCollection(LinkedHashSet::new))); versionedGroup.setRemoteProcessGroups(group.getRemoteProcessGroups().stream() @@ -240,7 +203,7 @@ public class NiFiRegistryFlowMapper { .collect(Collectors.toCollection(LinkedHashSet::new))); versionedGroup.setProcessGroups(group.getProcessGroups().stream() - .map(grp -> mapGroup(grp, registryClient, false, mapDescendantVersionedFlows)) + .map(grp -> mapGroup(grp, serviceLookup, registryClient, false, mapDescendantVersionedFlows)) .collect(Collectors.toCollection(LinkedHashSet::new))); versionedGroup.setConnections(group.getConnections().stream() @@ -335,7 +298,7 @@ public class NiFiRegistryFlowMapper { return component; } - public VersionedControllerService mapControllerService(final ControllerServiceNode controllerService) { + public VersionedControllerService mapControllerService(final ControllerServiceNode controllerService, final ControllerServiceProvider serviceProvider) { final VersionedControllerService versionedService = new InstantiatedVersionedControllerService(controllerService.getIdentifier(), controllerService.getProcessGroupIdentifier()); versionedService.setIdentifier(getId(controllerService.getVersionedComponentId(), controllerService.getIdentifier())); versionedService.setGroupIdentifier(getGroupId(controllerService.getProcessGroupIdentifier())); @@ -345,14 +308,16 @@ public class NiFiRegistryFlowMapper { versionedService.setComments(controllerService.getComments()); versionedService.setControllerServiceApis(mapControllerServiceApis(controllerService)); - versionedService.setProperties(mapProperties(controllerService)); + versionedService.setProperties(mapProperties(controllerService, serviceProvider)); + versionedService.setPropertyDescriptors(mapPropertyDescriptors(controllerService)); versionedService.setType(controllerService.getCanonicalClassName()); return versionedService; } - private Map mapProperties(final ConfiguredComponent component) { + private Map mapProperties(final ConfiguredComponent component, final ControllerServiceProvider serviceProvider) { final Map mapped = new HashMap<>(); + component.getProperties().keySet().stream() .filter(property -> !property.isSensitive()) .forEach(property -> { @@ -360,11 +325,34 @@ public class NiFiRegistryFlowMapper { if (value == null) { value = property.getDefaultValue(); } + + if (value != null && property.getControllerServiceDefinition() != null) { + // Property references a Controller Service. Instead of storing the existing value, we want + // to store the Versioned Component ID of the service. + final ControllerServiceNode controllerService = serviceProvider.getControllerServiceNode(value); + if (controllerService != null) { + value = getId(controllerService.getVersionedComponentId(), controllerService.getIdentifier()); + } + } + mapped.put(property.getName(), value); }); + return mapped; } + private Map mapPropertyDescriptors(final ConfiguredComponent component) { + final Map descriptors = new HashMap<>(); + for (final PropertyDescriptor descriptor : component.getProperties().keySet()) { + final VersionedPropertyDescriptor versionedDescriptor = new VersionedPropertyDescriptor(); + versionedDescriptor.setName(descriptor.getName()); + versionedDescriptor.setDisplayName(descriptor.getDisplayName()); + versionedDescriptor.setIdentifiesControllerService(descriptor.getControllerServiceDefinition() != null); + descriptors.put(descriptor.getName(), versionedDescriptor); + } + return descriptors; + } + private Bundle mapBundle(final BundleCoordinate coordinate) { final Bundle versionedBundle = new Bundle(); versionedBundle.setGroup(coordinate.getGroup()); @@ -441,7 +429,7 @@ public class NiFiRegistryFlowMapper { return position; } - public VersionedProcessor mapProcessor(final ProcessorNode procNode) { + public VersionedProcessor mapProcessor(final ProcessorNode procNode, final ControllerServiceProvider serviceProvider) { final VersionedProcessor processor = new InstantiatedVersionedProcessor(procNode.getIdentifier(), procNode.getProcessGroupIdentifier()); processor.setIdentifier(getId(procNode.getVersionedComponentId(), procNode.getIdentifier())); processor.setGroupIdentifier(getGroupId(procNode.getProcessGroupIdentifier())); @@ -456,7 +444,8 @@ public class NiFiRegistryFlowMapper { processor.setName(procNode.getName()); processor.setPenaltyDuration(procNode.getPenalizationPeriod()); processor.setPosition(mapPosition(procNode.getPosition())); - processor.setProperties(mapProperties(procNode)); + processor.setProperties(mapProperties(procNode, serviceProvider)); + processor.setPropertyDescriptors(mapPropertyDescriptors(procNode)); processor.setRunDurationMillis(procNode.getRunDuration(TimeUnit.MILLISECONDS)); processor.setSchedulingPeriod(procNode.getSchedulingPeriod()); processor.setSchedulingStrategy(procNode.getSchedulingStrategy().name()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index 67710fe713..ef05a1b15f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -178,7 +178,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { }; final Runnable checkAuthorizations = new InitializationTask(); - backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + id); + backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + id, true); backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 5L, 30L, TimeUnit.SECONDS); backgroundThreadExecutor.submit(() -> { try { @@ -435,11 +435,13 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { * and started. * * @param ports the new ports + * @param pruneUnusedPorts if true, any ports that are not included in the given set of ports + * and that do not have any incoming connections will be removed. * * @throws NullPointerException if the given argument is null */ @Override - public void setInputPorts(final Set ports) { + public void setInputPorts(final Set ports, final boolean pruneUnusedPorts) { writeLock.lock(); try { final List newPortTargetIds = new ArrayList<>(); @@ -478,16 +480,18 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { // See if we have any ports that no longer exist; cannot be removed within the loop because it would cause // a ConcurrentModificationException. - final Iterator itr = inputPorts.values().iterator(); - while (itr.hasNext()) { - final StandardRemoteGroupPort port = itr.next(); - if (!newPortTargetIds.contains(port.getTargetIdentifier())) { - port.setTargetExists(false); - port.setTargetRunning(false); + if (pruneUnusedPorts) { + final Iterator itr = inputPorts.values().iterator(); + while (itr.hasNext()) { + final StandardRemoteGroupPort port = itr.next(); + if (!newPortTargetIds.contains(port.getTargetIdentifier())) { + port.setTargetExists(false); + port.setTargetRunning(false); - // If port has incoming connection, it will be cleaned up when the connection is removed - if (!port.hasIncomingConnection()) { - itr.remove(); + // If port has incoming connection, it will be cleaned up when the connection is removed + if (!port.hasIncomingConnection()) { + itr.remove(); + } } } } @@ -521,11 +525,13 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { * and started. * * @param ports the new ports + * @param pruneUnusedPorts if true, will remove any ports that are not in the given list and that have + * no outgoing connections * * @throws NullPointerException if the given argument is null */ @Override - public void setOutputPorts(final Set ports) { + public void setOutputPorts(final Set ports, final boolean pruneUnusedPorts) { writeLock.lock(); try { final List newPortTargetIds = new ArrayList<>(); @@ -535,7 +541,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { final Map outputPortByTargetId = outputPorts.values().stream() .collect(Collectors.toMap(StandardRemoteGroupPort::getTargetIdentifier, Function.identity())); - final Map outputPortByName = inputPorts.values().stream() + final Map outputPortByName = outputPorts.values().stream() .collect(Collectors.toMap(StandardRemoteGroupPort::getName, Function.identity())); // Check if we have a matching port already and add the port if not. We determine a matching port @@ -564,16 +570,18 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { // See if we have any ports that no longer exist; cannot be removed within the loop because it would cause // a ConcurrentModificationException. - final Iterator itr = outputPorts.values().iterator(); - while (itr.hasNext()) { - final StandardRemoteGroupPort port = itr.next(); - if (!newPortTargetIds.contains(port.getTargetIdentifier())) { - port.setTargetExists(false); - port.setTargetRunning(false); + if (pruneUnusedPorts) { + final Iterator itr = outputPorts.values().iterator(); + while (itr.hasNext()) { + final StandardRemoteGroupPort port = itr.next(); + if (!newPortTargetIds.contains(port.getTargetIdentifier())) { + port.setTargetExists(false); + port.setTargetRunning(false); - // If port has connections, it will be cleaned up when connections are removed - if (port.getConnections().isEmpty()) { - itr.remove(); + // If port has connections, it will be cleaned up when connections are removed + if (port.getConnections().isEmpty()) { + itr.remove(); + } } } } @@ -617,53 +625,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { } } - @Override - public void removeAllNonExistentPorts() { - writeLock.lock(); - try { - final Set inputPortIds = new HashSet<>(); - final Set outputPortIds = new HashSet<>(); - - for (final Map.Entry entry : inputPorts.entrySet()) { - final RemoteGroupPort port = entry.getValue(); - - if (port.getTargetExists()) { - continue; - } - - // If there's a connection, we don't remove it. - if (port.hasIncomingConnection()) { - continue; - } - - inputPortIds.add(entry.getKey()); - } - - for (final Map.Entry entry : outputPorts.entrySet()) { - final RemoteGroupPort port = entry.getValue(); - - if (port.getTargetExists()) { - continue; - } - - // If there's a connection, we don't remove it. - if (!port.getConnections().isEmpty()) { - continue; - } - - outputPortIds.add(entry.getKey()); - } - - for (final String id : inputPortIds) { - inputPorts.remove(id); - } - for (final String id : outputPortIds) { - outputPorts.remove(id); - } - } finally { - writeLock.unlock(); - } - } /** * Adds an Output Port to this Remote Process Group that is described by @@ -865,35 +826,16 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { try (final SiteToSiteRestApiClient apiClient = getSiteToSiteRestApiClient()) { dto = apiClient.getController(targetUris); } catch (IOException e) { - writeLock.lock(); - try { - for (final Iterator iter = inputPorts.values().iterator(); iter.hasNext();) { - final StandardRemoteGroupPort inputPort = iter.next(); - if (!inputPort.hasIncomingConnection()) { - iter.remove(); - } - } - - for (final Iterator iter = outputPorts.values().iterator(); iter.hasNext();) { - final StandardRemoteGroupPort outputPort = iter.next(); - if (outputPort.getConnections().isEmpty()) { - iter.remove(); - } - } - } finally { - writeLock.unlock(); - } - throw new CommunicationsException("Unable to communicate with Remote NiFi at URI " + targetUris + " due to: " + e.getMessage()); } writeLock.lock(); try { if (dto.getInputPorts() != null) { - setInputPorts(convertRemotePort(dto.getInputPorts())); + setInputPorts(convertRemotePort(dto.getInputPorts()), true); } if (dto.getOutputPorts() != null) { - setOutputPorts(convertRemotePort(dto.getOutputPorts())); + setOutputPorts(convertRemotePort(dto.getOutputPorts()), true); } // set the controller details diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java index ef69906b5d..d006cffa8d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java @@ -347,7 +347,7 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public ControllerServiceNode findControllerService(final String id) { + public ControllerServiceNode findControllerService(final String id, final boolean includeDescendants, final boolean includeAncestors) { return serviceMap.get(id); } @@ -678,4 +678,8 @@ public class MockProcessGroup implements ProcessGroup { @Override public void verifyCanShowLocalModifications() { } + + @Override + public void onComponentModified() { + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 98c7bc8e8a..49452963f4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -3731,7 +3731,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { private InstantiatedVersionedProcessGroup createFlowSnapshot(final String processGroupId) { final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId); final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); - final InstantiatedVersionedProcessGroup versionedGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient, false); + final InstantiatedVersionedProcessGroup versionedGroup = mapper.mapProcessGroup(processGroup, controllerFacade.getControllerServiceProvider(), flowRegistryClient, false); return versionedGroup; } @@ -3753,21 +3753,39 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion(), false, NiFiUserUtils.getNiFiUser()); final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); - final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient, false); + final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, controllerFacade.getControllerServiceProvider(), flowRegistryClient, false); final VersionedProcessGroup registryGroup = versionedFlowSnapshot.getFlowContents(); final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localGroup); final ComparableDataFlow registryFlow = new StandardComparableDataFlow("Versioned Flow", registryGroup); - final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, new ConciseEvolvingDifferenceDescriptor()); + final Set ancestorServiceIds = getAncestorGroupServiceIds(processGroup); + final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, ancestorServiceIds, new ConciseEvolvingDifferenceDescriptor()); final FlowComparison flowComparison = flowComparator.compare(); - final Set differenceDtos = dtoFactory.createComponentDifferenceDtos(flowComparison); + final Set differenceDtos = dtoFactory.createComponentDifferenceDtos(flowComparison, + diff -> diff.getDifferenceType() != DifferenceType.POSITION_CHANGED && diff.getDifferenceType() != DifferenceType.STYLE_CHANGED); + final FlowComparisonEntity entity = new FlowComparisonEntity(); entity.setComponentDifferences(differenceDtos); return entity; } + private Set getAncestorGroupServiceIds(final ProcessGroup group) { + final Set ancestorServiceIds; + ProcessGroup parentGroup = group.getParent(); + + if (parentGroup == null) { + ancestorServiceIds = Collections.emptySet(); + } else { + ancestorServiceIds = parentGroup.getControllerServices(true).stream() + .map(ControllerServiceNode::getIdentifier) + .collect(Collectors.toSet()); + } + + return ancestorServiceIds; + } + @Override public VersionedFlow registerVersionedFlow(final String registryId, final VersionedFlow flow) throws IOException, NiFiRegistryException { final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId); @@ -3852,12 +3870,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId); final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); - final VersionedProcessGroup localContents = mapper.mapProcessGroup(group, flowRegistryClient, true); + final VersionedProcessGroup localContents = mapper.mapProcessGroup(group, controllerFacade.getControllerServiceProvider(), flowRegistryClient, true); final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localContents); final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("Versioned Flow", updatedSnapshot.getFlowContents()); - final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow, new StaticDifferenceDescriptor()); + final Set ancestorGroupServiceIds = getAncestorGroupServiceIds(group); + final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow, ancestorGroupServiceIds, new StaticDifferenceDescriptor()); final FlowComparison comparison = flowComparator.compare(); final Set affectedComponents = comparison.getDifferences().stream() diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java index 3fa4462245..7b753d6dc9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java @@ -1717,7 +1717,7 @@ public class ProcessGroupResource extends ApplicationResource { // To accomplish this, we call updateProcessGroupContents() passing 'true' for the updateSettings flag but null out the position. flowSnapshot.getFlowContents().setPosition(null); entity = serviceFacade.updateProcessGroupContents(NiFiUserUtils.getNiFiUser(), newGroupRevision, newGroupId, - versionControlInfo, flowSnapshot, getIdGenerationSeed().orElse(null), false, true, true); + versionControlInfo, flowSnapshot, getIdGenerationSeed().orElse(null), false, true, true); } populateRemainingProcessGroupEntityContent(entity); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 5b33d9025d..8e5974ac54 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -117,6 +117,8 @@ import org.apache.nifi.registry.flow.FlowRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.VersionedComponent; +import org.apache.nifi.registry.flow.VersionedFlowState; +import org.apache.nifi.registry.flow.VersionedFlowStatus; import org.apache.nifi.registry.flow.diff.FlowComparison; import org.apache.nifi.registry.flow.diff.FlowDifference; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent; @@ -213,6 +215,7 @@ import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -2187,17 +2190,23 @@ public final class DtoFactory { public Set createComponentDifferenceDtos(final FlowComparison comparison) { + return createComponentDifferenceDtos(comparison, null); + } + + public Set createComponentDifferenceDtos(final FlowComparison comparison, final Predicate filter) { final Map> differencesByComponent = new HashMap<>(); for (final FlowDifference difference : comparison.getDifferences()) { - final ComponentDifferenceDTO componentDiff = createComponentDifference(difference); - final List differences = differencesByComponent.computeIfAbsent(componentDiff, key -> new ArrayList<>()); + if (filter == null || filter.test(difference)) { + final ComponentDifferenceDTO componentDiff = createComponentDifference(difference); + final List differences = differencesByComponent.computeIfAbsent(componentDiff, key -> new ArrayList<>()); - final DifferenceDTO dto = new DifferenceDTO(); - dto.setDifferenceType(difference.getDifferenceType().getDescription()); - dto.setDifference(difference.getDescription()); + final DifferenceDTO dto = new DifferenceDTO(); + dto.setDifferenceType(difference.getDifferenceType().getDescription()); + dto.setDifference(difference.getDescription()); - differences.add(dto); + differences.add(dto); + } } for (final Map.Entry> entry : differencesByComponent.entrySet()) { @@ -2252,6 +2261,12 @@ public final class DtoFactory { dto.setVersion(versionControlInfo.getVersion()); dto.setCurrent(versionControlInfo.isCurrent()); dto.setModified(versionControlInfo.isModified()); + + final VersionedFlowStatus status = versionControlInfo.getStatus(); + final VersionedFlowState state = status.getState(); + dto.setState(state == null ? null : state.name()); + dto.setStateExplanation(status.getStateExplanation()); + return dto; } @@ -3488,6 +3503,8 @@ public final class DtoFactory { copy.setVersion(original.getVersion()); copy.setCurrent(original.getCurrent()); copy.setModified(original.getModified()); + copy.setState(original.getState()); + copy.setStateExplanation(original.getStateExplanation()); return copy; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java index 29e5f7d7cb..907c8dc73f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java @@ -52,6 +52,7 @@ import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.repository.ContentNotFoundException; import org.apache.nifi.controller.repository.claim.ContentDirection; import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.status.ConnectionStatus; import org.apache.nifi.controller.status.PortStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; @@ -172,6 +173,10 @@ public class ControllerFacade implements Authorizable { } } + public ControllerServiceProvider getControllerServiceProvider() { + return flowController; + } + /** * Sets the name of this controller. * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java index 0f9ec7a272..4d8e984c7a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.web.dao.impl; +import static org.apache.nifi.controller.FlowController.ROOT_GROUP_ID_ALIAS; + import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.components.state.Scope; @@ -45,8 +47,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import static org.apache.nifi.controller.FlowController.ROOT_GROUP_ID_ALIAS; - public class StandardControllerServiceDAO extends ComponentDAO implements ControllerServiceDAO { private ControllerServiceProvider serviceProvider; @@ -172,6 +172,22 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro } } + controllerService.getProcessGroup().onComponentModified(); + + // For any component that references this Controller Service, find the component's Process Group + // and notify the Process Group that a component has been modified. This way, we know to re-calculate + // whether or not the Process Group has local modifications. + final ProcessGroup group = controllerService.getProcessGroup(); + controllerService.getReferences().getReferencingComponents().stream() + .map(ConfiguredComponent::getProcessGroupIdentifier) + .filter(id -> !id.equals(group.getIdentifier())) + .forEach(groupId -> { + final ProcessGroup descendant = group.findProcessGroup(groupId); + if (descendant != null) { + descendant.onComponentModified(); + } + }); + return controllerService; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFunnelDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFunnelDAO.java index e4ec239ea3..60426c066a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFunnelDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFunnelDAO.java @@ -91,6 +91,8 @@ public class StandardFunnelDAO extends ComponentDAO implements FunnelDAO { } } + funnel.getProcessGroup().onComponentModified(); + return funnel; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java index f830e9b3ff..2d47720f28 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java @@ -237,6 +237,7 @@ public class StandardInputPortDAO extends ComponentDAO implements PortDAO { inputPort.setMaxConcurrentTasks(concurrentTasks); } + inputPort.getProcessGroup().onComponentModified(); return inputPort; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardLabelDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardLabelDAO.java index 2a8b19f081..b8105e6fc7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardLabelDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardLabelDAO.java @@ -102,6 +102,7 @@ public class StandardLabelDAO extends ComponentDAO implements LabelDAO { label.setSize(new Size(labelDTO.getWidth(), labelDTO.getHeight())); } + label.getProcessGroup().onComponentModified(); return label; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardOutputPortDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardOutputPortDAO.java index bad9e3ae72..72bc49bfa2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardOutputPortDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardOutputPortDAO.java @@ -233,6 +233,7 @@ public class StandardOutputPortDAO extends ComponentDAO implements PortDAO { outputPort.setMaxConcurrentTasks(concurrentTasks); } + outputPort.getProcessGroup().onComponentModified(); return outputPort; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java index bb7edb1af0..1aaf4cc5a3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java @@ -204,7 +204,11 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou CompletableFuture future = CompletableFuture.completedFuture(null); for (final String serviceId : serviceIds) { - final ControllerServiceNode serviceNode = group.findControllerService(serviceId); + final ControllerServiceNode serviceNode = group.findControllerService(serviceId, true, true); + if (serviceNode == null) { + throw new ResourceNotFoundException("Could not find Controller Service with identifier " + serviceId); + } + if (ControllerServiceState.ENABLED.equals(state)) { final CompletableFuture serviceFuture = flowController.enableControllerService(serviceNode); future = CompletableFuture.allOf(future, serviceFuture); @@ -234,6 +238,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou group.setComments(comments); } + group.onComponentModified(); return group; } @@ -247,7 +252,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou final String registryName = flowRegistry == null ? registryId : flowRegistry.getName(); final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); - final VersionedProcessGroup flowSnapshot = mapper.mapProcessGroup(group, flowController.getFlowRegistryClient(), false); + final VersionedProcessGroup flowSnapshot = mapper.mapProcessGroup(group, flowController, flowController.getFlowRegistryClient(), false); final StandardVersionControlInformation vci = StandardVersionControlInformation.Builder.fromDto(versionControlInformation) .registryName(registryName) @@ -257,6 +262,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou .build(); group.setVersionControlInformation(vci, versionedComponentMapping); + group.onComponentModified(); return group; } @@ -279,6 +285,8 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou .build(); group.setVersionControlInformation(svci, Collections.emptyMap()); + group.onComponentModified(); + return group; } @@ -295,6 +303,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou .forEach(var -> variableMap.put(var.getName(), var.getValue())); group.setVariables(variableMap); + group.onComponentModified(); return group; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java index ffbe21cd33..95d0b54b84 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java @@ -411,6 +411,7 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO { // configure the processor configureProcessor(processor, processorDTO); + parentGroup.onComponentModified(); // attempt to change the underlying processor if an updated bundle is specified // updating the bundle must happen after configuring so that any additional classpath resources are set first diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java index d638839848..c570dfcd4b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java @@ -314,6 +314,7 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot // perform the update updatePort(port, remoteProcessGroupPortDto, remoteProcessGroup); + remoteProcessGroup.getProcessGroup().onComponentModified(); return port; } @@ -332,6 +333,7 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot // perform the update updatePort(port, remoteProcessGroupPortDto, remoteProcessGroup); + remoteProcessGroup.getProcessGroup().onComponentModified(); return port; } @@ -373,8 +375,6 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot public RemoteProcessGroup updateRemoteProcessGroup(RemoteProcessGroupDTO remoteProcessGroupDTO) { RemoteProcessGroup remoteProcessGroup = locateRemoteProcessGroup(remoteProcessGroupDTO.getId()); return updateRemoteProcessGroup(remoteProcessGroup, remoteProcessGroupDTO); - - } private RemoteProcessGroup updateRemoteProcessGroup(RemoteProcessGroup remoteProcessGroup, RemoteProcessGroupDTO remoteProcessGroupDTO) { @@ -447,6 +447,7 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot } } + remoteProcessGroup.getProcessGroup().onComponentModified(); return remoteProcessGroup; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java index 7fdaf56b37..a801dcb8b3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java @@ -26,6 +26,7 @@ import org.apache.nifi.web.api.dto.DtoFactory; import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; import org.apache.nifi.web.api.entity.AffectedComponentEntity; +import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; @@ -38,6 +39,9 @@ public class AffectedComponentUtils { case AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR: final ProcessorEntity procEntity = serviceFacade.getProcessor(componentEntity.getId(), user); return dtoFactory.createAffectedComponentEntity(procEntity); + case AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE: + final ControllerServiceEntity serviceEntity = serviceFacade.getControllerService(componentEntity.getId(), user); + return dtoFactory.createAffectedComponentEntity(serviceEntity); case AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT: { final RemoteProcessGroupEntity remoteGroupEntity = serviceFacade.getRemoteProcessGroup(componentEntity.getComponent().getProcessGroupId(), user); final RemoteProcessGroupContentsDTO remoteGroupContents = remoteGroupEntity.getComponent().getContents(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java index 3961be7f3f..28fef0f9a8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java @@ -107,7 +107,8 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle final int scheduleComponentStatus = clusterResponse.getStatus(); if (scheduleComponentStatus != Status.OK.getStatusCode()) { - throw new LifecycleManagementException("Failed to transition components to a state of " + desiredState); + final String explanation = getResponseEntity(clusterResponse, String.class); + throw new LifecycleManagementException("Failed to transition components to a state of " + desiredState + " due to " + explanation); } final boolean processorsTransitioned = waitForProcessorStatus(user, exampleUri, groupId, componentMap, desiredState, pause); @@ -312,7 +313,8 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle final int disableServicesStatus = clusterResponse.getStatus(); if (disableServicesStatus != Status.OK.getStatusCode()) { - throw new LifecycleManagementException("Failed to update Controller Services to a state of " + desiredState); + final String explanation = getResponseEntity(clusterResponse, String.class); + throw new LifecycleManagementException("Failed to update Controller Services to a state of " + desiredState + " due to " + explanation); } final boolean serviceTransitioned = waitForControllerServiceStatus(user, originalUri, groupId, affectedServiceIds, desiredState, pause);