NIFI-4436: Bug fixes - Checkpoint before allowing multiple Process Groups with same Versioned Component ID and same parent - Ensure that if flow update is cancelled while processors are being stopped/services disabled that we stop waiting for that to occur. Also ensure that if we fail to update flow that we re-enable/restart the processors and services - Updated verbiage to use a ConciseEvolvingDifferentDescriptor when getting local modifications for a versioned flow - Do not allow outer process group to be saved to flow registry or have local modifications reverted if it has a descendant process group that is under version control and is dirty. Fixed bug where ComponentDifferenceDTO was populated with wrong component id and group id

Signed-off-by: Matt Gilman <matt.c.gilman@gmail.com>
This commit is contained in:
Mark Payne 2017-11-17 11:02:33 -05:00 committed by Bryan Bende
parent 3d8b1e4890
commit adacb204a8
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
18 changed files with 496 additions and 264 deletions

View File

@ -28,6 +28,7 @@ public class VersionedFlowSnapshotEntity extends Entity {
private VersionedFlowSnapshot versionedFlowSnapshot;
private RevisionDTO processGroupRevision;
private String registryId;
private Boolean updateDescendantVersionedFlows;
@ApiModelProperty("The versioned flow snapshot")
public VersionedFlowSnapshot getVersionedFlowSnapshot() {
@ -55,4 +56,14 @@ public class VersionedFlowSnapshotEntity extends Entity {
public void setRegistryId(String registryId) {
this.registryId = registryId;
}
@ApiModelProperty("If the Process Group to be updated has a child or descendant Process Group that is also under "
+ "Version Control, this specifies whether or not the contents of that child/descendant Process Group should be updated.")
public Boolean getUpdateDescendantVersionedFlows() {
return updateDescendantVersionedFlows;
}
public void setUpdateDescendantVersionedFlows(Boolean update) {
this.updateDescendantVersionedFlows = update;
}
}

View File

@ -784,8 +784,10 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
* and the Process Group has been modified since it was last synchronized with the Flow Registry, then this method will
* throw an IllegalStateException
* @param updateSettings whether or not to update the process group's name and positions
* @param updateDescendantVersionedFlows if a child/descendant Process Group is under Version Control, specifies whether or not to
* update the contents of that Process Group
*/
void updateFlow(VersionedFlowSnapshot proposedSnapshot, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings);
void updateFlow(VersionedFlowSnapshot proposedSnapshot, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings, boolean updateDescendantVersionedFlows);
/**
* Verifies a template with the specified name can be created.
@ -848,7 +850,7 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
void verifyCanUpdateVariables(Map<String, String> updatedVariables);
/**
* Ensure that the contents of the Process Group can be update to match the given new flow
* Ensures that the contents of the Process Group can be update to match the given new flow
*
* @param updatedFlow the updated version of the flow
* @param verifyConnectionRemoval whether or not to verify that connections that are not present in the updated flow can be removed
@ -859,6 +861,27 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
*/
void verifyCanUpdate(VersionedFlowSnapshot updatedFlow, boolean verifyConnectionRemoval, boolean verifyNotDirty);
/**
* Ensures that the Process Group can have any local changes reverted
*
* @throws IllegalStateException if the Process Group is not in a state that will allow local changes to be reverted
*/
void verifyCanRevertLocalModifications();
/**
* Ensures that the Process Group can have its local modifications shown
*
* @throws IllegalStateException if the Process Group is not in a state that will allow local modifications to be shown
*/
void verifyCanShowLocalModifications();
/**
* Ensure that the contents of the Process Group can be saved to a Flow Registry in its current state
*
* @throws IllegalStateException if the Process Group cannot currently be saved to a Flow Registry
*/
void verifyCanSaveToFlowRegistry(String registryId, String bucketId, String flowId);
/**
* Adds the given template to this Process Group
*

View File

@ -159,6 +159,9 @@ public interface FlowRegistry {
* @param bucketId the ID of the bucket
* @param flowId the ID of the flow
* @param version the version to retrieve
* @param fetchRemoteFlows if the remote flow has a child Process Group that also tracks to a remote flow, this specifies whether or not
* the child's contents should be fetched.
* @param user the user on whose behalf the flow contents are being retrieved
* @return the contents of the Flow from the Flow Registry
*
* @throws IOException if unable to communicate with the Flow Registry
@ -167,7 +170,7 @@ public interface FlowRegistry {
* @throws NullPointerException if any of the arguments is not specified
* @throws IllegalArgumentException if the given version is less than 1
*/
VersionedFlowSnapshot getFlowContents(String bucketId, String flowId, int version, NiFiUser user) throws IOException, NiFiRegistryException;
VersionedFlowSnapshot getFlowContents(String bucketId, String flowId, int version, boolean fetchRemoteFlows, NiFiUser user) throws IOException, NiFiRegistryException;
/**
* Retrieves the contents of the Flow with the given Bucket ID, Flow ID, and version, from the Flow Registry
@ -175,6 +178,8 @@ public interface FlowRegistry {
* @param bucketId the ID of the bucket
* @param flowId the ID of the flow
* @param version the version to retrieve
* @param fetchRemoteFlows if the remote flow has a child Process Group that also tracks to a remote flow, this specifies whether or not
* the child's contents should be fetched.
* @return the contents of the Flow from the Flow Registry
*
* @throws IOException if unable to communicate with the Flow Registry
@ -183,7 +188,7 @@ public interface FlowRegistry {
* @throws NullPointerException if any of the arguments is not specified
* @throws IllegalArgumentException if the given version is less than 1
*/
VersionedFlowSnapshot getFlowContents(String bucketId, String flowId, int version) throws IOException, NiFiRegistryException;
VersionedFlowSnapshot getFlowContents(String bucketId, String flowId, int version, boolean fetchRemoteFlows) throws IOException, NiFiRegistryException;
/**
* Retrieves a VersionedFlow by bucket id and flow id

View File

@ -165,8 +165,11 @@ import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.StandardVersionControlInformation;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedConnection;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.registry.variable.MutableVariableRegistry;
import org.apache.nifi.registry.variable.StandardComponentVariableRegistry;
import org.apache.nifi.remote.HttpRemoteSiteListener;
@ -1775,6 +1778,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* processor
*/
public void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto) throws ProcessorInstantiationException {
instantiateSnippet(group, dto, true);
}
private void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto, final boolean topLevel) throws ProcessorInstantiationException {
writeLock.lock();
try {
validateSnippetContents(requireNonNull(group), dto);
@ -1789,6 +1796,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
serviceNode.setAnnotationData(controllerServiceDTO.getAnnotationData());
serviceNode.setComments(controllerServiceDTO.getComments());
serviceNode.setName(controllerServiceDTO.getName());
if (!topLevel) {
serviceNode.setVersionedComponentId(controllerServiceDTO.getVersionedComponentId());
}
group.addControllerService(serviceNode);
}
@ -1812,6 +1822,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
label.setStyle(labelDTO.getStyle());
if (!topLevel) {
label.setVersionedComponentId(labelDTO.getVersionedComponentId());
}
group.addLabel(label);
}
@ -1819,6 +1833,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
for (final FunnelDTO funnelDTO : dto.getFunnels()) {
final Funnel funnel = createFunnel(funnelDTO.getId());
funnel.setPosition(toPosition(funnelDTO.getPosition()));
if (!topLevel) {
funnel.setVersionedComponentId(funnelDTO.getVersionedComponentId());
}
group.addFunnel(funnel);
}
@ -1840,6 +1858,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
inputPort = createLocalInputPort(portDTO.getId(), portDTO.getName());
}
if (!topLevel) {
inputPort.setVersionedComponentId(portDTO.getVersionedComponentId());
}
inputPort.setPosition(toPosition(portDTO.getPosition()));
inputPort.setProcessGroup(group);
inputPort.setComments(portDTO.getComments());
@ -1861,6 +1882,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
outputPort = createLocalOutputPort(portDTO.getId(), portDTO.getName());
}
if (!topLevel) {
outputPort.setVersionedComponentId(portDTO.getVersionedComponentId());
}
outputPort.setPosition(toPosition(portDTO.getPosition()));
outputPort.setProcessGroup(group);
outputPort.setComments(portDTO.getComments());
@ -1876,6 +1900,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
procNode.setPosition(toPosition(processorDTO.getPosition()));
procNode.setProcessGroup(group);
if (!topLevel) {
procNode.setVersionedComponentId(processorDTO.getVersionedComponentId());
}
final ProcessorConfigDTO config = processorDTO.getConfig();
procNode.setComments(config.getComments());
@ -1936,6 +1963,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
remoteGroup.setPosition(toPosition(remoteGroupDTO.getPosition()));
remoteGroup.setCommunicationsTimeout(remoteGroupDTO.getCommunicationsTimeout());
remoteGroup.setYieldDuration(remoteGroupDTO.getYieldDuration());
if (!topLevel) {
remoteGroup.setVersionedComponentId(remoteGroupDTO.getVersionedComponentId());
}
if (remoteGroupDTO.getTransportProtocol() == null) {
remoteGroup.setTransportProtocol(SiteToSiteTransportProtocol.RAW);
} else {
@ -1979,6 +2010,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
childGroup.setVariables(groupDTO.getVariables());
}
// If this Process Group is 'top level' then we do not set versioned component ID's.
// We do this only if this component is the child of a Versioned Component.
if (!topLevel) {
childGroup.setVersionedComponentId(groupDTO.getVersionedComponentId());
}
group.addProcessGroup(childGroup);
final FlowSnippetDTO contents = groupDTO.getContents();
@ -1995,7 +2032,18 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
childTemplateDTO.setFunnels(contents.getFunnels());
childTemplateDTO.setRemoteProcessGroups(contents.getRemoteProcessGroups());
childTemplateDTO.setControllerServices(contents.getControllerServices());
instantiateSnippet(childGroup, childTemplateDTO);
instantiateSnippet(childGroup, childTemplateDTO, false);
if (groupDTO.getVersionControlInformation() != null) {
final NiFiRegistryFlowMapper flowMapper = new NiFiRegistryFlowMapper();
final VersionedProcessGroup versionedGroup = flowMapper.mapProcessGroup(childGroup, getFlowRegistryClient(), false);
final VersionControlInformation vci = StandardVersionControlInformation.Builder
.fromDto(groupDTO.getVersionControlInformation())
.flowSnapshot(versionedGroup)
.build();
childGroup.setVersionControlInformation(vci, Collections.emptyMap());
}
}
//
@ -2039,6 +2087,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
final Connection connection = createConnection(connectionDTO.getId(), connectionDTO.getName(), source, destination, relationships);
if (!topLevel) {
connection.setVersionedComponentId(connectionDTO.getVersionedComponentId());
}
if (connectionDTO.getBends() != null) {
final List<Position> bendPoints = new ArrayList<>();
@ -2088,6 +2139,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
for (final RemoteProcessGroupPortDTO port : ports) {
final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor();
descriptor.setId(port.getId());
descriptor.setVersionedComponentId(port.getVersionedComponentId());
descriptor.setTargetId(port.getTargetId());
descriptor.setName(port.getName());
descriptor.setComments(port.getComments());

View File

@ -2821,7 +2821,7 @@ public final class StandardProcessGroup implements ProcessGroup {
versionControlInformation.getBucketIdentifier(),
versionControlInformation.getFlowIdentifier(),
versionControlInformation.getVersion(),
versionControlInformation.getFlowSnapshot(),
stripContentsFromRemoteDescendantGroups(versionControlInformation.getFlowSnapshot(), true),
versionControlInformation.isModified(),
versionControlInformation.isCurrent()) {
@ -2849,6 +2849,51 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
private VersionedProcessGroup stripContentsFromRemoteDescendantGroups(final VersionedProcessGroup processGroup, final boolean topLevel) {
if (processGroup == null) {
return null;
}
final VersionedProcessGroup copy = new VersionedProcessGroup();
copy.setComments(processGroup.getComments());
copy.setComponentType(processGroup.getComponentType());
copy.setGroupIdentifier(processGroup.getGroupIdentifier());
copy.setIdentifier(processGroup.getIdentifier());
copy.setName(processGroup.getName());
copy.setPosition(processGroup.getPosition());
copy.setVersionedFlowCoordinates(topLevel ? null : processGroup.getVersionedFlowCoordinates());
copy.setConnections(processGroup.getConnections());
copy.setControllerServices(processGroup.getControllerServices());
copy.setFunnels(processGroup.getFunnels());
copy.setInputPorts(processGroup.getInputPorts());
copy.setOutputPorts(processGroup.getOutputPorts());
copy.setProcessors(processGroup.getProcessors());
copy.setRemoteProcessGroups(processGroup.getRemoteProcessGroups());
copy.setVariables(processGroup.getVariables());
final Set<VersionedProcessGroup> copyChildren = new HashSet<>();
for (final VersionedProcessGroup childGroup : processGroup.getProcessGroups()) {
if (childGroup.getVersionedFlowCoordinates() == null) {
copyChildren.add(stripContentsFromRemoteDescendantGroups(childGroup, false));
} else {
final VersionedProcessGroup childCopy = new VersionedProcessGroup();
childCopy.setComments(childGroup.getComments());
childCopy.setComponentType(childGroup.getComponentType());
childCopy.setGroupIdentifier(childGroup.getGroupIdentifier());
childCopy.setIdentifier(childGroup.getIdentifier());
childCopy.setName(childGroup.getName());
childCopy.setPosition(childGroup.getPosition());
childCopy.setVersionedFlowCoordinates(childGroup.getVersionedFlowCoordinates());
copyChildren.add(childCopy);
}
}
copy.setProcessGroups(copyChildren);
return copy;
}
@Override
public void disconnectVersionControl() {
writeLock.lock();
@ -2900,7 +2945,7 @@ public final class StandardProcessGroup implements ProcessGroup {
});
processGroup.getProcessGroups().stream()
.filter(childGroup -> childGroup.getVersionControlInformation() != null)
.filter(childGroup -> childGroup.getVersionControlInformation() == null)
.forEach(childGroup -> applyVersionedComponentIds(childGroup, lookup));
}
@ -2925,7 +2970,7 @@ public final class StandardProcessGroup implements ProcessGroup {
// We have not yet obtained the snapshot from the Flow Registry, so we need to request the snapshot of our local version of the flow from the Flow Registry.
// This allows us to know whether or not the flow has been modified since it was last synced with the Flow Registry.
try {
final VersionedFlowSnapshot registrySnapshot = flowRegistry.getFlowContents(vci.getBucketIdentifier(), vci.getFlowIdentifier(), vci.getVersion());
final VersionedFlowSnapshot registrySnapshot = flowRegistry.getFlowContents(vci.getBucketIdentifier(), vci.getFlowIdentifier(), vci.getVersion(), false);
final VersionedProcessGroup registryFlow = registrySnapshot.getFlowContents();
vci.setFlowSnapshot(registryFlow);
} catch (final IOException | NiFiRegistryException e) {
@ -2958,7 +3003,8 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public void updateFlow(final VersionedFlowSnapshot proposedSnapshot, final String componentIdSeed, final boolean verifyNotDirty, final boolean updateSettings) {
public void updateFlow(final VersionedFlowSnapshot proposedSnapshot, final String componentIdSeed, final boolean verifyNotDirty, final boolean updateSettings,
final boolean updateDescendantVersionedFlows) {
writeLock.lock();
try {
verifyCanUpdate(proposedSnapshot, true, verifyNotDirty);
@ -2986,7 +3032,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
final Set<String> knownVariables = getKnownVariableNames();
updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false, updateSettings, knownVariables);
updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false, updateSettings, updateDescendantVersionedFlows, knownVariables);
} catch (final ProcessorInstantiationException pie) {
throw new RuntimeException(pie);
} finally {
@ -3013,7 +3059,8 @@ public final class StandardProcessGroup implements ProcessGroup {
private void updateProcessGroup(final ProcessGroup group, final VersionedProcessGroup proposed, final String componentIdSeed,
final Set<String> updatedVersionedComponentIds, final boolean updatePosition, final boolean updateName, final Set<String> variablesToSkip) throws ProcessorInstantiationException {
final Set<String> updatedVersionedComponentIds, final boolean updatePosition, final boolean updateName, final boolean updateDescendantVersionedGroups,
final Set<String> variablesToSkip) throws ProcessorInstantiationException {
group.setComments(proposed.getComments());
@ -3033,14 +3080,8 @@ public final class StandardProcessGroup implements ProcessGroup {
.map(VariableDescriptor::getName)
.collect(Collectors.toSet());
final Set<String> variablesRemoved = new HashSet<>(existingVariableNames);
if (proposed.getVariables() != null) {
variablesRemoved.removeAll(proposed.getVariables().keySet());
}
final Map<String, String> updatedVariableMap = new HashMap<>();
variablesRemoved.forEach(var -> updatedVariableMap.put(var, null));
// If any new variables exist in the proposed flow, add those to the variable registry.
for (final Map.Entry<String, String> entry : proposed.getVariables().entrySet()) {
@ -3069,6 +3110,7 @@ public final class StandardProcessGroup implements ProcessGroup {
.flowId(flowId)
.flowName(flowId) // flow id not yet known
.version(version)
.flowSnapshot(proposed)
.modified(false)
.current(true)
.build();
@ -3084,11 +3126,13 @@ public final class StandardProcessGroup implements ProcessGroup {
for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) {
final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier());
final VersionedFlowCoordinates childCoordinates = proposedChildGroup.getVersionedFlowCoordinates();
if (childGroup == null) {
final ProcessGroup added = addProcessGroup(group, proposedChildGroup, componentIdSeed, variablesToSkip);
LOG.info("Added {} to {}", added, this);
} else {
updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true, updateName, variablesToSkip);
} else if (childCoordinates == null || updateDescendantVersionedGroups) {
updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true, updateName, updateDescendantVersionedGroups, variablesToSkip);
LOG.info("Updated {}", childGroup);
}
@ -3367,7 +3411,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final ProcessGroup group = flowController.createProcessGroup(generateUuid(componentIdSeed));
group.setVersionedComponentId(proposed.getIdentifier());
group.setParent(destination);
updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true, true, variablesToSkip);
updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true, true, true, variablesToSkip);
destination.addProcessGroup(group);
return group;
}
@ -3739,7 +3783,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient(), true);
final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient(), false);
final ComparableDataFlow currentFlow = new ComparableDataFlow() {
@Override
@ -3765,7 +3809,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
};
final FlowComparator flowComparator = new StandardFlowComparator(currentFlow, snapshotFlow, new EvolvingDifferenceDescriptor());
final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, new EvolvingDifferenceDescriptor());
final FlowComparison comparison = flowComparator.compare();
final Set<FlowDifference> differences = comparison.getDifferences();
final Set<FlowDifference> functionalDifferences = differences.stream()
@ -4002,4 +4046,69 @@ public final class StandardProcessGroup implements ProcessGroup {
findAllProcessGroups(child, map);
}
}
@Override
public void verifyCanSaveToFlowRegistry(final String registryId, final String bucketId, final String flowId) {
verifyNoDescendantsWithLocalModifications("be saved to a Flow Registry");
final StandardVersionControlInformation vci = versionControlInfo.get();
if (vci != null) {
if (flowId != null && flowId.equals(vci.getFlowIdentifier())) {
// Flow ID is the same. We want to publish the Process Group as the next version of the Flow.
// In order to do this, we have to ensure that the Process Group is 'current'.
final boolean current = vci.isCurrent();
if (!current) {
throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + getIdentifier()
+ " because the Process Group in the flow is not synchronized with the most recent version of the Flow in the Flow Registry. "
+ "In order to publish a new version of the Flow, the Process Group must first be in synch with the latest version in the Flow Registry.");
}
// Flow ID matches. We want to publish the Process Group as the next version of the Flow, so we must
// ensure that all other parameters match as well.
if (!bucketId.equals(vci.getBucketIdentifier())) {
throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + getIdentifier()
+ " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request.");
}
if (!registryId.equals(vci.getRegistryIdentifier())) {
throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + getIdentifier()
+ " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request.");
}
} else if (flowId != null) {
// Flow ID is specified but different. This is not allowed, because Flow ID's are automatically generated,
// and if the client is specifying an ID then it is either trying to assign the ID of the Flow or it is
// attempting to save a new version of a different flow. Saving a new version of a different Flow is
// not allowed because the Process Group must be in synch with the latest version of the flow before that
// can be done.
throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + getIdentifier()
+ " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request.");
}
}
}
@Override
public void verifyCanRevertLocalModifications() {
final StandardVersionControlInformation svci = versionControlInfo.get();
if (svci == null) {
throw new IllegalStateException("Cannot revert local modifications to Process Group because the Process Group is not under Version Control.");
}
verifyNoDescendantsWithLocalModifications("have its local modifications reverted");
}
@Override
public void verifyCanShowLocalModifications() {
}
private void verifyNoDescendantsWithLocalModifications(final String action) {
for (final ProcessGroup descendant : findAllProcessGroups()) {
final VersionControlInformation descendantVci = descendant.getVersionControlInformation();
if (descendantVci != null && descendantVci.isModified()) {
throw new IllegalStateException("Process Group cannot " + action + " because it contains a child or descendant Process Group that is under Version Control and "
+ "has local modifications. Each descendant Process Group that is under Version Control must first be reverted or have its changes pushed to the Flow Registry before "
+ "this action can be performed on the parent Process Group.");
}
}
}
}

