From adacb204a8b6518a79600253463a36c9f8afaa37 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 17 Nov 2017 11:02:33 -0500 Subject: [PATCH] NIFI-4436: Bug fixes - Checkpoint before allowing multiple Process Groups with same Versioned Component ID and same parent - Ensure that if flow update is cancelled while processors are being stopped/services disabled that we stop waiting for that to occur. Also ensure that if we fail to update flow that we re-enable/restart the processors and services - Updated verbiage to use a ConciseEvolvingDifferentDescriptor when getting local modifications for a versioned flow - Do not allow outer process group to be saved to flow registry or have local modifications reverted if it has a descendant process group that is under version control and is dirty. Fixed bug where ComponentDifferenceDTO was populated with wrong component id and group id Signed-off-by: Matt Gilman --- .../entity/VersionedFlowSnapshotEntity.java | 11 + .../org/apache/nifi/groups/ProcessGroup.java | 27 +- .../nifi/registry/flow/FlowRegistry.java | 9 +- .../nifi/controller/FlowController.java | 54 +++- .../nifi/groups/StandardProcessGroup.java | 143 +++++++-- .../registry/flow/RestBasedFlowRegistry.java | 17 +- .../service/mock/MockProcessGroup.java | 14 +- .../apache/nifi/web/NiFiServiceFacade.java | 34 ++- .../nifi/web/StandardNiFiServiceFacade.java | 93 ++---- .../nifi/web/api/ProcessGroupResource.java | 11 +- .../apache/nifi/web/api/VersionsResource.java | 285 ++++++++---------- .../concurrent/AsynchronousWebRequest.java | 8 + .../StandardAsynchronousWebRequest.java | 7 + .../apache/nifi/web/api/dto/DtoFactory.java | 15 +- .../apache/nifi/web/dao/ProcessGroupDAO.java | 4 +- .../web/dao/impl/StandardProcessGroupDAO.java | 10 +- .../nifi/web/util/CancellableTimedPause.java | 2 +- .../apache/nifi/web/util/SnippetUtils.java | 16 +- 18 files changed, 496 insertions(+), 264 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowSnapshotEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowSnapshotEntity.java index 170640d873..2faf791d94 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowSnapshotEntity.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowSnapshotEntity.java @@ -28,6 +28,7 @@ public class VersionedFlowSnapshotEntity extends Entity { private VersionedFlowSnapshot versionedFlowSnapshot; private RevisionDTO processGroupRevision; private String registryId; + private Boolean updateDescendantVersionedFlows; @ApiModelProperty("The versioned flow snapshot") public VersionedFlowSnapshot getVersionedFlowSnapshot() { @@ -55,4 +56,14 @@ public class VersionedFlowSnapshotEntity extends Entity { public void setRegistryId(String registryId) { this.registryId = registryId; } + + @ApiModelProperty("If the Process Group to be updated has a child or descendant Process Group that is also under " + + "Version Control, this specifies whether or not the contents of that child/descendant Process Group should be updated.") + public Boolean getUpdateDescendantVersionedFlows() { + return updateDescendantVersionedFlows; + } + + public void setUpdateDescendantVersionedFlows(Boolean update) { + this.updateDescendantVersionedFlows = update; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java index 16b4b5eee0..d81b7d3a6c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java @@ -784,8 +784,10 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi * and the Process Group has been modified since it was last synchronized with the Flow Registry, then this method will * throw an IllegalStateException * @param updateSettings whether or not to update the process group's name and positions + * @param updateDescendantVersionedFlows if a child/descendant Process Group is under Version Control, specifies whether or not to + * update the contents of that Process Group */ - void updateFlow(VersionedFlowSnapshot proposedSnapshot, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings); + void updateFlow(VersionedFlowSnapshot proposedSnapshot, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings, boolean updateDescendantVersionedFlows); /** * Verifies a template with the specified name can be created. @@ -848,7 +850,7 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi void verifyCanUpdateVariables(Map updatedVariables); /** - * Ensure that the contents of the Process Group can be update to match the given new flow + * Ensures that the contents of the Process Group can be update to match the given new flow * * @param updatedFlow the updated version of the flow * @param verifyConnectionRemoval whether or not to verify that connections that are not present in the updated flow can be removed @@ -859,6 +861,27 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi */ void verifyCanUpdate(VersionedFlowSnapshot updatedFlow, boolean verifyConnectionRemoval, boolean verifyNotDirty); + /** + * Ensures that the Process Group can have any local changes reverted + * + * @throws IllegalStateException if the Process Group is not in a state that will allow local changes to be reverted + */ + void verifyCanRevertLocalModifications(); + + /** + * Ensures that the Process Group can have its local modifications shown + * + * @throws IllegalStateException if the Process Group is not in a state that will allow local modifications to be shown + */ + void verifyCanShowLocalModifications(); + + /** + * Ensure that the contents of the Process Group can be saved to a Flow Registry in its current state + * + * @throws IllegalStateException if the Process Group cannot currently be saved to a Flow Registry + */ + void verifyCanSaveToFlowRegistry(String registryId, String bucketId, String flowId); + /** * Adds the given template to this Process Group * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java index 76f96f25db..ae43bb5907 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java @@ -159,6 +159,9 @@ public interface FlowRegistry { * @param bucketId the ID of the bucket * @param flowId the ID of the flow * @param version the version to retrieve + * @param fetchRemoteFlows if the remote flow has a child Process Group that also tracks to a remote flow, this specifies whether or not + * the child's contents should be fetched. + * @param user the user on whose behalf the flow contents are being retrieved * @return the contents of the Flow from the Flow Registry * * @throws IOException if unable to communicate with the Flow Registry @@ -167,7 +170,7 @@ public interface FlowRegistry { * @throws NullPointerException if any of the arguments is not specified * @throws IllegalArgumentException if the given version is less than 1 */ - VersionedFlowSnapshot getFlowContents(String bucketId, String flowId, int version, NiFiUser user) throws IOException, NiFiRegistryException; + VersionedFlowSnapshot getFlowContents(String bucketId, String flowId, int version, boolean fetchRemoteFlows, NiFiUser user) throws IOException, NiFiRegistryException; /** * Retrieves the contents of the Flow with the given Bucket ID, Flow ID, and version, from the Flow Registry @@ -175,6 +178,8 @@ public interface FlowRegistry { * @param bucketId the ID of the bucket * @param flowId the ID of the flow * @param version the version to retrieve + * @param fetchRemoteFlows if the remote flow has a child Process Group that also tracks to a remote flow, this specifies whether or not + * the child's contents should be fetched. * @return the contents of the Flow from the Flow Registry * * @throws IOException if unable to communicate with the Flow Registry @@ -183,7 +188,7 @@ public interface FlowRegistry { * @throws NullPointerException if any of the arguments is not specified * @throws IllegalArgumentException if the given version is less than 1 */ - VersionedFlowSnapshot getFlowContents(String bucketId, String flowId, int version) throws IOException, NiFiRegistryException; + VersionedFlowSnapshot getFlowContents(String bucketId, String flowId, int version, boolean fetchRemoteFlows) throws IOException, NiFiRegistryException; /** * Retrieves a VersionedFlow by bucket id and flow id diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 5ed5b6e378..3909387688 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -165,8 +165,11 @@ import org.apache.nifi.provenance.StandardProvenanceEventRecord; import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; +import org.apache.nifi.registry.flow.StandardVersionControlInformation; +import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.VersionedConnection; import org.apache.nifi.registry.flow.VersionedProcessGroup; +import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper; import org.apache.nifi.registry.variable.MutableVariableRegistry; import org.apache.nifi.registry.variable.StandardComponentVariableRegistry; import org.apache.nifi.remote.HttpRemoteSiteListener; @@ -1775,6 +1778,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * processor */ public void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto) throws ProcessorInstantiationException { + instantiateSnippet(group, dto, true); + } + + private void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto, final boolean topLevel) throws ProcessorInstantiationException { writeLock.lock(); try { validateSnippetContents(requireNonNull(group), dto); @@ -1789,6 +1796,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R serviceNode.setAnnotationData(controllerServiceDTO.getAnnotationData()); serviceNode.setComments(controllerServiceDTO.getComments()); serviceNode.setName(controllerServiceDTO.getName()); + if (!topLevel) { + serviceNode.setVersionedComponentId(controllerServiceDTO.getVersionedComponentId()); + } group.addControllerService(serviceNode); } @@ -1812,6 +1822,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } label.setStyle(labelDTO.getStyle()); + if (!topLevel) { + label.setVersionedComponentId(labelDTO.getVersionedComponentId()); + } + group.addLabel(label); } @@ -1819,6 +1833,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R for (final FunnelDTO funnelDTO : dto.getFunnels()) { final Funnel funnel = createFunnel(funnelDTO.getId()); funnel.setPosition(toPosition(funnelDTO.getPosition())); + if (!topLevel) { + funnel.setVersionedComponentId(funnelDTO.getVersionedComponentId()); + } + group.addFunnel(funnel); } @@ -1840,6 +1858,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R inputPort = createLocalInputPort(portDTO.getId(), portDTO.getName()); } + if (!topLevel) { + inputPort.setVersionedComponentId(portDTO.getVersionedComponentId()); + } inputPort.setPosition(toPosition(portDTO.getPosition())); inputPort.setProcessGroup(group); inputPort.setComments(portDTO.getComments()); @@ -1861,6 +1882,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R outputPort = createLocalOutputPort(portDTO.getId(), portDTO.getName()); } + if (!topLevel) { + outputPort.setVersionedComponentId(portDTO.getVersionedComponentId()); + } outputPort.setPosition(toPosition(portDTO.getPosition())); outputPort.setProcessGroup(group); outputPort.setComments(portDTO.getComments()); @@ -1876,6 +1900,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R procNode.setPosition(toPosition(processorDTO.getPosition())); procNode.setProcessGroup(group); + if (!topLevel) { + procNode.setVersionedComponentId(processorDTO.getVersionedComponentId()); + } final ProcessorConfigDTO config = processorDTO.getConfig(); procNode.setComments(config.getComments()); @@ -1936,6 +1963,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R remoteGroup.setPosition(toPosition(remoteGroupDTO.getPosition())); remoteGroup.setCommunicationsTimeout(remoteGroupDTO.getCommunicationsTimeout()); remoteGroup.setYieldDuration(remoteGroupDTO.getYieldDuration()); + if (!topLevel) { + remoteGroup.setVersionedComponentId(remoteGroupDTO.getVersionedComponentId()); + } + if (remoteGroupDTO.getTransportProtocol() == null) { remoteGroup.setTransportProtocol(SiteToSiteTransportProtocol.RAW); } else { @@ -1979,6 +2010,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R childGroup.setVariables(groupDTO.getVariables()); } + // If this Process Group is 'top level' then we do not set versioned component ID's. + // We do this only if this component is the child of a Versioned Component. + if (!topLevel) { + childGroup.setVersionedComponentId(groupDTO.getVersionedComponentId()); + } + group.addProcessGroup(childGroup); final FlowSnippetDTO contents = groupDTO.getContents(); @@ -1995,7 +2032,18 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R childTemplateDTO.setFunnels(contents.getFunnels()); childTemplateDTO.setRemoteProcessGroups(contents.getRemoteProcessGroups()); childTemplateDTO.setControllerServices(contents.getControllerServices()); - instantiateSnippet(childGroup, childTemplateDTO); + instantiateSnippet(childGroup, childTemplateDTO, false); + + if (groupDTO.getVersionControlInformation() != null) { + final NiFiRegistryFlowMapper flowMapper = new NiFiRegistryFlowMapper(); + final VersionedProcessGroup versionedGroup = flowMapper.mapProcessGroup(childGroup, getFlowRegistryClient(), false); + + final VersionControlInformation vci = StandardVersionControlInformation.Builder + .fromDto(groupDTO.getVersionControlInformation()) + .flowSnapshot(versionedGroup) + .build(); + childGroup.setVersionControlInformation(vci, Collections.emptyMap()); + } } // @@ -2039,6 +2087,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } final Connection connection = createConnection(connectionDTO.getId(), connectionDTO.getName(), source, destination, relationships); + if (!topLevel) { + connection.setVersionedComponentId(connectionDTO.getVersionedComponentId()); + } if (connectionDTO.getBends() != null) { final List bendPoints = new ArrayList<>(); @@ -2088,6 +2139,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R for (final RemoteProcessGroupPortDTO port : ports) { final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor(); descriptor.setId(port.getId()); + descriptor.setVersionedComponentId(port.getVersionedComponentId()); descriptor.setTargetId(port.getTargetId()); descriptor.setName(port.getName()); descriptor.setComments(port.getComments()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index d1aa4e2f86..51839d0390 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -2821,7 +2821,7 @@ public final class StandardProcessGroup implements ProcessGroup { versionControlInformation.getBucketIdentifier(), versionControlInformation.getFlowIdentifier(), versionControlInformation.getVersion(), - versionControlInformation.getFlowSnapshot(), + stripContentsFromRemoteDescendantGroups(versionControlInformation.getFlowSnapshot(), true), versionControlInformation.isModified(), versionControlInformation.isCurrent()) { @@ -2849,6 +2849,51 @@ public final class StandardProcessGroup implements ProcessGroup { } } + private VersionedProcessGroup stripContentsFromRemoteDescendantGroups(final VersionedProcessGroup processGroup, final boolean topLevel) { + if (processGroup == null) { + return null; + } + + final VersionedProcessGroup copy = new VersionedProcessGroup(); + copy.setComments(processGroup.getComments()); + copy.setComponentType(processGroup.getComponentType()); + copy.setGroupIdentifier(processGroup.getGroupIdentifier()); + copy.setIdentifier(processGroup.getIdentifier()); + copy.setName(processGroup.getName()); + copy.setPosition(processGroup.getPosition()); + copy.setVersionedFlowCoordinates(topLevel ? null : processGroup.getVersionedFlowCoordinates()); + copy.setConnections(processGroup.getConnections()); + copy.setControllerServices(processGroup.getControllerServices()); + copy.setFunnels(processGroup.getFunnels()); + copy.setInputPorts(processGroup.getInputPorts()); + copy.setOutputPorts(processGroup.getOutputPorts()); + copy.setProcessors(processGroup.getProcessors()); + copy.setRemoteProcessGroups(processGroup.getRemoteProcessGroups()); + copy.setVariables(processGroup.getVariables()); + + final Set copyChildren = new HashSet<>(); + + for (final VersionedProcessGroup childGroup : processGroup.getProcessGroups()) { + if (childGroup.getVersionedFlowCoordinates() == null) { + copyChildren.add(stripContentsFromRemoteDescendantGroups(childGroup, false)); + } else { + final VersionedProcessGroup childCopy = new VersionedProcessGroup(); + childCopy.setComments(childGroup.getComments()); + childCopy.setComponentType(childGroup.getComponentType()); + childCopy.setGroupIdentifier(childGroup.getGroupIdentifier()); + childCopy.setIdentifier(childGroup.getIdentifier()); + childCopy.setName(childGroup.getName()); + childCopy.setPosition(childGroup.getPosition()); + childCopy.setVersionedFlowCoordinates(childGroup.getVersionedFlowCoordinates()); + + copyChildren.add(childCopy); + } + } + + copy.setProcessGroups(copyChildren); + return copy; + } + @Override public void disconnectVersionControl() { writeLock.lock(); @@ -2900,7 +2945,7 @@ public final class StandardProcessGroup implements ProcessGroup { }); processGroup.getProcessGroups().stream() - .filter(childGroup -> childGroup.getVersionControlInformation() != null) + .filter(childGroup -> childGroup.getVersionControlInformation() == null) .forEach(childGroup -> applyVersionedComponentIds(childGroup, lookup)); } @@ -2925,7 +2970,7 @@ public final class StandardProcessGroup implements ProcessGroup { // We have not yet obtained the snapshot from the Flow Registry, so we need to request the snapshot of our local version of the flow from the Flow Registry. // This allows us to know whether or not the flow has been modified since it was last synced with the Flow Registry. try { - final VersionedFlowSnapshot registrySnapshot = flowRegistry.getFlowContents(vci.getBucketIdentifier(), vci.getFlowIdentifier(), vci.getVersion()); + final VersionedFlowSnapshot registrySnapshot = flowRegistry.getFlowContents(vci.getBucketIdentifier(), vci.getFlowIdentifier(), vci.getVersion(), false); final VersionedProcessGroup registryFlow = registrySnapshot.getFlowContents(); vci.setFlowSnapshot(registryFlow); } catch (final IOException | NiFiRegistryException e) { @@ -2958,7 +3003,8 @@ public final class StandardProcessGroup implements ProcessGroup { @Override - public void updateFlow(final VersionedFlowSnapshot proposedSnapshot, final String componentIdSeed, final boolean verifyNotDirty, final boolean updateSettings) { + public void updateFlow(final VersionedFlowSnapshot proposedSnapshot, final String componentIdSeed, final boolean verifyNotDirty, final boolean updateSettings, + final boolean updateDescendantVersionedFlows) { writeLock.lock(); try { verifyCanUpdate(proposedSnapshot, true, verifyNotDirty); @@ -2986,7 +3032,7 @@ public final class StandardProcessGroup implements ProcessGroup { } final Set knownVariables = getKnownVariableNames(); - updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false, updateSettings, knownVariables); + updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false, updateSettings, updateDescendantVersionedFlows, knownVariables); } catch (final ProcessorInstantiationException pie) { throw new RuntimeException(pie); } finally { @@ -3013,7 +3059,8 @@ public final class StandardProcessGroup implements ProcessGroup { private void updateProcessGroup(final ProcessGroup group, final VersionedProcessGroup proposed, final String componentIdSeed, - final Set updatedVersionedComponentIds, final boolean updatePosition, final boolean updateName, final Set variablesToSkip) throws ProcessorInstantiationException { + final Set updatedVersionedComponentIds, final boolean updatePosition, final boolean updateName, final boolean updateDescendantVersionedGroups, + final Set variablesToSkip) throws ProcessorInstantiationException { group.setComments(proposed.getComments()); @@ -3033,14 +3080,8 @@ public final class StandardProcessGroup implements ProcessGroup { .map(VariableDescriptor::getName) .collect(Collectors.toSet()); - final Set variablesRemoved = new HashSet<>(existingVariableNames); - - if (proposed.getVariables() != null) { - variablesRemoved.removeAll(proposed.getVariables().keySet()); - } final Map updatedVariableMap = new HashMap<>(); - variablesRemoved.forEach(var -> updatedVariableMap.put(var, null)); // If any new variables exist in the proposed flow, add those to the variable registry. for (final Map.Entry entry : proposed.getVariables().entrySet()) { @@ -3069,6 +3110,7 @@ public final class StandardProcessGroup implements ProcessGroup { .flowId(flowId) .flowName(flowId) // flow id not yet known .version(version) + .flowSnapshot(proposed) .modified(false) .current(true) .build(); @@ -3084,11 +3126,13 @@ public final class StandardProcessGroup implements ProcessGroup { for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) { final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier()); + final VersionedFlowCoordinates childCoordinates = proposedChildGroup.getVersionedFlowCoordinates(); + if (childGroup == null) { final ProcessGroup added = addProcessGroup(group, proposedChildGroup, componentIdSeed, variablesToSkip); LOG.info("Added {} to {}", added, this); - } else { - updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true, updateName, variablesToSkip); + } else if (childCoordinates == null || updateDescendantVersionedGroups) { + updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true, updateName, updateDescendantVersionedGroups, variablesToSkip); LOG.info("Updated {}", childGroup); } @@ -3367,7 +3411,7 @@ public final class StandardProcessGroup implements ProcessGroup { final ProcessGroup group = flowController.createProcessGroup(generateUuid(componentIdSeed)); group.setVersionedComponentId(proposed.getIdentifier()); group.setParent(destination); - updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true, true, variablesToSkip); + updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true, true, true, variablesToSkip); destination.addProcessGroup(group); return group; } @@ -3739,7 +3783,7 @@ public final class StandardProcessGroup implements ProcessGroup { } final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); - final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient(), true); + final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient(), false); final ComparableDataFlow currentFlow = new ComparableDataFlow() { @Override @@ -3765,7 +3809,7 @@ public final class StandardProcessGroup implements ProcessGroup { } }; - final FlowComparator flowComparator = new StandardFlowComparator(currentFlow, snapshotFlow, new EvolvingDifferenceDescriptor()); + final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, new EvolvingDifferenceDescriptor()); final FlowComparison comparison = flowComparator.compare(); final Set differences = comparison.getDifferences(); final Set functionalDifferences = differences.stream() @@ -4002,4 +4046,69 @@ public final class StandardProcessGroup implements ProcessGroup { findAllProcessGroups(child, map); } } + + @Override + public void verifyCanSaveToFlowRegistry(final String registryId, final String bucketId, final String flowId) { + verifyNoDescendantsWithLocalModifications("be saved to a Flow Registry"); + + final StandardVersionControlInformation vci = versionControlInfo.get(); + if (vci != null) { + if (flowId != null && flowId.equals(vci.getFlowIdentifier())) { + // Flow ID is the same. We want to publish the Process Group as the next version of the Flow. + // In order to do this, we have to ensure that the Process Group is 'current'. + final boolean current = vci.isCurrent(); + if (!current) { + throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + getIdentifier() + + " because the Process Group in the flow is not synchronized with the most recent version of the Flow in the Flow Registry. " + + "In order to publish a new version of the Flow, the Process Group must first be in synch with the latest version in the Flow Registry."); + } + + // Flow ID matches. We want to publish the Process Group as the next version of the Flow, so we must + // ensure that all other parameters match as well. + if (!bucketId.equals(vci.getBucketIdentifier())) { + throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + getIdentifier() + + " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request."); + } + + if (!registryId.equals(vci.getRegistryIdentifier())) { + throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + getIdentifier() + + " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request."); + } + } else if (flowId != null) { + // Flow ID is specified but different. This is not allowed, because Flow ID's are automatically generated, + // and if the client is specifying an ID then it is either trying to assign the ID of the Flow or it is + // attempting to save a new version of a different flow. Saving a new version of a different Flow is + // not allowed because the Process Group must be in synch with the latest version of the flow before that + // can be done. + throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + getIdentifier() + + " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request."); + } + } + } + + @Override + public void verifyCanRevertLocalModifications() { + final StandardVersionControlInformation svci = versionControlInfo.get(); + if (svci == null) { + throw new IllegalStateException("Cannot revert local modifications to Process Group because the Process Group is not under Version Control."); + } + + verifyNoDescendantsWithLocalModifications("have its local modifications reverted"); + } + + @Override + public void verifyCanShowLocalModifications() { + + } + + private void verifyNoDescendantsWithLocalModifications(final String action) { + for (final ProcessGroup descendant : findAllProcessGroups()) { + final VersionControlInformation descendantVci = descendant.getVersionControlInformation(); + if (descendantVci != null && descendantVci.isModified()) { + throw new IllegalStateException("Process Group cannot " + action + " because it contains a child or descendant Process Group that is under Version Control and " + + "has local modifications. Each descendant Process Group that is under Version Control must first be reverted or have its changes pushed to the Flow Registry before " + + "this action can be performed on the parent Process Group."); + } + } + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java index 8bf89c620d..1d3eec606d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java @@ -178,21 +178,24 @@ public class RestBasedFlowRegistry implements FlowRegistry { } @Override - public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version, final NiFiUser user) throws IOException, NiFiRegistryException { + public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version, final boolean fetchRemoteFlows, final NiFiUser user) + throws IOException, NiFiRegistryException { final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(getIdentity(user)); final VersionedFlowSnapshot flowSnapshot = snapshotClient.get(bucketId, flowId, version); - final VersionedProcessGroup contents = flowSnapshot.getFlowContents(); - for (final VersionedProcessGroup child : contents.getProcessGroups()) { - populateVersionedContentsRecursively(child, user); + if (fetchRemoteFlows) { + final VersionedProcessGroup contents = flowSnapshot.getFlowContents(); + for (final VersionedProcessGroup child : contents.getProcessGroups()) { + populateVersionedContentsRecursively(child, user); + } } return flowSnapshot; } @Override - public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version) throws IOException, NiFiRegistryException { - return getFlowContents(bucketId, flowId, version, null); + public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version, final boolean fetchRemoteFlows) throws IOException, NiFiRegistryException { + return getFlowContents(bucketId, flowId, version, fetchRemoteFlows, null); } private void populateVersionedContentsRecursively(final VersionedProcessGroup group, final NiFiUser user) throws NiFiRegistryException, IOException { @@ -214,7 +217,7 @@ public class RestBasedFlowRegistry implements FlowRegistry { } final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId); - final VersionedFlowSnapshot snapshot = flowRegistry.getFlowContents(bucketId, flowId, version, user); + final VersionedFlowSnapshot snapshot = flowRegistry.getFlowContents(bucketId, flowId, version, true, user); final VersionedProcessGroup contents = snapshot.getFlowContents(); group.setComments(contents.getComments()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java index db4ac590d8..ef69906b5d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java @@ -649,12 +649,16 @@ public class MockProcessGroup implements ProcessGroup { public void verifyCanUpdate(VersionedFlowSnapshot updatedFlow, boolean verifyConnectionRemoval, boolean verifyNotDirty) { } + @Override + public void verifyCanSaveToFlowRegistry(String registryId, String bucketId, String flowId) { + } + @Override public void synchronizeWithFlowRegistry(FlowRegistryClient flowRegistry) { } @Override - public void updateFlow(VersionedFlowSnapshot proposedFlow, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings) { + public void updateFlow(VersionedFlowSnapshot proposedFlow, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings, boolean updateDescendantVerisonedFlows) { } @Override @@ -666,4 +670,12 @@ public class MockProcessGroup implements ProcessGroup { public void disconnectVersionControl() { this.versionControlInfo = null; } + + @Override + public void verifyCanRevertLocalModifications() { + } + + @Override + public void verifyCanShowLocalModifications() { + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index 76cd2c4109..02df16b400 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -1368,11 +1368,13 @@ public interface NiFiServiceFacade { * Retrieves the Versioned Flow Snapshot for the coordinates provided by the given Version Control Information DTO * * @param versionControlInfo the coordinates of the versioned flow + * @param fetchRemoteFlows if the contents of Versioned Flow that is fetched contains a child/descendant Process Group + * that is also under Version Control, this indicates whether that remote flow should also be fetched * @return the VersionedFlowSnapshot that corresponds to the given coordinates * * @throws ResourceNotFoundException if the Versioned Flow Snapshot could not be found */ - VersionedFlowSnapshot getVersionedFlowSnapshot(VersionControlInformationDTO versionControlInfo) throws IOException; + VersionedFlowSnapshot getVersionedFlowSnapshot(VersionControlInformationDTO versionControlInfo, boolean fetchRemoteFlows) throws IOException; /** * Returns the name of the Flow Registry that is registered with the given ID. If no Flow Registry exists with the given ID, will return @@ -1406,6 +1408,28 @@ public interface NiFiServiceFacade { */ void verifyCanUpdate(String groupId, VersionedFlowSnapshot proposedFlow, boolean verifyConnectionRemoval, boolean verifyNotDirty); + /** + * Verifies that the Process Group with the given identifier can be saved to the flow registry + * + * @param groupId the ID of the Process Group + * @param registryId the ID of the Flow Registry + * @param bucketId the ID of the bucket + * @param flowId the ID of the flow + * + * @throws IllegalStateException if the Process Group cannot be saved to the flow registry with the coordinates specified + */ + void verifyCanSaveToFlowRegistry(String groupId, String registryId, String bucketId, String flowId); + + /** + * Verifies that the Process Group with the given identifier can have its local modifications reverted to the given VersionedFlowSnapshot + * + * @param groupId the ID of the Process Group + * @param versionedFlowSnapshot the Versioned Flow Snapshot + * + * @throws IllegalStateException if the Process Group cannot have its local modifications reverted + */ + void verifyCanRevertLocalModifications(String groupId, VersionedFlowSnapshot versionedFlowSnapshot); + /** * Updates the Process group with the given ID to match the new snapshot * @@ -1414,10 +1438,12 @@ public interface NiFiServiceFacade { * @param versionControlInfo the Version Control information * @param snapshot the new snapshot * @param componentIdSeed the seed to use for generating new component ID's + * @param updateDescendantVersionedFlows if a child/descendant Process Group is under Version Control, specifies whether or not to + * update the contents of that Process Group * @return the Process Group */ ProcessGroupEntity updateProcessGroup(Revision revision, String groupId, VersionControlInformationDTO versionControlInfo, VersionedFlowSnapshot snapshot, String componentIdSeed, - boolean verifyNotModified); + boolean verifyNotModified, boolean updateDescendantVersionedFlows); /** * Updates the Process group with the given ID to match the new snapshot @@ -1429,10 +1455,12 @@ public interface NiFiServiceFacade { * @param snapshot the new snapshot * @param componentIdSeed the seed to use for generating new component ID's * @param updateSettings whether or not the process group's name and position should be updated + * @param updateDescendantVersionedFlows if a child/descendant Process Group is under Version Control, specifies whether or not to + * update the contents of that Process Group * @return the Process Group */ ProcessGroupEntity updateProcessGroupContents(NiFiUser user, Revision revision, String groupId, VersionControlInformationDTO versionControlInfo, VersionedFlowSnapshot snapshot, String componentIdSeed, - boolean verifyNotModified, boolean updateSettings); + boolean verifyNotModified, boolean updateSettings, boolean updateDescendantVersionedFlows); // ---------------------------------------- // Component state methods diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 4d1bbbcdf9..c66aebb401 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -97,13 +97,12 @@ import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.VersionedComponent; import org.apache.nifi.registry.flow.VersionedConnection; import org.apache.nifi.registry.flow.VersionedFlow; -import org.apache.nifi.registry.flow.VersionedFlowCoordinates; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; import org.apache.nifi.registry.flow.VersionedProcessGroup; import org.apache.nifi.registry.flow.diff.ComparableDataFlow; +import org.apache.nifi.registry.flow.diff.ConciseEvolvingDifferenceDescriptor; import org.apache.nifi.registry.flow.diff.DifferenceType; -import org.apache.nifi.registry.flow.diff.EvolvingDifferenceDescriptor; import org.apache.nifi.registry.flow.diff.FlowComparator; import org.apache.nifi.registry.flow.diff.FlowComparison; import org.apache.nifi.registry.flow.diff.FlowDifference; @@ -3751,10 +3750,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } final VersionedFlowSnapshot versionedFlowSnapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketIdentifier(), - versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion(), NiFiUserUtils.getNiFiUser()); + versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion(), false, NiFiUserUtils.getNiFiUser()); final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); - final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient, true); + final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient, false); final VersionedProcessGroup registryGroup = versionedFlowSnapshot.getFlowContents(); final ComparableDataFlow localFlow = new ComparableDataFlow() { @@ -3781,7 +3780,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } }; - final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, new EvolvingDifferenceDescriptor()); + final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, new ConciseEvolvingDifferenceDescriptor()); final FlowComparison flowComparison = flowComparator.compare(); final Set differenceDtos = dtoFactory.createComponentDifferenceDtos(flowComparison); @@ -3852,6 +3851,23 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { group.verifyCanUpdate(proposedFlow, verifyConnectionRemoval, verifyNotDirty); } + @Override + public void verifyCanSaveToFlowRegistry(final String groupId, final String registryId, final String bucketId, final String flowId) { + final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); + group.verifyCanSaveToFlowRegistry(registryId, bucketId, flowId); + } + + @Override + public void verifyCanRevertLocalModifications(final String groupId, final VersionedFlowSnapshot versionedFlowSnapshot) { + final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); + group.verifyCanRevertLocalModifications(); + + // verify that the process group can be updated to the given snapshot. We do not verify that connections can + // be removed, because the flow may still be running, and it only matters that the connections can be removed once the components + // have been stopped. + group.verifyCanUpdate(versionedFlowSnapshot, false, false); + } + @Override public Set getComponentsAffectedByVersionChange(final String processGroupId, final VersionedFlowSnapshot updatedSnapshot, final NiFiUser user) throws IOException { final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId); @@ -4028,7 +4044,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public VersionedFlowSnapshot getVersionedFlowSnapshot(final VersionControlInformationDTO versionControlInfo) throws IOException { + public VersionedFlowSnapshot getVersionedFlowSnapshot(final VersionControlInformationDTO versionControlInfo, final boolean fetchRemoteFlows) throws IOException { final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(versionControlInfo.getRegistryId()); if (flowRegistry == null) { throw new ResourceNotFoundException("Could not find any Flow Registry registered with identifier " + versionControlInfo.getRegistryId()); @@ -4036,15 +4052,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final VersionedFlowSnapshot snapshot; try { - snapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketId(), versionControlInfo.getFlowId(), versionControlInfo.getVersion(), NiFiUserUtils.getNiFiUser()); + snapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketId(), versionControlInfo.getFlowId(), versionControlInfo.getVersion(), fetchRemoteFlows, NiFiUserUtils.getNiFiUser()); } catch (final NiFiRegistryException e) { throw new IllegalArgumentException("The Flow Registry with ID " + versionControlInfo.getRegistryId() + " reports that no Flow exists with Bucket " + versionControlInfo.getBucketId() + ", Flow " + versionControlInfo.getFlowId() + ", Version " + versionControlInfo.getVersion()); } - // If this Flow has a reference to a remote flow, we need to pull that remote flow as well - populateVersionedChildFlows(snapshot); - return snapshot; } @@ -4054,74 +4067,22 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return flowRegistry == null ? flowRegistryId : flowRegistry.getName(); } - private void populateVersionedChildFlows(final VersionedFlowSnapshot snapshot) throws IOException { - final VersionedProcessGroup group = snapshot.getFlowContents(); - - for (final VersionedProcessGroup child : group.getProcessGroups()) { - populateVersionedFlows(child); - } - } - - private void populateVersionedFlows(final VersionedProcessGroup group) throws IOException { - final VersionedFlowCoordinates remoteCoordinates = group.getVersionedFlowCoordinates(); - - if (remoteCoordinates != null) { - final String registryUrl = remoteCoordinates.getRegistryUrl(); - final String registryId = flowRegistryClient.getFlowRegistryId(registryUrl); - if (registryId == null) { - throw new IllegalArgumentException("Process Group with ID " + group.getIdentifier() + " is under Version Control, referencing a Flow Registry at URL [" + registryUrl - + "], but no Flow Registry is currently registered for that URL."); - } - - final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId); - - final VersionedFlowSnapshot childSnapshot; - try { - childSnapshot = flowRegistry.getFlowContents(remoteCoordinates.getBucketId(), remoteCoordinates.getFlowId(), remoteCoordinates.getVersion(), NiFiUserUtils.getNiFiUser()); - } catch (final NiFiRegistryException e) { - throw new IllegalArgumentException("The Flow Registry with ID " + registryId + " reports that no Flow exists with Bucket " - + remoteCoordinates.getBucketId() + ", Flow " + remoteCoordinates.getFlowId() + ", Version " + remoteCoordinates.getVersion()); - } - - final VersionedProcessGroup fetchedGroup = childSnapshot.getFlowContents(); - group.setComments(fetchedGroup.getComments()); - group.setPosition(fetchedGroup.getPosition()); - group.setName(fetchedGroup.getName()); - group.setVariables(fetchedGroup.getVariables()); - - group.setConnections(new LinkedHashSet<>(fetchedGroup.getConnections())); - group.setControllerServices(new LinkedHashSet<>(fetchedGroup.getControllerServices())); - group.setFunnels(new LinkedHashSet<>(fetchedGroup.getFunnels())); - group.setInputPorts(new LinkedHashSet<>(fetchedGroup.getInputPorts())); - group.setLabels(new LinkedHashSet<>(fetchedGroup.getLabels())); - group.setOutputPorts(new LinkedHashSet<>(fetchedGroup.getOutputPorts())); - group.setProcessGroups(new LinkedHashSet<>(fetchedGroup.getProcessGroups())); - group.setProcessors(new LinkedHashSet<>(fetchedGroup.getProcessors())); - group.setRemoteProcessGroups(new LinkedHashSet<>(fetchedGroup.getRemoteProcessGroups())); - } - - for (final VersionedProcessGroup child : group.getProcessGroups()) { - populateVersionedFlows(child); - } - } - - @Override public ProcessGroupEntity updateProcessGroup(final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo, - final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified) { + final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified, final boolean updateDescendantVersionedFlows) { final NiFiUser user = NiFiUserUtils.getNiFiUser(); - return updateProcessGroupContents(user, revision, groupId, versionControlInfo, proposedFlowSnapshot, componentIdSeed, verifyNotModified, true); + return updateProcessGroupContents(user, revision, groupId, versionControlInfo, proposedFlowSnapshot, componentIdSeed, verifyNotModified, true, updateDescendantVersionedFlows); } @Override public ProcessGroupEntity updateProcessGroupContents(final NiFiUser user, final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo, - final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings) { + final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings, final boolean updateDescendantVersionedFlows) { final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(groupId); final RevisionUpdate snapshot = updateComponent(user, revision, processGroupNode, - () -> processGroupDAO.updateProcessGroupFlow(groupId, proposedFlowSnapshot, versionControlInfo, componentIdSeed, verifyNotModified, updateSettings), + () -> processGroupDAO.updateProcessGroupFlow(groupId, proposedFlowSnapshot, versionControlInfo, componentIdSeed, verifyNotModified, updateSettings, updateDescendantVersionedFlows), processGroup -> dtoFactory.createProcessGroupDto(processGroup)); final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java index de56a4f4eb..7262a821a9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java @@ -1644,7 +1644,7 @@ public class ProcessGroupResource extends ApplicationResource { if (versionControlInfo != null) { // Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail. // Step 2: Retrieve flow from Flow Registry - final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(versionControlInfo); + final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(versionControlInfo, true); final Bucket bucket = flowSnapshot.getBucket(); final VersionedFlow flow = flowSnapshot.getFlow(); @@ -1653,6 +1653,8 @@ public class ProcessGroupResource extends ApplicationResource { versionControlInfo.setFlowDescription(flow.getDescription()); versionControlInfo.setRegistryName(serviceFacade.getFlowRegistryName(versionControlInfo.getRegistryId())); + versionControlInfo.setModified(false); + versionControlInfo.setCurrent(flowSnapshot.isLatest()); // Step 3: Resolve Bundle info BundleUtils.discoverCompatibleBundles(flowSnapshot.getFlowContents()); @@ -1709,8 +1711,13 @@ public class ProcessGroupResource extends ApplicationResource { final RevisionDTO revisionDto = entity.getRevision(); final String newGroupId = entity.getComponent().getId(); final Revision newGroupRevision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), newGroupId); + + // We don't want the Process Group's position to be updated because we want to keep the position where the user + // placed the Process Group. However, we do want to use the name of the Process Group that is in the Flow Contents. + // To accomplish this, we call updateProcessGroupContents() passing 'true' for the updateSettings flag but null out the position. + flowSnapshot.getFlowContents().setPosition(null); entity = serviceFacade.updateProcessGroupContents(NiFiUserUtils.getNiFiUser(), newGroupRevision, newGroupId, - versionControlInfo, flowSnapshot, getIdGenerationSeed().orElse(null), false, false); + versionControlInfo, flowSnapshot, getIdGenerationSeed().orElse(null), false, true, true); } populateRemainingProcessGroupEntityContent(entity); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java index 3684f04b2f..f2a207e970 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java @@ -17,21 +17,6 @@ package org.apache.nifi.web.api; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; -import java.util.stream.Collectors; - import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; @@ -92,10 +77,24 @@ import org.apache.nifi.web.util.AffectedComponentUtils; import org.apache.nifi.web.util.CancellableTimedPause; import org.apache.nifi.web.util.ComponentLifecycle; import org.apache.nifi.web.util.LifecycleManagementException; -import org.apache.nifi.web.util.Pause; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.stream.Collectors; + import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; @@ -454,51 +453,15 @@ public class VersionsResource extends ApplicationResource { super.authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true, false, true, true); }, () -> { - final VersionControlInformationEntity entity = serviceFacade.getVersionControlInformation(groupId); - if (entity != null) { - final String flowId = requestEntity.getVersionedFlow().getFlowId(); - if (flowId != null && flowId.equals(entity.getVersionControlInformation().getFlowId())) { - // Flow ID is the same. We want to publish the Process Group as the next version of the Flow. - // In order to do this, we have to ensure that the Process Group is 'current'. - final Boolean current = entity.getVersionControlInformation().getCurrent(); - if (current == null) { - throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + groupId - + " because it is not yet known whether or not this Process Group is the most recent version of the flow. " - + "Please try the request again after the Process Group has been synchronized with the Flow Registry."); - } - - if (current == Boolean.FALSE) { - throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + groupId - + " because the Process Group in the flow is not synchronized with the most recent version of the Flow in the Flow Registry. " - + "In order to publish a new version of the Flow, the Process Group must first be in synch with the latest version in the Flow Registry."); - } - - // Flow ID matches. We want to publish the Process Group as the next version of the Flow, so we must - // ensure that all other parameters match as well. - if (!requestEntity.getVersionedFlow().getBucketId().equals(entity.getVersionControlInformation().getBucketId())) { - throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + groupId - + " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request."); - } - - if (!requestEntity.getVersionedFlow().getRegistryId().equals(entity.getVersionControlInformation().getRegistryId())) { - throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + groupId - + " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request."); - } - - } else if (flowId != null) { - // Flow ID is specified but different. This is not allowed, because Flow ID's are automatically generated, - // and if the client is specifying an ID then it is either trying to assign the ID of the Flow or it is - // attempting to save a new version of a different flow. Saving a new version of a different Flow is - // not allowed because the Process Group must be in synch with the latest version of the flow before that - // can be done. - throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + groupId - + " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request."); - } - } + final VersionedFlowDTO versionedFlow = requestEntity.getVersionedFlow(); + final String registryId = versionedFlow.getRegistryId(); + final String bucketId = versionedFlow.getBucketId(); + final String flowId = versionedFlow.getFlowId(); + serviceFacade.verifyCanSaveToFlowRegistry(groupId, registryId, bucketId, flowId); }, (rev, flowEntity) -> { // Register the current flow with the Flow Registry. - final VersionControlComponentMappingEntity mappingEntity = serviceFacade.registerFlowWithFlowRegistry(groupId, requestEntity); + final VersionControlComponentMappingEntity mappingEntity = serviceFacade.registerFlowWithFlowRegistry(groupId, flowEntity); // Update the Process Group's Version Control Information final VersionControlInformationEntity responseEntity = serviceFacade.setVersionControlInformation(rev, groupId, @@ -756,7 +719,8 @@ public class VersionsResource extends ApplicationResource { versionControlInfoDto.setRegistryId(requestEntity.getRegistryId()); versionControlInfoDto.setRegistryName(serviceFacade.getFlowRegistryName(requestEntity.getRegistryId())); - final ProcessGroupEntity updatedGroup = serviceFacade.updateProcessGroup(rev, groupId, versionControlInfoDto, flowSnapshot, getIdGenerationSeed().orElse(null), false); + final ProcessGroupEntity updatedGroup = serviceFacade.updateProcessGroup(rev, groupId, versionControlInfoDto, flowSnapshot, getIdGenerationSeed().orElse(null), false, + entity.getUpdateDescendantVersionedFlows()); final VersionControlInformationDTO updatedVci = updatedGroup.getComponent().getVersionControlInformation(); final VersionControlInformationEntity responseEntity = new VersionControlInformationEntity(); @@ -1039,7 +1003,7 @@ public class VersionsResource extends ApplicationResource { // 14. Re-Start all Processors, Funnels, Ports that are affected and not removed. // Step 0: Get the Versioned Flow Snapshot from the Flow Registry - final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation()); + final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation(), true); // The flow in the registry may not contain the same versions of components that we have in our flow. As a result, we need to update // the flow snapshot to contain compatible bundles. @@ -1085,7 +1049,7 @@ public class VersionsResource extends ApplicationResource { final Consumer> updateTask = vcur -> { try { final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, componentLifecycle, exampleUri, - affectedComponents, user, replicateRequest, requestEntity, flowSnapshot, request, idGenerationSeed, true); + affectedComponents, user, replicateRequest, requestEntity, flowSnapshot, request, idGenerationSeed, true, true); vcur.markComplete(updatedVersionControlEntity); } catch (final LifecycleManagementException e) { @@ -1188,7 +1152,7 @@ public class VersionsResource extends ApplicationResource { final String idGenerationSeed = getIdGenerationSeed().orElse(null); // Step 0: Get the Versioned Flow Snapshot from the Flow Registry - final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation()); + final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation(), false); // The flow in the registry may not contain the same versions of components that we have in our flow. As a result, we need to update // the flow snapshot to contain compatible bundles. @@ -1221,8 +1185,7 @@ public class VersionsResource extends ApplicationResource { () -> { // Step 3: Verify that all components in the snapshot exist on all nodes // Step 4: Verify that Process Group is already under version control. If not, must start Version Control instead of updating flow - // Step 5: Verify that Process Group is not 'dirty' - serviceFacade.verifyCanUpdate(groupId, flowSnapshot, false, false); + serviceFacade.verifyCanRevertLocalModifications(groupId, flowSnapshot); }, (revision, processGroupEntity) -> { // Ensure that the information passed in is correct @@ -1254,7 +1217,7 @@ public class VersionsResource extends ApplicationResource { final Consumer> updateTask = vcur -> { try { final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, componentLifecycle, exampleUri, - affectedComponents, user, replicateRequest, requestEntity, flowSnapshot, request, idGenerationSeed, false); + affectedComponents, user, replicateRequest, requestEntity, flowSnapshot, request, idGenerationSeed, false, false); vcur.markComplete(updatedVersionControlEntity); } catch (final LifecycleManagementException e) { @@ -1288,7 +1251,7 @@ public class VersionsResource extends ApplicationResource { private VersionControlInformationEntity updateFlowVersion(final String groupId, final ComponentLifecycle componentLifecycle, final URI exampleUri, final Set affectedComponents, final NiFiUser user, final boolean replicateRequest, final VersionControlInformationEntity requestEntity, final VersionedFlowSnapshot flowSnapshot, final AsynchronousWebRequest asyncRequest, final String idGenerationSeed, - final boolean verifyNotModified) throws LifecycleManagementException { + final boolean verifyNotModified, final boolean updateDescendantVersionedFlows) throws LifecycleManagementException { // Steps 6-7: Determine which components must be stopped and stop them. final Set stoppableReferenceTypes = new HashSet<>(); @@ -1302,7 +1265,8 @@ public class VersionsResource extends ApplicationResource { .collect(Collectors.toSet()); logger.info("Stopping {} Processors", runningComponents.size()); - final Pause stopComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS); + final CancellableTimedPause stopComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS); + asyncRequest.setCancelCallback(stopComponentsPause::cancel); componentLifecycle.scheduleComponents(exampleUri, user, groupId, runningComponents, ScheduledState.STOPPED, stopComponentsPause); if (asyncRequest.isCancelled()) { @@ -1317,7 +1281,8 @@ public class VersionsResource extends ApplicationResource { .collect(Collectors.toSet()); logger.info("Disabling {} Controller Services", enabledServices.size()); - final Pause disableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS); + final CancellableTimedPause disableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS); + asyncRequest.setCancelCallback(disableServicesPause::cancel); componentLifecycle.activateControllerServices(exampleUri, user, groupId, enabledServices, ControllerServiceState.DISABLED, disableServicesPause); if (asyncRequest.isCancelled()) { @@ -1328,96 +1293,113 @@ public class VersionsResource extends ApplicationResource { logger.info("Updating Process Group with ID {} to version {} of the Versioned Flow", groupId, flowSnapshot.getSnapshotMetadata().getVersion()); // If replicating request, steps 10-12 are performed on each node individually, and this is accomplished // by replicating a PUT to /nifi-api/versions/process-groups/{groupId} - if (replicateRequest) { + try { + if (replicateRequest) { - final URI updateUri; - try { - updateUri = new URI(exampleUri.getScheme(), exampleUri.getUserInfo(), exampleUri.getHost(), - exampleUri.getPort(), "/nifi-api/versions/process-groups/" + groupId, null, exampleUri.getFragment()); - } catch (URISyntaxException e) { - throw new RuntimeException(e); - } - - final Map headers = new HashMap<>(); - headers.put("content-type", MediaType.APPLICATION_JSON); - - final VersionedFlowSnapshotEntity snapshotEntity = new VersionedFlowSnapshotEntity(); - snapshotEntity.setProcessGroupRevision(requestEntity.getProcessGroupRevision()); - snapshotEntity.setRegistryId(requestEntity.getVersionControlInformation().getRegistryId()); - snapshotEntity.setVersionedFlow(flowSnapshot); - - final NodeResponse clusterResponse; - try { - if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { - clusterResponse = getRequestReplicator().replicate(user, HttpMethod.PUT, updateUri, snapshotEntity, headers).awaitMergedResponse(); - } else { - clusterResponse = getRequestReplicator().forwardToCoordinator( - getClusterCoordinatorNode(), user, HttpMethod.PUT, updateUri, snapshotEntity, headers).awaitMergedResponse(); + final URI updateUri; + try { + updateUri = new URI(exampleUri.getScheme(), exampleUri.getUserInfo(), exampleUri.getHost(), + exampleUri.getPort(), "/nifi-api/versions/process-groups/" + groupId, null, exampleUri.getFragment()); + } catch (URISyntaxException e) { + throw new RuntimeException(e); } - } catch (final InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new LifecycleManagementException("Interrupted while updating flows across cluster", ie); + + final Map headers = new HashMap<>(); + headers.put("content-type", MediaType.APPLICATION_JSON); + + final VersionedFlowSnapshotEntity snapshotEntity = new VersionedFlowSnapshotEntity(); + snapshotEntity.setProcessGroupRevision(requestEntity.getProcessGroupRevision()); + snapshotEntity.setRegistryId(requestEntity.getVersionControlInformation().getRegistryId()); + snapshotEntity.setVersionedFlow(flowSnapshot); + snapshotEntity.setUpdateDescendantVersionedFlows(updateDescendantVersionedFlows); + + final NodeResponse clusterResponse; + try { + logger.debug("Replicating PUT request to {} for user {}", updateUri, user); + + if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { + clusterResponse = getRequestReplicator().replicate(user, HttpMethod.PUT, updateUri, snapshotEntity, headers).awaitMergedResponse(); + } else { + clusterResponse = getRequestReplicator().forwardToCoordinator( + getClusterCoordinatorNode(), user, HttpMethod.PUT, updateUri, snapshotEntity, headers).awaitMergedResponse(); + } + } catch (final InterruptedException ie) { + logger.warn("Interrupted while replicating PUT request to {} for user {}", updateUri, user); + Thread.currentThread().interrupt(); + throw new LifecycleManagementException("Interrupted while updating flows across cluster", ie); + } + + final int updateFlowStatus = clusterResponse.getStatus(); + if (updateFlowStatus != Status.OK.getStatusCode()) { + final String explanation = getResponseEntity(clusterResponse, String.class); + logger.error("Failed to update flow across cluster when replicating PUT request to {} for user {}. Received {} response with explanation: {}", + updateUri, user, updateFlowStatus, explanation); + throw new LifecycleManagementException("Failed to update Flow on all nodes in cluster due to " + explanation); + } + + } else { + // Step 10: Ensure that if any connection exists in the flow and does not exist in the proposed snapshot, + // that it has no data in it. Ensure that no Input Port was removed, unless it currently has no incoming connections. + // Ensure that no Output Port was removed, unless it currently has no outgoing connections. + serviceFacade.verifyCanUpdate(groupId, flowSnapshot, true, verifyNotModified); + + // Step 11-12. Update Process Group to the new flow and update variable registry with any Variables that were added or removed + final RevisionDTO revisionDto = requestEntity.getProcessGroupRevision(); + final Revision revision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), groupId); + final VersionControlInformationDTO requestVci = requestEntity.getVersionControlInformation(); + + final Bucket bucket = flowSnapshot.getBucket(); + final VersionedFlow flow = flowSnapshot.getFlow(); + + final VersionedFlowSnapshotMetadata metadata = flowSnapshot.getSnapshotMetadata(); + final VersionControlInformationDTO vci = new VersionControlInformationDTO(); + vci.setBucketId(metadata.getBucketIdentifier()); + vci.setBucketName(bucket.getName()); + vci.setCurrent(flowSnapshot.isLatest()); + vci.setFlowDescription(flow.getDescription()); + vci.setFlowId(flow.getIdentifier()); + vci.setFlowName(flow.getName()); + vci.setGroupId(groupId); + vci.setModified(false); + vci.setRegistryId(requestVci.getRegistryId()); + vci.setRegistryName(serviceFacade.getFlowRegistryName(requestVci.getRegistryId())); + vci.setVersion(metadata.getVersion()); + + serviceFacade.updateProcessGroupContents(user, revision, groupId, vci, flowSnapshot, idGenerationSeed, verifyNotModified, false, updateDescendantVersionedFlows); + } + } finally { + if (!asyncRequest.isCancelled()) { + if (logger.isDebugEnabled()) { + logger.debug("Re-Enabling {} Controller Services: {}", enabledServices.size(), enabledServices); + } + + asyncRequest.update(new Date(), "Re-Enabling Controller Services", 60); + + // Step 13. Re-enable all disabled controller services + final CancellableTimedPause enableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS); + asyncRequest.setCancelCallback(enableServicesPause::cancel); + final Set servicesToEnable = getUpdatedEntities(enabledServices, user); + logger.info("Successfully updated flow; re-enabling {} Controller Services", servicesToEnable.size()); + componentLifecycle.activateControllerServices(exampleUri, user, groupId, servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause); } - final int disableServicesStatus = clusterResponse.getStatus(); - if (disableServicesStatus != Status.OK.getStatusCode()) { - final String explanation = getResponseEntity(clusterResponse, String.class); - throw new LifecycleManagementException("Failed to update Flow on all nodes in cluster due to " + explanation); + if (!asyncRequest.isCancelled()) { + if (logger.isDebugEnabled()) { + logger.debug("Restart {} Processors: {}", runningComponents.size(), runningComponents); + } + + asyncRequest.update(new Date(), "Restarting Processors", 80); + + // Step 14. Restart all components + final Set componentsToStart = getUpdatedEntities(runningComponents, user); + final CancellableTimedPause startComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS); + asyncRequest.setCancelCallback(startComponentsPause::cancel); + logger.info("Restarting {} Processors", componentsToStart.size()); + componentLifecycle.scheduleComponents(exampleUri, user, groupId, componentsToStart, ScheduledState.RUNNING, startComponentsPause); } - - } else { - // Step 10: Ensure that if any connection exists in the flow and does not exist in the proposed snapshot, - // that it has no data in it. Ensure that no Input Port was removed, unless it currently has no incoming connections. - // Ensure that no Output Port was removed, unless it currently has no outgoing connections. - serviceFacade.verifyCanUpdate(groupId, flowSnapshot, true, verifyNotModified); - - // Step 11-12. Update Process Group to the new flow and update variable registry with any Variables that were added or removed - final RevisionDTO revisionDto = requestEntity.getProcessGroupRevision(); - final Revision revision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), groupId); - final VersionControlInformationDTO requestVci = requestEntity.getVersionControlInformation(); - - final Bucket bucket = flowSnapshot.getBucket(); - final VersionedFlow flow = flowSnapshot.getFlow(); - - final VersionedFlowSnapshotMetadata metadata = flowSnapshot.getSnapshotMetadata(); - final VersionControlInformationDTO vci = new VersionControlInformationDTO(); - vci.setBucketId(metadata.getBucketIdentifier()); - vci.setBucketName(bucket.getName()); - vci.setCurrent(flowSnapshot.isLatest()); - vci.setFlowDescription(flow.getDescription()); - vci.setFlowId(flow.getIdentifier()); - vci.setFlowName(flow.getName()); - vci.setGroupId(groupId); - vci.setModified(false); - vci.setRegistryId(requestVci.getRegistryId()); - vci.setRegistryName(serviceFacade.getFlowRegistryName(requestVci.getRegistryId())); - vci.setVersion(metadata.getVersion()); - - serviceFacade.updateProcessGroupContents(user, revision, groupId, vci, flowSnapshot, idGenerationSeed, verifyNotModified, false); } - if (asyncRequest.isCancelled()) { - return null; - } - asyncRequest.update(new Date(), "Re-Enabling Controller Services", 60); - - // Step 13. Re-enable all disabled controller services - final Pause enableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS); - final Set servicesToEnable = getUpdatedEntities(enabledServices, user); - logger.info("Successfully updated flow; re-enabling {} Controller Services", servicesToEnable.size()); - componentLifecycle.activateControllerServices(exampleUri, user, groupId, servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause); - - if (asyncRequest.isCancelled()) { - return null; - } - asyncRequest.update(new Date(), "Restarting Processors", 80); - - // Step 14. Restart all components - final Set componentsToStart = getUpdatedEntities(runningComponents, user); - final Pause startComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS); - logger.info("Restarting {} Processors", componentsToStart.size()); - componentLifecycle.scheduleComponents(exampleUri, user, groupId, componentsToStart, ScheduledState.RUNNING, startComponentsPause); - + asyncRequest.setCancelCallback(null); if (asyncRequest.isCancelled()) { return null; } @@ -1426,6 +1408,7 @@ public class VersionsResource extends ApplicationResource { return serviceFacade.getVersionControlInformation(groupId); } + /** * Extracts the response entity from the specified node response. * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java index 1309eeed4a..3cecdebb52 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java @@ -99,4 +99,12 @@ public interface AsynchronousWebRequest { * @return true if the request has been canceled, false otherwise */ boolean isCancelled(); + + /** + * Sets the cancel callback to the given runnable, so that if {@link #cancel()} is called, the given {@link Runnable} will be triggered. + * If null is passed, no operation will be triggered when the task is cancelled. + * + * @param runnable the callback + */ + void setCancelCallback(Runnable runnable); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java index 4810a3297b..8e2e221c09 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java @@ -34,6 +34,7 @@ public class StandardAsynchronousWebRequest implements AsynchronousWebRequest private volatile String failureReason; private volatile boolean cancelled; private volatile T results; + private volatile Runnable cancelCallback; public StandardAsynchronousWebRequest(final String requestId, final String processGroupId, final NiFiUser user, final String state) { this.id = requestId; @@ -56,6 +57,11 @@ public class StandardAsynchronousWebRequest implements AsynchronousWebRequest return processGroupId; } + @Override + public void setCancelCallback(final Runnable runnable) { + this.cancelCallback = runnable; + } + @Override public void markComplete(final T results) { this.complete = true; @@ -130,6 +136,7 @@ public class StandardAsynchronousWebRequest implements AsynchronousWebRequest percentComplete = 100; state = "Canceled by user"; setFailureReason("Request cancelled by user"); + cancelCallback.run(); } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 6077268921..7d40473a0a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -118,6 +118,7 @@ import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.VersionedComponent; import org.apache.nifi.registry.flow.diff.FlowComparison; import org.apache.nifi.registry.flow.diff.FlowDifference; +import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedConnection; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedFunnel; @@ -2202,15 +2203,23 @@ public final class DtoFactory { private ComponentDifferenceDTO createComponentDifference(final FlowDifference difference) { VersionedComponent component = difference.getComponentA(); - if (component == null) { + if (component == null || difference.getComponentB() instanceof InstantiatedVersionedComponent) { component = difference.getComponentB(); } final ComponentDifferenceDTO dto = new ComponentDifferenceDTO(); - dto.setComponentId(component.getIdentifier()); dto.setComponentName(component.getName()); dto.setComponentType(component.getComponentType().name()); - dto.setProcessGroupId(dto.getProcessGroupId()); + + if (component instanceof InstantiatedVersionedComponent) { + final InstantiatedVersionedComponent instantiatedComponent = (InstantiatedVersionedComponent) component; + dto.setComponentId(instantiatedComponent.getInstanceId()); + dto.setProcessGroupId(instantiatedComponent.getInstanceGroupId()); + } else { + dto.setComponentId(component.getIdentifier()); + dto.setProcessGroupId(dto.getProcessGroupId()); + } + return dto; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java index 7cf61eab1b..9259bf4c4f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java @@ -114,10 +114,12 @@ public interface ProcessGroupDAO { * @param versionControlInformation the new Version Control Information * @param componentIdSeed the seed value to use for generating ID's for new components * @param updateSettings whether or not to update the process group's name and position + * @param updateDescendantVersionedFlows if a child/descendant Process Group is under Version Control, specifies whether or not to + * update the contents of that Process Group * @return the process group */ ProcessGroup updateProcessGroupFlow(String groupId, VersionedFlowSnapshot proposedSnapshot, VersionControlInformationDTO versionControlInformation, String componentIdSeed, - boolean verifyNotModified, boolean updateSettings); + boolean verifyNotModified, boolean updateSettings, boolean updateDescendantVersionedFlows); /** * Applies the given Version Control Information to the Process Group diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java index e3c4725c8e..bb7edb1af0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java @@ -29,6 +29,8 @@ import org.apache.nifi.registry.flow.FlowRegistry; import org.apache.nifi.registry.flow.StandardVersionControlInformation; import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; +import org.apache.nifi.registry.flow.VersionedProcessGroup; +import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.web.ResourceNotFoundException; import org.apache.nifi.web.api.dto.ProcessGroupDTO; @@ -244,8 +246,12 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou final FlowRegistry flowRegistry = flowController.getFlowRegistryClient().getFlowRegistry(registryId); final String registryName = flowRegistry == null ? registryId : flowRegistry.getName(); + final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); + final VersionedProcessGroup flowSnapshot = mapper.mapProcessGroup(group, flowController.getFlowRegistryClient(), false); + final StandardVersionControlInformation vci = StandardVersionControlInformation.Builder.fromDto(versionControlInformation) .registryName(registryName) + .flowSnapshot(flowSnapshot) .modified(false) .current(true) .build(); @@ -264,9 +270,9 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou @Override public ProcessGroup updateProcessGroupFlow(final String groupId, final VersionedFlowSnapshot proposedSnapshot, final VersionControlInformationDTO versionControlInformation, - final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings) { + final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings, final boolean updateDescendantVersionedFlows) { final ProcessGroup group = locateProcessGroup(flowController, groupId); - group.updateFlow(proposedSnapshot, componentIdSeed, verifyNotModified, updateSettings); + group.updateFlow(proposedSnapshot, componentIdSeed, verifyNotModified, updateSettings, updateDescendantVersionedFlows); final StandardVersionControlInformation svci = StandardVersionControlInformation.Builder.fromDto(versionControlInformation) .flowSnapshot(proposedSnapshot.getFlowContents()) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java index 1f83a6f024..dea43f6cb0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java @@ -43,7 +43,7 @@ public class CancellableTimedPause implements Pause { long sysTime = System.nanoTime(); final long maxWaitTime = System.nanoTime() + pauseNanos; - while (sysTime < maxWaitTime) { + while (sysTime < maxWaitTime && !cancelled) { try { TimeUnit.NANOSECONDS.sleep(pauseNanos); } catch (final InterruptedException ie) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java index 4469ea1e94..11bd1b800c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java @@ -421,18 +421,24 @@ public final class SnippetUtils { } // get a list of all names of process groups so that we can rename as needed. - final List groupNames = new ArrayList<>(); + final Set groupNames = new HashSet<>(); for (final ProcessGroup childGroup : group.getProcessGroups()) { groupNames.add(childGroup.getName()); } if (snippetContents.getProcessGroups() != null) { for (final ProcessGroupDTO groupDTO : snippetContents.getProcessGroups()) { - String groupName = groupDTO.getName(); - while (groupNames.contains(groupName)) { - groupName = "Copy of " + groupName; + // If Version Control Information is present, then we don't want to rename the + // Process Group - we want it to remain the same as the one in Version Control. + // However, in order to disambiguate things, we generally do want to rename to + // 'Copy of...' so we do this only if there is no Version Control Information present. + if (groupDTO.getVersionControlInformation() == null) { + String groupName = groupDTO.getName(); + while (groupNames.contains(groupName)) { + groupName = "Copy of " + groupName; + } + groupDTO.setName(groupName); } - groupDTO.setName(groupName); groupNames.add(groupDTO.getName()); } }