View File

@ -178,21 +178,24 @@ public class RestBasedFlowRegistry implements FlowRegistry {
}
@Override
public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version, final NiFiUser user) throws IOException, NiFiRegistryException {
public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version, final boolean fetchRemoteFlows, final NiFiUser user)
throws IOException, NiFiRegistryException {
final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(getIdentity(user));
final VersionedFlowSnapshot flowSnapshot = snapshotClient.get(bucketId, flowId, version);
if (fetchRemoteFlows) {
final VersionedProcessGroup contents = flowSnapshot.getFlowContents();
for (final VersionedProcessGroup child : contents.getProcessGroups()) {
populateVersionedContentsRecursively(child, user);
}
}
return flowSnapshot;
}
@Override
public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version) throws IOException, NiFiRegistryException {
return getFlowContents(bucketId, flowId, version, null);
public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version, final boolean fetchRemoteFlows) throws IOException, NiFiRegistryException {
return getFlowContents(bucketId, flowId, version, fetchRemoteFlows, null);
}
private void populateVersionedContentsRecursively(final VersionedProcessGroup group, final NiFiUser user) throws NiFiRegistryException, IOException {
@ -214,7 +217,7 @@ public class RestBasedFlowRegistry implements FlowRegistry {
}
final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
final VersionedFlowSnapshot snapshot = flowRegistry.getFlowContents(bucketId, flowId, version, user);
final VersionedFlowSnapshot snapshot = flowRegistry.getFlowContents(bucketId, flowId, version, true, user);
final VersionedProcessGroup contents = snapshot.getFlowContents();
group.setComments(contents.getComments());

View File

@ -649,12 +649,16 @@ public class MockProcessGroup implements ProcessGroup {
public void verifyCanUpdate(VersionedFlowSnapshot updatedFlow, boolean verifyConnectionRemoval, boolean verifyNotDirty) {
}
@Override
public void verifyCanSaveToFlowRegistry(String registryId, String bucketId, String flowId) {
}
@Override
public void synchronizeWithFlowRegistry(FlowRegistryClient flowRegistry) {
}
@Override
public void updateFlow(VersionedFlowSnapshot proposedFlow, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings) {
public void updateFlow(VersionedFlowSnapshot proposedFlow, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings, boolean updateDescendantVerisonedFlows) {
}
@Override
@ -666,4 +670,12 @@ public class MockProcessGroup implements ProcessGroup {
public void disconnectVersionControl() {
this.versionControlInfo = null;
}
@Override
public void verifyCanRevertLocalModifications() {
}
@Override
public void verifyCanShowLocalModifications() {
}
}

View File

@ -1368,11 +1368,13 @@ public interface NiFiServiceFacade {
* Retrieves the Versioned Flow Snapshot for the coordinates provided by the given Version Control Information DTO
*
* @param versionControlInfo the coordinates of the versioned flow
* @param fetchRemoteFlows if the contents of Versioned Flow that is fetched contains a child/descendant Process Group
* that is also under Version Control, this indicates whether that remote flow should also be fetched
* @return the VersionedFlowSnapshot that corresponds to the given coordinates
*
* @throws ResourceNotFoundException if the Versioned Flow Snapshot could not be found
*/
VersionedFlowSnapshot getVersionedFlowSnapshot(VersionControlInformationDTO versionControlInfo) throws IOException;
VersionedFlowSnapshot getVersionedFlowSnapshot(VersionControlInformationDTO versionControlInfo, boolean fetchRemoteFlows) throws IOException;
/**
* Returns the name of the Flow Registry that is registered with the given ID. If no Flow Registry exists with the given ID, will return
@ -1406,6 +1408,28 @@ public interface NiFiServiceFacade {
*/
void verifyCanUpdate(String groupId, VersionedFlowSnapshot proposedFlow, boolean verifyConnectionRemoval, boolean verifyNotDirty);
/**
* Verifies that the Process Group with the given identifier can be saved to the flow registry
*
* @param groupId the ID of the Process Group
* @param registryId the ID of the Flow Registry
* @param bucketId the ID of the bucket
* @param flowId the ID of the flow
*
* @throws IllegalStateException if the Process Group cannot be saved to the flow registry with the coordinates specified
*/
void verifyCanSaveToFlowRegistry(String groupId, String registryId, String bucketId, String flowId);
/**
* Verifies that the Process Group with the given identifier can have its local modifications reverted to the given VersionedFlowSnapshot
*
* @param groupId the ID of the Process Group
* @param versionedFlowSnapshot the Versioned Flow Snapshot
*
* @throws IllegalStateException if the Process Group cannot have its local modifications reverted
*/
void verifyCanRevertLocalModifications(String groupId, VersionedFlowSnapshot versionedFlowSnapshot);
/**
* Updates the Process group with the given ID to match the new snapshot
*
@ -1414,10 +1438,12 @@ public interface NiFiServiceFacade {
* @param versionControlInfo the Version Control information
* @param snapshot the new snapshot
* @param componentIdSeed the seed to use for generating new component ID's
* @param updateDescendantVersionedFlows if a child/descendant Process Group is under Version Control, specifies whether or not to
* update the contents of that Process Group
* @return the Process Group
*/
ProcessGroupEntity updateProcessGroup(Revision revision, String groupId, VersionControlInformationDTO versionControlInfo, VersionedFlowSnapshot snapshot, String componentIdSeed,
boolean verifyNotModified);
boolean verifyNotModified, boolean updateDescendantVersionedFlows);
/**
* Updates the Process group with the given ID to match the new snapshot
@ -1429,10 +1455,12 @@ public interface NiFiServiceFacade {
* @param snapshot the new snapshot
* @param componentIdSeed the seed to use for generating new component ID's
* @param updateSettings whether or not the process group's name and position should be updated
* @param updateDescendantVersionedFlows if a child/descendant Process Group is under Version Control, specifies whether or not to
* update the contents of that Process Group
* @return the Process Group
*/
ProcessGroupEntity updateProcessGroupContents(NiFiUser user, Revision revision, String groupId, VersionControlInformationDTO versionControlInfo, VersionedFlowSnapshot snapshot, String componentIdSeed,
boolean verifyNotModified, boolean updateSettings);
boolean verifyNotModified, boolean updateSettings, boolean updateDescendantVersionedFlows);
// ----------------------------------------
// Component state methods

View File

@ -97,13 +97,12 @@ import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedComponent;
import org.apache.nifi.registry.flow.VersionedConnection;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowCoordinates;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
import org.apache.nifi.registry.flow.diff.ConciseEvolvingDifferenceDescriptor;
import org.apache.nifi.registry.flow.diff.DifferenceType;
import org.apache.nifi.registry.flow.diff.EvolvingDifferenceDescriptor;
import org.apache.nifi.registry.flow.diff.FlowComparator;
import org.apache.nifi.registry.flow.diff.FlowComparison;
import org.apache.nifi.registry.flow.diff.FlowDifference;
@ -3751,10 +3750,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
final VersionedFlowSnapshot versionedFlowSnapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketIdentifier(),
versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion(), NiFiUserUtils.getNiFiUser());
versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion(), false, NiFiUserUtils.getNiFiUser());
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient, true);
final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient, false);
final VersionedProcessGroup registryGroup = versionedFlowSnapshot.getFlowContents();
final ComparableDataFlow localFlow = new ComparableDataFlow() {
@ -3781,7 +3780,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
};
final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, new EvolvingDifferenceDescriptor());
final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, new ConciseEvolvingDifferenceDescriptor());
final FlowComparison flowComparison = flowComparator.compare();
final Set<ComponentDifferenceDTO> differenceDtos = dtoFactory.createComponentDifferenceDtos(flowComparison);
@ -3852,6 +3851,23 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
group.verifyCanUpdate(proposedFlow, verifyConnectionRemoval, verifyNotDirty);
}
@Override
public void verifyCanSaveToFlowRegistry(final String groupId, final String registryId, final String bucketId, final String flowId) {
final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
group.verifyCanSaveToFlowRegistry(registryId, bucketId, flowId);
}
@Override
public void verifyCanRevertLocalModifications(final String groupId, final VersionedFlowSnapshot versionedFlowSnapshot) {
final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
group.verifyCanRevertLocalModifications();
// verify that the process group can be updated to the given snapshot. We do not verify that connections can
// be removed, because the flow may still be running, and it only matters that the connections can be removed once the components
// have been stopped.
group.verifyCanUpdate(versionedFlowSnapshot, false, false);
}
@Override
public Set<AffectedComponentEntity> getComponentsAffectedByVersionChange(final String processGroupId, final VersionedFlowSnapshot updatedSnapshot, final NiFiUser user) throws IOException {
final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId);
@ -4028,7 +4044,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
public VersionedFlowSnapshot getVersionedFlowSnapshot(final VersionControlInformationDTO versionControlInfo) throws IOException {
public VersionedFlowSnapshot getVersionedFlowSnapshot(final VersionControlInformationDTO versionControlInfo, final boolean fetchRemoteFlows) throws IOException {
final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(versionControlInfo.getRegistryId());
if (flowRegistry == null) {
throw new ResourceNotFoundException("Could not find any Flow Registry registered with identifier " + versionControlInfo.getRegistryId());
@ -4036,15 +4052,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final VersionedFlowSnapshot snapshot;
try {
snapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketId(), versionControlInfo.getFlowId(), versionControlInfo.getVersion(), NiFiUserUtils.getNiFiUser());
snapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketId(), versionControlInfo.getFlowId(), versionControlInfo.getVersion(), fetchRemoteFlows, NiFiUserUtils.getNiFiUser());
} catch (final NiFiRegistryException e) {
throw new IllegalArgumentException("The Flow Registry with ID " + versionControlInfo.getRegistryId() + " reports that no Flow exists with Bucket "
+ versionControlInfo.getBucketId() + ", Flow " + versionControlInfo.getFlowId() + ", Version " + versionControlInfo.getVersion());
}
// If this Flow has a reference to a remote flow, we need to pull that remote flow as well
populateVersionedChildFlows(snapshot);
return snapshot;
}
@ -4054,74 +4067,22 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
return flowRegistry == null ? flowRegistryId : flowRegistry.getName();
}
private void populateVersionedChildFlows(final VersionedFlowSnapshot snapshot) throws IOException {
final VersionedProcessGroup group = snapshot.getFlowContents();
for (final VersionedProcessGroup child : group.getProcessGroups()) {
populateVersionedFlows(child);
}
}
private void populateVersionedFlows(final VersionedProcessGroup group) throws IOException {
final VersionedFlowCoordinates remoteCoordinates = group.getVersionedFlowCoordinates();
if (remoteCoordinates != null) {
final String registryUrl = remoteCoordinates.getRegistryUrl();
final String registryId = flowRegistryClient.getFlowRegistryId(registryUrl);
if (registryId == null) {
throw new IllegalArgumentException("Process Group with ID " + group.getIdentifier() + " is under Version Control, referencing a Flow Registry at URL [" + registryUrl
+ "], but no Flow Registry is currently registered for that URL.");
}
final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
final VersionedFlowSnapshot childSnapshot;
try {
childSnapshot = flowRegistry.getFlowContents(remoteCoordinates.getBucketId(), remoteCoordinates.getFlowId(), remoteCoordinates.getVersion(), NiFiUserUtils.getNiFiUser());
} catch (final NiFiRegistryException e) {
throw new IllegalArgumentException("The Flow Registry with ID " + registryId + " reports that no Flow exists with Bucket "
+ remoteCoordinates.getBucketId() + ", Flow " + remoteCoordinates.getFlowId() + ", Version " + remoteCoordinates.getVersion());
}
final VersionedProcessGroup fetchedGroup = childSnapshot.getFlowContents();
group.setComments(fetchedGroup.getComments());
group.setPosition(fetchedGroup.getPosition());
group.setName(fetchedGroup.getName());
group.setVariables(fetchedGroup.getVariables());
group.setConnections(new LinkedHashSet<>(fetchedGroup.getConnections()));
group.setControllerServices(new LinkedHashSet<>(fetchedGroup.getControllerServices()));
group.setFunnels(new LinkedHashSet<>(fetchedGroup.getFunnels()));
group.setInputPorts(new LinkedHashSet<>(fetchedGroup.getInputPorts()));
group.setLabels(new LinkedHashSet<>(fetchedGroup.getLabels()));
group.setOutputPorts(new LinkedHashSet<>(fetchedGroup.getOutputPorts()));
group.setProcessGroups(new LinkedHashSet<>(fetchedGroup.getProcessGroups()));
group.setProcessors(new LinkedHashSet<>(fetchedGroup.getProcessors()));
group.setRemoteProcessGroups(new LinkedHashSet<>(fetchedGroup.getRemoteProcessGroups()));
}
for (final VersionedProcessGroup child : group.getProcessGroups()) {
populateVersionedFlows(child);
}
}
@Override
public ProcessGroupEntity updateProcessGroup(final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo,
final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified) {
final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified, final boolean updateDescendantVersionedFlows) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
return updateProcessGroupContents(user, revision, groupId, versionControlInfo, proposedFlowSnapshot, componentIdSeed, verifyNotModified, true);
return updateProcessGroupContents(user, revision, groupId, versionControlInfo, proposedFlowSnapshot, componentIdSeed, verifyNotModified, true, updateDescendantVersionedFlows);
}
@Override
public ProcessGroupEntity updateProcessGroupContents(final NiFiUser user, final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo,
final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings) {
final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings, final boolean updateDescendantVersionedFlows) {
final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(groupId);
final RevisionUpdate<ProcessGroupDTO> snapshot = updateComponent(user, revision,
processGroupNode,
() -> processGroupDAO.updateProcessGroupFlow(groupId, proposedFlowSnapshot, versionControlInfo, componentIdSeed, verifyNotModified, updateSettings),
() -> processGroupDAO.updateProcessGroupFlow(groupId, proposedFlowSnapshot, versionControlInfo, componentIdSeed, verifyNotModified, updateSettings, updateDescendantVersionedFlows),
processGroup -> dtoFactory.createProcessGroupDto(processGroup));
final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode);

View File

@ -1644,7 +1644,7 @@ public class ProcessGroupResource extends ApplicationResource {
if (versionControlInfo != null) {
// Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail.
// Step 2: Retrieve flow from Flow Registry
final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(versionControlInfo);
final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(versionControlInfo, true);
final Bucket bucket = flowSnapshot.getBucket();
final VersionedFlow flow = flowSnapshot.getFlow();
@ -1653,6 +1653,8 @@ public class ProcessGroupResource extends ApplicationResource {
versionControlInfo.setFlowDescription(flow.getDescription());
versionControlInfo.setRegistryName(serviceFacade.getFlowRegistryName(versionControlInfo.getRegistryId()));
versionControlInfo.setModified(false);
versionControlInfo.setCurrent(flowSnapshot.isLatest());
// Step 3: Resolve Bundle info
BundleUtils.discoverCompatibleBundles(flowSnapshot.getFlowContents());
@ -1709,8 +1711,13 @@ public class ProcessGroupResource extends ApplicationResource {
final RevisionDTO revisionDto = entity.getRevision();
final String newGroupId = entity.getComponent().getId();
final Revision newGroupRevision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), newGroupId);
// We don't want the Process Group's position to be updated because we want to keep the position where the user
// placed the Process Group. However, we do want to use the name of the Process Group that is in the Flow Contents.
// To accomplish this, we call updateProcessGroupContents() passing 'true' for the updateSettings flag but null out the position.
flowSnapshot.getFlowContents().setPosition(null);
entity = serviceFacade.updateProcessGroupContents(NiFiUserUtils.getNiFiUser(), newGroupRevision, newGroupId,
versionControlInfo, flowSnapshot, getIdGenerationSeed().orElse(null), false, false);
versionControlInfo, flowSnapshot, getIdGenerationSeed().orElse(null), false, true, true);
}
populateRemainingProcessGroupEntityContent(entity);

View File

@ -17,21 +17,6 @@
package org.apache.nifi.web.api;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
@ -92,10 +77,24 @@ import org.apache.nifi.web.util.AffectedComponentUtils;
import org.apache.nifi.web.util.CancellableTimedPause;
import org.apache.nifi.web.util.ComponentLifecycle;
import org.apache.nifi.web.util.LifecycleManagementException;
import org.apache.nifi.web.util.Pause;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
@ -454,51 +453,15 @@ public class VersionsResource extends ApplicationResource {
super.authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true, false, true, true);
},
() -> {
final VersionControlInformationEntity entity = serviceFacade.getVersionControlInformation(groupId);
if (entity != null) {
final String flowId = requestEntity.getVersionedFlow().getFlowId();
if (flowId != null && flowId.equals(entity.getVersionControlInformation().getFlowId())) {
// Flow ID is the same. We want to publish the Process Group as the next version of the Flow.
// In order to do this, we have to ensure that the Process Group is 'current'.
final Boolean current = entity.getVersionControlInformation().getCurrent();
if (current == null) {
throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + groupId
+ " because it is not yet known whether or not this Process Group is the most recent version of the flow. "
+ "Please try the request again after the Process Group has been synchronized with the Flow Registry.");
}
if (current == Boolean.FALSE) {
throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + groupId
+ " because the Process Group in the flow is not synchronized with the most recent version of the Flow in the Flow Registry. "
+ "In order to publish a new version of the Flow, the Process Group must first be in synch with the latest version in the Flow Registry.");
}
// Flow ID matches. We want to publish the Process Group as the next version of the Flow, so we must
// ensure that all other parameters match as well.
if (!requestEntity.getVersionedFlow().getBucketId().equals(entity.getVersionControlInformation().getBucketId())) {
throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + groupId
+ " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request.");
}
if (!requestEntity.getVersionedFlow().getRegistryId().equals(entity.getVersionControlInformation().getRegistryId())) {
throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + groupId
+ " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request.");
}
} else if (flowId != null) {
// Flow ID is specified but different. This is not allowed, because Flow ID's are automatically generated,
// and if the client is specifying an ID then it is either trying to assign the ID of the Flow or it is
// attempting to save a new version of a different flow. Saving a new version of a different Flow is
// not allowed because the Process Group must be in synch with the latest version of the flow before that
// can be done.
throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + groupId
+ " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request.");
}
}
final VersionedFlowDTO versionedFlow = requestEntity.getVersionedFlow();
final String registryId = versionedFlow.getRegistryId();
final String bucketId = versionedFlow.getBucketId();
final String flowId = versionedFlow.getFlowId();
serviceFacade.verifyCanSaveToFlowRegistry(groupId, registryId, bucketId, flowId);
},
(rev, flowEntity) -> {
// Register the current flow with the Flow Registry.
final VersionControlComponentMappingEntity mappingEntity = serviceFacade.registerFlowWithFlowRegistry(groupId, requestEntity);
final VersionControlComponentMappingEntity mappingEntity = serviceFacade.registerFlowWithFlowRegistry(groupId, flowEntity);
// Update the Process Group's Version Control Information
final VersionControlInformationEntity responseEntity = serviceFacade.setVersionControlInformation(rev, groupId,
@ -756,7 +719,8 @@ public class VersionsResource extends ApplicationResource {
versionControlInfoDto.setRegistryId(requestEntity.getRegistryId());
versionControlInfoDto.setRegistryName(serviceFacade.getFlowRegistryName(requestEntity.getRegistryId()));
final ProcessGroupEntity updatedGroup = serviceFacade.updateProcessGroup(rev, groupId, versionControlInfoDto, flowSnapshot, getIdGenerationSeed().orElse(null), false);
final ProcessGroupEntity updatedGroup = serviceFacade.updateProcessGroup(rev, groupId, versionControlInfoDto, flowSnapshot, getIdGenerationSeed().orElse(null), false,
entity.getUpdateDescendantVersionedFlows());
final VersionControlInformationDTO updatedVci = updatedGroup.getComponent().getVersionControlInformation();
final VersionControlInformationEntity responseEntity = new VersionControlInformationEntity();
@ -1039,7 +1003,7 @@ public class VersionsResource extends ApplicationResource {
// 14. Re-Start all Processors, Funnels, Ports that are affected and not removed.
// Step 0: Get the Versioned Flow Snapshot from the Flow Registry
final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation());
final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation(), true);
// The flow in the registry may not contain the same versions of components that we have in our flow. As a result, we need to update
// the flow snapshot to contain compatible bundles.
@ -1085,7 +1049,7 @@ public class VersionsResource extends ApplicationResource {
final Consumer<AsynchronousWebRequest<VersionControlInformationEntity>> updateTask = vcur -> {
try {
final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, componentLifecycle, exampleUri,
affectedComponents, user, replicateRequest, requestEntity, flowSnapshot, request, idGenerationSeed, true);
affectedComponents, user, replicateRequest, requestEntity, flowSnapshot, request, idGenerationSeed, true, true);
vcur.markComplete(updatedVersionControlEntity);
} catch (final LifecycleManagementException e) {
@ -1188,7 +1152,7 @@ public class VersionsResource extends ApplicationResource {
final String idGenerationSeed = getIdGenerationSeed().orElse(null);
// Step 0: Get the Versioned Flow Snapshot from the Flow Registry
final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation());
final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation(), false);
// The flow in the registry may not contain the same versions of components that we have in our flow. As a result, we need to update
// the flow snapshot to contain compatible bundles.
@ -1221,8 +1185,7 @@ public class VersionsResource extends ApplicationResource {
() -> {
// Step 3: Verify that all components in the snapshot exist on all nodes
// Step 4: Verify that Process Group is already under version control. If not, must start Version Control instead of updating flow
// Step 5: Verify that Process Group is not 'dirty'
serviceFacade.verifyCanUpdate(groupId, flowSnapshot, false, false);
serviceFacade.verifyCanRevertLocalModifications(groupId, flowSnapshot);
},
(revision, processGroupEntity) -> {
// Ensure that the information passed in is correct
@ -1254,7 +1217,7 @@ public class VersionsResource extends ApplicationResource {
final Consumer<AsynchronousWebRequest<VersionControlInformationEntity>> updateTask = vcur -> {
try {
final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, componentLifecycle, exampleUri,
affectedComponents, user, replicateRequest, requestEntity, flowSnapshot, request, idGenerationSeed, false);
affectedComponents, user, replicateRequest, requestEntity, flowSnapshot, request, idGenerationSeed, false, false);
vcur.markComplete(updatedVersionControlEntity);
} catch (final LifecycleManagementException e) {
@ -1288,7 +1251,7 @@ public class VersionsResource extends ApplicationResource {
private VersionControlInformationEntity updateFlowVersion(final String groupId, final ComponentLifecycle componentLifecycle, final URI exampleUri,
final Set<AffectedComponentEntity> affectedComponents, final NiFiUser user, final boolean replicateRequest, final VersionControlInformationEntity requestEntity,
final VersionedFlowSnapshot flowSnapshot, final AsynchronousWebRequest<VersionControlInformationEntity> asyncRequest, final String idGenerationSeed,
final boolean verifyNotModified) throws LifecycleManagementException {
final boolean verifyNotModified, final boolean updateDescendantVersionedFlows) throws LifecycleManagementException {
// Steps 6-7: Determine which components must be stopped and stop them.
final Set<String> stoppableReferenceTypes = new HashSet<>();
@ -1302,7 +1265,8 @@ public class VersionsResource extends ApplicationResource {
.collect(Collectors.toSet());
logger.info("Stopping {} Processors", runningComponents.size());
final Pause stopComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
final CancellableTimedPause stopComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
asyncRequest.setCancelCallback(stopComponentsPause::cancel);
componentLifecycle.scheduleComponents(exampleUri, user, groupId, runningComponents, ScheduledState.STOPPED, stopComponentsPause);
if (asyncRequest.isCancelled()) {
@ -1317,7 +1281,8 @@ public class VersionsResource extends ApplicationResource {
.collect(Collectors.toSet());
logger.info("Disabling {} Controller Services", enabledServices.size());
final Pause disableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
final CancellableTimedPause disableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
asyncRequest.setCancelCallback(disableServicesPause::cancel);
componentLifecycle.activateControllerServices(exampleUri, user, groupId, enabledServices, ControllerServiceState.DISABLED, disableServicesPause);
if (asyncRequest.isCancelled()) {
@ -1328,6 +1293,7 @@ public class VersionsResource extends ApplicationResource {
logger.info("Updating Process Group with ID {} to version {} of the Versioned Flow", groupId, flowSnapshot.getSnapshotMetadata().getVersion());
// If replicating request, steps 10-12 are performed on each node individually, and this is accomplished
// by replicating a PUT to /nifi-api/versions/process-groups/{groupId}
try {
if (replicateRequest) {
final URI updateUri;
@ -1345,9 +1311,12 @@ public class VersionsResource extends ApplicationResource {
snapshotEntity.setProcessGroupRevision(requestEntity.getProcessGroupRevision());
snapshotEntity.setRegistryId(requestEntity.getVersionControlInformation().getRegistryId());
snapshotEntity.setVersionedFlow(flowSnapshot);
snapshotEntity.setUpdateDescendantVersionedFlows(updateDescendantVersionedFlows);
final NodeResponse clusterResponse;
try {
logger.debug("Replicating PUT request to {} for user {}", updateUri, user);
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
clusterResponse = getRequestReplicator().replicate(user, HttpMethod.PUT, updateUri, snapshotEntity, headers).awaitMergedResponse();
} else {
@ -1355,13 +1324,16 @@ public class VersionsResource extends ApplicationResource {
getClusterCoordinatorNode(), user, HttpMethod.PUT, updateUri, snapshotEntity, headers).awaitMergedResponse();
}
} catch (final InterruptedException ie) {
logger.warn("Interrupted while replicating PUT request to {} for user {}", updateUri, user);
Thread.currentThread().interrupt();
throw new LifecycleManagementException("Interrupted while updating flows across cluster", ie);
}
final int disableServicesStatus = clusterResponse.getStatus();
if (disableServicesStatus != Status.OK.getStatusCode()) {
final int updateFlowStatus = clusterResponse.getStatus();
if (updateFlowStatus != Status.OK.getStatusCode()) {
final String explanation = getResponseEntity(clusterResponse, String.class);
logger.error("Failed to update flow across cluster when replicating PUT request to {} for user {}. Received {} response with explanation: {}",
updateUri, user, updateFlowStatus, explanation);
throw new LifecycleManagementException("Failed to update Flow on all nodes in cluster due to " + explanation);
}
@ -1393,31 +1365,41 @@ public class VersionsResource extends ApplicationResource {
vci.setRegistryName(serviceFacade.getFlowRegistryName(requestVci.getRegistryId()));
vci.setVersion(metadata.getVersion());
serviceFacade.updateProcessGroupContents(user, revision, groupId, vci, flowSnapshot, idGenerationSeed, verifyNotModified, false);
serviceFacade.updateProcessGroupContents(user, revision, groupId, vci, flowSnapshot, idGenerationSeed, verifyNotModified, false, updateDescendantVersionedFlows);
}
} finally {
if (!asyncRequest.isCancelled()) {
if (logger.isDebugEnabled()) {
logger.debug("Re-Enabling {} Controller Services: {}", enabledServices.size(), enabledServices);
}
if (asyncRequest.isCancelled()) {
return null;
}
asyncRequest.update(new Date(), "Re-Enabling Controller Services", 60);
// Step 13. Re-enable all disabled controller services
final Pause enableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
final CancellableTimedPause enableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
asyncRequest.setCancelCallback(enableServicesPause::cancel);
final Set<AffectedComponentEntity> servicesToEnable = getUpdatedEntities(enabledServices, user);
logger.info("Successfully updated flow; re-enabling {} Controller Services", servicesToEnable.size());
componentLifecycle.activateControllerServices(exampleUri, user, groupId, servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause);
if (asyncRequest.isCancelled()) {
return null;
}
if (!asyncRequest.isCancelled()) {
if (logger.isDebugEnabled()) {
logger.debug("Restart {} Processors: {}", runningComponents.size(), runningComponents);
}
asyncRequest.update(new Date(), "Restarting Processors", 80);
// Step 14. Restart all components
final Set<AffectedComponentEntity> componentsToStart = getUpdatedEntities(runningComponents, user);
final Pause startComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
final CancellableTimedPause startComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
asyncRequest.setCancelCallback(startComponentsPause::cancel);
logger.info("Restarting {} Processors", componentsToStart.size());
componentLifecycle.scheduleComponents(exampleUri, user, groupId, componentsToStart, ScheduledState.RUNNING, startComponentsPause);
}
}
asyncRequest.setCancelCallback(null);
if (asyncRequest.isCancelled()) {
return null;
}
@ -1426,6 +1408,7 @@ public class VersionsResource extends ApplicationResource {
return serviceFacade.getVersionControlInformation(groupId);
}
/**
* Extracts the response entity from the specified node response.
*

View File

@ -99,4 +99,12 @@ public interface AsynchronousWebRequest<T> {
* @return <code>true</code> if the request has been canceled, <code>false</code> otherwise
*/
boolean isCancelled();
/**
* Sets the cancel callback to the given runnable, so that if {@link #cancel()} is called, the given {@link Runnable} will be triggered.
* If <code>null</code> is passed, no operation will be triggered when the task is cancelled.
*
* @param runnable the callback
*/
void setCancelCallback(Runnable runnable);
}

View File

@ -34,6 +34,7 @@ public class StandardAsynchronousWebRequest<T> implements AsynchronousWebRequest
private volatile String failureReason;
private volatile boolean cancelled;
private volatile T results;
private volatile Runnable cancelCallback;
public StandardAsynchronousWebRequest(final String requestId, final String processGroupId, final NiFiUser user, final String state) {
this.id = requestId;
@ -56,6 +57,11 @@ public class StandardAsynchronousWebRequest<T> implements AsynchronousWebRequest
return processGroupId;
}
@Override
public void setCancelCallback(final Runnable runnable) {
this.cancelCallback = runnable;
}
@Override
public void markComplete(final T results) {
this.complete = true;
@ -130,6 +136,7 @@ public class StandardAsynchronousWebRequest<T> implements AsynchronousWebRequest
percentComplete = 100;
state = "Canceled by user";
setFailureReason("Request cancelled by user");
cancelCallback.run();
}
@Override

View File

@ -118,6 +118,7 @@ import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedComponent;
import org.apache.nifi.registry.flow.diff.FlowComparison;
import org.apache.nifi.registry.flow.diff.FlowDifference;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedConnection;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedFunnel;
@ -2202,15 +2203,23 @@ public final class DtoFactory {
private ComponentDifferenceDTO createComponentDifference(final FlowDifference difference) {
VersionedComponent component = difference.getComponentA();
if (component == null) {
if (component == null || difference.getComponentB() instanceof InstantiatedVersionedComponent) {
component = difference.getComponentB();
}
final ComponentDifferenceDTO dto = new ComponentDifferenceDTO();
dto.setComponentId(component.getIdentifier());
dto.setComponentName(component.getName());
dto.setComponentType(component.getComponentType().name());
if (component instanceof InstantiatedVersionedComponent) {
final InstantiatedVersionedComponent instantiatedComponent = (InstantiatedVersionedComponent) component;
dto.setComponentId(instantiatedComponent.getInstanceId());
dto.setProcessGroupId(instantiatedComponent.getInstanceGroupId());
} else {
dto.setComponentId(component.getIdentifier());
dto.setProcessGroupId(dto.getProcessGroupId());
}
return dto;
}

View File

@ -114,10 +114,12 @@ public interface ProcessGroupDAO {
* @param versionControlInformation the new Version Control Information
* @param componentIdSeed the seed value to use for generating ID's for new components
* @param updateSettings whether or not to update the process group's name and position
* @param updateDescendantVersionedFlows if a child/descendant Process Group is under Version Control, specifies whether or not to
* update the contents of that Process Group
* @return the process group
*/
ProcessGroup updateProcessGroupFlow(String groupId, VersionedFlowSnapshot proposedSnapshot, VersionControlInformationDTO versionControlInformation, String componentIdSeed,
boolean verifyNotModified, boolean updateSettings);
boolean verifyNotModified, boolean updateSettings, boolean updateDescendantVersionedFlows);
/**
* Applies the given Version Control Information to the Process Group

View File

@ -29,6 +29,8 @@ import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.StandardVersionControlInformation;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
@ -244,8 +246,12 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
final FlowRegistry flowRegistry = flowController.getFlowRegistryClient().getFlowRegistry(registryId);
final String registryName = flowRegistry == null ? registryId : flowRegistry.getName();
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
final VersionedProcessGroup flowSnapshot = mapper.mapProcessGroup(group, flowController.getFlowRegistryClient(), false);
final StandardVersionControlInformation vci = StandardVersionControlInformation.Builder.fromDto(versionControlInformation)
.registryName(registryName)
.flowSnapshot(flowSnapshot)
.modified(false)
.current(true)
.build();
@ -264,9 +270,9 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
@Override
public ProcessGroup updateProcessGroupFlow(final String groupId, final VersionedFlowSnapshot proposedSnapshot, final VersionControlInformationDTO versionControlInformation,
final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings) {
final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings, final boolean updateDescendantVersionedFlows) {
final ProcessGroup group = locateProcessGroup(flowController, groupId);
group.updateFlow(proposedSnapshot, componentIdSeed, verifyNotModified, updateSettings);
group.updateFlow(proposedSnapshot, componentIdSeed, verifyNotModified, updateSettings, updateDescendantVersionedFlows);
final StandardVersionControlInformation svci = StandardVersionControlInformation.Builder.fromDto(versionControlInformation)
.flowSnapshot(proposedSnapshot.getFlowContents())

View File

@ -43,7 +43,7 @@ public class CancellableTimedPause implements Pause {
long sysTime = System.nanoTime();
final long maxWaitTime = System.nanoTime() + pauseNanos;
while (sysTime < maxWaitTime) {
while (sysTime < maxWaitTime && !cancelled) {
try {
TimeUnit.NANOSECONDS.sleep(pauseNanos);
} catch (final InterruptedException ie) {

View File

@ -421,18 +421,24 @@ public final class SnippetUtils {
}
// get a list of all names of process groups so that we can rename as needed.
final List<String> groupNames = new ArrayList<>();
final Set<String> groupNames = new HashSet<>();
for (final ProcessGroup childGroup : group.getProcessGroups()) {
groupNames.add(childGroup.getName());
}
if (snippetContents.getProcessGroups() != null) {
for (final ProcessGroupDTO groupDTO : snippetContents.getProcessGroups()) {
// If Version Control Information is present, then we don't want to rename the
// Process Group - we want it to remain the same as the one in Version Control.
// However, in order to disambiguate things, we generally do want to rename to
// 'Copy of...' so we do this only if there is no Version Control Information present.
if (groupDTO.getVersionControlInformation() == null) {
String groupName = groupDTO.getName();
while (groupNames.contains(groupName)) {
groupName = "Copy of " + groupName;
}
groupDTO.setName(groupName);
}
groupNames.add(groupDTO.getName());
}
}