diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java index 944b10acb7..21864b02cb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java @@ -32,8 +32,6 @@ public class VersionControlInformationDTO { private String flowName; private String flowDescription; private Integer version; - private Boolean modified; - private Boolean current; private String state; private String stateExplanation; @@ -118,26 +116,6 @@ public class VersionControlInformationDTO { this.version = version; } - @ApiModelProperty(readOnly=true, - value = "Whether or not the flow has been modified since it was last synced to the Flow Registry. The value will be null if this information is not yet known.") - public Boolean getModified() { - return modified; - } - - public void setModified(Boolean modified) { - this.modified = modified; - } - - @ApiModelProperty(readOnly=true, - value = "Whether or not this is the most recent version of the flow in the Flow Registry. The value will be null if this information is not yet known.") - public Boolean getCurrent() { - return current; - } - - public void setCurrent(Boolean current) { - this.current = current; - } - @ApiModelProperty(readOnly = true, value = "The current state of the Process Group, as it relates to the Versioned Flow", allowableValues = "LOCALLY_MODIFIED_DESCENDANT, LOCALLY_MODIFIED, STALE, LOCALLY_MODIFIED_AND_STALE, UP_TO_DATE") diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java index 457e75b3f9..d2eb74997e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java @@ -55,7 +55,7 @@ public class ProcessGroupEntityMerger implements ComponentEntityMerger { final VersionControlInformationDTO dto = entity.getVersionControlInformation(); - // We consider the flow to be current only if ALL nodes indicate that it is current - clientDto.setCurrent(Boolean.TRUE.equals(clientDto.getCurrent()) && Boolean.TRUE.equals(dto.getCurrent())); - - // We consider the flow to be modified if ANY node indicates that it is modified - clientDto.setModified(Boolean.TRUE.equals(clientDto.getModified()) || Boolean.TRUE.equals(dto.getModified())); + updateFlowState(clientDto, dto); }); } + + private static boolean isCurrent(final VersionedFlowState state) { + return state == VersionedFlowState.UP_TO_DATE || state == VersionedFlowState.LOCALLY_MODIFIED; + } + + private static boolean isModified(final VersionedFlowState state) { + return state == VersionedFlowState.LOCALLY_MODIFIED || state == VersionedFlowState.LOCALLY_MODIFIED_AND_STALE; + } + + public static void updateFlowState(final VersionControlInformationDTO clientDto, final VersionControlInformationDTO dto) { + final VersionedFlowState clientState = VersionedFlowState.valueOf(clientDto.getState()); + if (clientState == VersionedFlowState.SYNC_FAILURE) { + return; + } + + final VersionedFlowState dtoState = VersionedFlowState.valueOf(dto.getState()); + if (dtoState == VersionedFlowState.SYNC_FAILURE) { + clientDto.setState(dto.getState()); + clientDto.setStateExplanation(dto.getStateExplanation()); + return; + } + + final boolean clientCurrent = isCurrent(clientState); + final boolean clientModified = isModified(clientState); + + final boolean dtoCurrent = isCurrent(dtoState); + final boolean dtoModified = isModified(dtoState); + + final boolean current = clientCurrent && dtoCurrent; + final boolean stale = !current; + final boolean modified = clientModified && dtoModified; + + final VersionedFlowState flowState; + if (modified && stale) { + flowState = VersionedFlowState.LOCALLY_MODIFIED_AND_STALE; + } else if (modified) { + flowState = VersionedFlowState.LOCALLY_MODIFIED; + } else if (stale) { + flowState = VersionedFlowState.STALE; + } else { + flowState = VersionedFlowState.UP_TO_DATE; + } + + clientDto.setState(flowState.name()); + clientDto.setStateExplanation(flowState.getDescription()); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java index 1f65a19ea6..bb4e0d0bca 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java @@ -65,17 +65,6 @@ public interface VersionControlInformation { */ int getVersion(); - /** - * @return true if the flow has been modified since the last time that it was updated from the Flow Registry or saved - * to the Flow Registry; false if the flow is in sync with the Flow Registry. - */ - boolean isModified(); - - /** - * @return true if this version of the flow is the most recent version of the flow available in the Flow Registry, false otherwise. - */ - boolean isCurrent(); - /** * @return the current status of the Process Group as it relates to the associated Versioned Flow. */ diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowState.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowState.java index d20a13f052..35b436d2ed 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowState.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionedFlowState.java @@ -22,31 +22,42 @@ public enum VersionedFlowState { /** * We are unable to communicate with the Flow Registry in order to determine the appropriate state */ - SYNC_FAILURE, + SYNC_FAILURE("Failed to communicate with Flow Registry"), /** * This Process Group (or a child/descendant Process Group that is not itself under Version Control) * is on the latest version of the Versioned Flow, but is different than the Versioned Flow that is * stored in the Flow Registry. */ - LOCALLY_MODIFIED, + LOCALLY_MODIFIED("Local changes have been made"), /** * This Process Group has not been modified since it was last synchronized with the Flow Registry, but * the Flow Registry has a newer version of the flow than what is contained in this Process Group. */ - STALE, + STALE("A newer version of this flow is available"), /** * This Process Group (or a child/descendant Process Group that is not itself under Version Control) * has been modified since it was last synchronized with the Flow Registry, and the Flow Registry has * a newer version of the flow than what is contained in this Process Group. */ - LOCALLY_MODIFIED_AND_STALE, + LOCALLY_MODIFIED_AND_STALE("Local changes have been made and a newer version of this flow is available"), /** * This Process Group and all child/descendant Process Groups are on the latest version of the flow in * the Flow Registry and have no local modifications. */ - UP_TO_DATE; + UP_TO_DATE("Flow version is current"); + + + private final String description; + + private VersionedFlowState(final String description) { + this.description = description; + } + + public String getDescription() { + return description; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 9cbf3234f0..9bb3d2f565 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -88,6 +88,7 @@ import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.registry.flow.FlowRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.flow.StandardVersionControlInformation; +import org.apache.nifi.registry.flow.VersionedFlowState; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; @@ -1116,10 +1117,10 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { final FlowRegistry flowRegistry = controller.getFlowRegistryClient().getFlowRegistry(versionControlInfoDto.getRegistryId()); final String registryName = flowRegistry == null ? versionControlInfoDto.getRegistryId() : flowRegistry.getName(); + versionControlInfoDto.setState(VersionedFlowState.SYNC_FAILURE.name()); + versionControlInfoDto.setStateExplanation("Process Group has not yet been synchronized with the Flow Registry"); final StandardVersionControlInformation versionControlInformation = StandardVersionControlInformation.Builder.fromDto(versionControlInfoDto) .registryName(registryName) - .modified(false) - .current(true) .build(); // pass empty map for the version control mapping because the VersionedComponentId has already been set on the components diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index fb3d3a6a34..7d184dfe7b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -169,8 +169,7 @@ public final class StandardProcessGroup implements ProcessGroup { private final Map templates = new HashMap<>(); private final StringEncryptor encryptor; private final MutableVariableRegistry variableRegistry; - private final AtomicReference flowStatus = new AtomicReference<>( - new StandardVersionedFlowStatus(null, "Not yet synchronized with Flow Registry", null)); + private final VersionControlFields versionControlFields = new VersionControlFields(); private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock readLock = rwLock.readLock(); @@ -339,14 +338,22 @@ public final class StandardProcessGroup implements ProcessGroup { // update the vci counts for this child group final VersionControlInformation vci = childGroup.getVersionControlInformation(); if (vci != null) { - if (vci.isModified() && !vci.isCurrent()) { - locallyModifiedAndStale += 1; - } else if (!vci.isCurrent()) { - stale += 1; - } else if (vci.isModified()) { - locallyModified += 1; - } else { - upToDate += 1; + switch (vci.getStatus().getState()) { + case LOCALLY_MODIFIED: + locallyModified++; + break; + case LOCALLY_MODIFIED_AND_STALE: + locallyModifiedAndStale++; + break; + case STALE: + stale++; + break; + case SYNC_FAILURE: + syncFailure++; + break; + case UP_TO_DATE: + upToDate++; + break; } } @@ -2938,17 +2945,9 @@ public final class StandardProcessGroup implements ProcessGroup { } } - clearFlowDifferences(); + versionControlFields.setFlowDifferences(null); } - private void clearFlowDifferences() { - boolean updated = false; - while (!updated) { - final StandardVersionedFlowStatus status = flowStatus.get(); - final StandardVersionedFlowStatus updatedStatus = new StandardVersionedFlowStatus(status.getState(), status.getStateExplanation(), null); - updated = flowStatus.compareAndSet(status, updatedStatus); - } - } @Override public void setVersionControlInformation(final VersionControlInformation versionControlInformation, final Map versionedComponentIds) { @@ -2959,8 +2958,6 @@ public final class StandardProcessGroup implements ProcessGroup { versionControlInformation.getFlowIdentifier(), versionControlInformation.getVersion(), stripContentsFromRemoteDescendantGroups(versionControlInformation.getFlowSnapshot(), true), - versionControlInformation.isModified(), - versionControlInformation.isCurrent(), versionControlInformation.getStatus()) { @Override @@ -2970,60 +2967,50 @@ public final class StandardProcessGroup implements ProcessGroup { return registry == null ? registryId : registry.getName(); } - @Override - public boolean isModified() { - boolean updated = false; - while (true) { - final StandardVersionedFlowStatus status = flowStatus.get(); - Set differences = status.getCurrentDifferences(); + private boolean isModified() { + Set differences = versionControlFields.getFlowDifferences(); + if (differences == null) { + differences = getModifications(); if (differences == null) { - differences = getModifications(); - if (differences == null) { - return false; - } - - final StandardVersionedFlowStatus updatedStatus = new StandardVersionedFlowStatus(status.getState(), status.getStateExplanation(), differences); - updated = flowStatus.compareAndSet(status, updatedStatus); - - if (updated) { - return !differences.isEmpty(); - } - - continue; + return false; } - return !differences.isEmpty(); + versionControlFields.setFlowDifferences(differences); } + + return !differences.isEmpty(); } @Override public VersionedFlowStatus getStatus() { // If current state is a sync failure, then - final StandardVersionedFlowStatus status = flowStatus.get(); - final VersionedFlowState state = status.getState(); - if (state == VersionedFlowState.SYNC_FAILURE) { - return status; + final String syncFailureExplanation = versionControlFields.getSyncFailureExplanation(); + if (syncFailureExplanation != null) { + return new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, syncFailureExplanation); } final boolean modified = isModified(); if (!modified) { final VersionControlInformation vci = StandardProcessGroup.this.versionControlInfo.get(); if (vci.getFlowSnapshot() == null) { - return new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, "Process Group has not yet been synchronized with Flow Registry", null); + return new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, "Process Group has not yet been synchronized with Flow Registry"); } } - final boolean stale = !isCurrent(); + final boolean stale = versionControlFields.isStale(); + final VersionedFlowState flowState; if (modified && stale) { - return new StandardVersionedFlowStatus(VersionedFlowState.LOCALLY_MODIFIED_AND_STALE, "Local changes have been made and a newer version of this flow is available", null); + flowState = VersionedFlowState.LOCALLY_MODIFIED_AND_STALE; } else if (modified) { - return new StandardVersionedFlowStatus(VersionedFlowState.LOCALLY_MODIFIED, "Local changes have been made", null); + flowState = VersionedFlowState.LOCALLY_MODIFIED; } else if (stale) { - return new StandardVersionedFlowStatus(VersionedFlowState.STALE, "A newer version of this flow is available", null); + flowState = VersionedFlowState.STALE; } else { - return new StandardVersionedFlowStatus(VersionedFlowState.UP_TO_DATE, "Flow version is current", null); + flowState = VersionedFlowState.UP_TO_DATE; } + + return new StandardVersionedFlowStatus(flowState, flowState.getDescription()); } }; @@ -3031,11 +3018,23 @@ public final class StandardProcessGroup implements ProcessGroup { svci.setFlowName(versionControlInformation.getFlowName()); svci.setFlowDescription(versionControlInformation.getFlowDescription()); + final VersionedFlowState flowState = versionControlInformation.getStatus().getState(); + versionControlFields.setStale(flowState == VersionedFlowState.STALE || flowState == VersionedFlowState.LOCALLY_MODIFIED_AND_STALE); + versionControlFields.setLocallyModified(flowState == VersionedFlowState.LOCALLY_MODIFIED || flowState == VersionedFlowState.LOCALLY_MODIFIED_AND_STALE); + versionControlFields.setSyncFailureExplanation(null); + writeLock.lock(); try { updateVersionedComponentIds(this, versionedComponentIds); this.versionControlInfo.set(svci); - clearFlowDifferences(); + versionControlFields.setFlowDifferences(null); + + final ProcessGroup parent = getParent(); + if (parent != null) { + parent.onComponentModified(); + } + + scheduler.submitFrameworkTask(() -> synchronizeWithFlowRegistry(flowController.getFlowRegistryClient())); } finally { writeLock.unlock(); } @@ -3156,14 +3155,6 @@ public final class StandardProcessGroup implements ProcessGroup { .forEach(childGroup -> applyVersionedComponentIds(childGroup, lookup)); } - private void setSyncFailedState(final String explanation) { - boolean updated = false; - while (!updated) { - final StandardVersionedFlowStatus status = flowStatus.get(); - final StandardVersionedFlowStatus updatedStatus = new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, explanation, status.getCurrentDifferences()); - updated = flowStatus.compareAndSet(status, updatedStatus); - } - } @Override public void synchronizeWithFlowRegistry(final FlowRegistryClient flowRegistryClient) { @@ -3177,7 +3168,7 @@ public final class StandardProcessGroup implements ProcessGroup { if (flowRegistry == null) { final String message = String.format("Unable to synchronize Process Group with Flow Registry because Process Group was placed under Version Control using Flow Registry " + "with identifier %s but cannot find any Flow Registry with this identifier", registryId); - setSyncFailedState(message); + versionControlFields.setSyncFailureExplanation(message); LOG.error("Unable to synchronize {} with Flow Registry because Process Group was placed under Version Control using Flow Registry " + "with identifier {} but cannot find any Flow Registry with this identifier", this, registryId); @@ -3195,7 +3186,7 @@ public final class StandardProcessGroup implements ProcessGroup { } catch (final IOException | NiFiRegistryException e) { final String message = String.format("Failed to synchronize Process Group with Flow Registry because could not retrieve version %s of flow with identifier %s in bucket %s", vci.getVersion(), vci.getFlowIdentifier(), vci.getBucketIdentifier()); - setSyncFailedState(message); + versionControlFields.setSyncFailureExplanation(message); LOG.error("Failed to synchronize {} with Flow Registry because could not retrieve version {} of flow with identifier {} in bucket {}", this, vci.getVersion(), vci.getFlowIdentifier(), vci.getBucketIdentifier(), e); @@ -3213,22 +3204,17 @@ public final class StandardProcessGroup implements ProcessGroup { if (latestVersion == vci.getVersion()) { LOG.debug("{} is currently at the most recent version ({}) of the flow that is under Version Control", this, latestVersion); - vci.setCurrent(true); + versionControlFields.setStale(false); } else { - vci.setCurrent(false); LOG.info("{} is not the most recent version of the flow that is under Version Control; current version is {}; most recent version is {}", new Object[] {this, vci.getVersion(), latestVersion}); + versionControlFields.setStale(true); } - boolean updated = false; - while (!updated) { - final StandardVersionedFlowStatus status = flowStatus.get(); - final StandardVersionedFlowStatus updatedStatus = new StandardVersionedFlowStatus(null, null, status.getCurrentDifferences()); - updated = flowStatus.compareAndSet(status, updatedStatus); - } + versionControlFields.setSyncFailureExplanation(null); } catch (final IOException | NiFiRegistryException e) { final String message = String.format("Failed to synchronize Process Group with Flow Registry : " + e.getMessage()); - setSyncFailedState(message); + versionControlFields.setSyncFailureExplanation(message); LOG.error("Failed to synchronize {} with Flow Registry because could not determine the most recent version of the Flow in the Flow Registry", this, e); } @@ -3253,10 +3239,6 @@ public final class StandardProcessGroup implements ProcessGroup { final Set updatedVersionedComponentIds = new HashSet<>(); for (final FlowDifference diff : flowComparison.getDifferences()) { - if (diff.getDifferenceType() == DifferenceType.POSITION_CHANGED) { - continue; - } - // If this update adds a new Controller Service, then we need to check if the service already exists at a higher level // and if so compare our VersionedControllerService to the existing service. if (diff.getDifferenceType() == DifferenceType.COMPONENT_ADDED) { @@ -3393,6 +3375,8 @@ public final class StandardProcessGroup implements ProcessGroup { final FlowRegistry flowRegistry = flowController.getFlowRegistryClient().getFlowRegistry(registryId); final String registryName = flowRegistry == null ? registryId : flowRegistry.getName(); + final VersionedFlowState flowState = remoteCoordinates.getLatest() ? VersionedFlowState.UP_TO_DATE : VersionedFlowState.STALE; + final VersionControlInformation vci = new StandardVersionControlInformation.Builder() .registryId(registryId) .registryName(registryName) @@ -3402,8 +3386,7 @@ public final class StandardProcessGroup implements ProcessGroup { .flowName(flowId) .version(version) .flowSnapshot(proposed) - .modified(false) - .current(remoteCoordinates.getLatest()) + .status(new StandardVersionedFlowStatus(flowState, flowState.getDescription())) .build(); group.setVersionControlInformation(vci, Collections.emptyMap()); @@ -4149,13 +4132,9 @@ public final class StandardProcessGroup implements ProcessGroup { final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, getAncestorGroupServiceIds(), new EvolvingDifferenceDescriptor()); final FlowComparison comparison = flowComparator.compare(); final Set differences = comparison.getDifferences(); - final Set functionalDifferences = differences.stream() - .filter(diff -> diff.getDifferenceType() != DifferenceType.POSITION_CHANGED) - .filter(diff -> diff.getDifferenceType() != DifferenceType.STYLE_CHANGED) - .collect(Collectors.toSet()); LOG.debug("There are {} differences between this Local Flow and the Versioned Flow: {}", differences.size(), differences); - return functionalDifferences; + return differences; } @@ -4170,7 +4149,8 @@ public final class StandardProcessGroup implements ProcessGroup { } if (verifyNotDirty) { - final boolean modified = versionControlInfo.isModified(); + final VersionedFlowState flowState = versionControlInfo.getStatus().getState(); + final boolean modified = flowState == VersionedFlowState.LOCALLY_MODIFIED || flowState == VersionedFlowState.LOCALLY_MODIFIED_AND_STALE; final Set modifications = getModifications(); @@ -4186,7 +4166,7 @@ public final class StandardProcessGroup implements ProcessGroup { throw new IllegalStateException("Cannot change the Version of the flow for " + this + " because the Process Group has been modified (" + modifications.size() + " modifications) since it was last synchronized with the Flow Registry. The Process Group must be" - + " reverted to its original form before changing the version. See logs for more information on what has changed."); + + " reverted to its original form before changing the version."); } } } @@ -4393,8 +4373,8 @@ public final class StandardProcessGroup implements ProcessGroup { if (flowId != null && flowId.equals(vci.getFlowIdentifier())) { // Flow ID is the same. We want to publish the Process Group as the next version of the Flow. // In order to do this, we have to ensure that the Process Group is 'current'. - final boolean current = vci.isCurrent(); - if (!current) { + final VersionedFlowState state = vci.getStatus().getState(); + if (state == VersionedFlowState.STALE || state == VersionedFlowState.LOCALLY_MODIFIED_AND_STALE) { throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + getIdentifier() + " because the Process Group in the flow is not synchronized with the most recent version of the Flow in the Flow Registry. " + "In order to publish a new version of the Flow, the Process Group must first be in synch with the latest version in the Flow Registry."); @@ -4441,10 +4421,15 @@ public final class StandardProcessGroup implements ProcessGroup { private void verifyNoDescendantsWithLocalModifications(final String action) { for (final ProcessGroup descendant : findAllProcessGroups()) { final VersionControlInformation descendantVci = descendant.getVersionControlInformation(); - if (descendantVci != null && descendantVci.isModified()) { - throw new IllegalStateException("Process Group cannot " + action + " because it contains a child or descendant Process Group that is under Version Control and " - + "has local modifications. Each descendant Process Group that is under Version Control must first be reverted or have its changes pushed to the Flow Registry before " - + "this action can be performed on the parent Process Group."); + if (descendantVci != null) { + final VersionedFlowState flowState = descendantVci.getStatus().getState(); + final boolean modified = flowState == VersionedFlowState.LOCALLY_MODIFIED || flowState == VersionedFlowState.LOCALLY_MODIFIED_AND_STALE; + + if (modified) { + throw new IllegalStateException("Process Group cannot " + action + " because it contains a child or descendant Process Group that is under Version Control and " + + "has local modifications. Each descendant Process Group that is under Version Control must first be reverted or have its changes pushed to the Flow Registry before " + + "this action can be performed on the parent Process Group."); + } } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardVersionedFlowStatus.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardVersionedFlowStatus.java index f362c1e25d..4be9898d2f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardVersionedFlowStatus.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardVersionedFlowStatus.java @@ -17,21 +17,16 @@ package org.apache.nifi.groups; -import java.util.Set; - import org.apache.nifi.registry.flow.VersionedFlowState; import org.apache.nifi.registry.flow.VersionedFlowStatus; -import org.apache.nifi.registry.flow.diff.FlowDifference; class StandardVersionedFlowStatus implements VersionedFlowStatus { private final VersionedFlowState state; private final String explanation; - private final Set currentDifferences; - StandardVersionedFlowStatus(final VersionedFlowState state, final String explanation, final Set differences) { + StandardVersionedFlowStatus(final VersionedFlowState state, final String explanation) { this.state = state; this.explanation = explanation; - this.currentDifferences = differences; } @Override @@ -43,8 +38,4 @@ class StandardVersionedFlowStatus implements VersionedFlowStatus { public String getStateExplanation() { return explanation; } - - Set getCurrentDifferences() { - return currentDifferences; - } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/VersionControlFields.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/VersionControlFields.java new file mode 100644 index 0000000000..50c640be41 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/VersionControlFields.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.groups; + +import java.util.Set; + +import org.apache.nifi.registry.flow.diff.FlowDifference; + +public class VersionControlFields { + private volatile boolean locallyModified; + private volatile boolean stale; + private volatile String syncFailureExplanation = "Not yet synchronized with Flow Registry"; + private volatile Set flowDifferences; + + boolean isLocallyModified() { + return locallyModified; + } + + void setLocallyModified(final boolean locallyModified) { + this.locallyModified = locallyModified; + } + + boolean isStale() { + return stale; + } + + void setStale(final boolean stale) { + this.stale = stale; + } + + String getSyncFailureExplanation() { + return syncFailureExplanation; + } + + void setSyncFailureExplanation(final String syncFailureExplanation) { + this.syncFailureExplanation = syncFailureExplanation; + } + + Set getFlowDifferences() { + return flowDifferences; + } + + void setFlowDifferences(final Set flowDifferences) { + this.flowDifferences = flowDifferences; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java index 106d19a342..feef0e0ae2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java @@ -32,8 +32,6 @@ public class StandardVersionControlInformation implements VersionControlInformat private volatile String flowDescription; private final int version; private volatile VersionedProcessGroup flowSnapshot; - private volatile boolean modified; - private volatile boolean current; private final VersionedFlowStatus status; public static class Builder { @@ -46,8 +44,6 @@ public class StandardVersionControlInformation implements VersionControlInformat private String flowDescription; private int version; private VersionedProcessGroup flowSnapshot; - private Boolean modified = null; - private Boolean current = null; private VersionedFlowStatus status; public Builder registryId(String registryId) { @@ -90,16 +86,6 @@ public class StandardVersionControlInformation implements VersionControlInformat return this; } - public Builder modified(boolean modified) { - this.modified = modified; - return this; - } - - public Builder current(boolean current) { - this.current = current; - return this; - } - public Builder flowSnapshot(VersionedProcessGroup snapshot) { this.flowSnapshot = snapshot; return this; @@ -119,8 +105,17 @@ public class StandardVersionControlInformation implements VersionControlInformat .flowId(dto.getFlowId()) .flowName(dto.getFlowName()) .flowDescription(dto.getFlowDescription()) - .current(dto.getCurrent() == null ? true : dto.getCurrent()) - .modified(dto.getModified() == null ? false : dto.getModified()) + .status(new VersionedFlowStatus() { + @Override + public VersionedFlowState getState() { + return VersionedFlowState.valueOf(dto.getState()); + } + + @Override + public String getStateExplanation() { + return dto.getStateExplanation(); + } + }) .version(dto.getVersion()); return builder; @@ -133,7 +128,7 @@ public class StandardVersionControlInformation implements VersionControlInformat Objects.requireNonNull(version, "Version must be specified"); final StandardVersionControlInformation svci = new StandardVersionControlInformation(registryIdentifier, registryName, - bucketIdentifier, flowIdentifier, version, flowSnapshot, modified, current, status); + bucketIdentifier, flowIdentifier, version, flowSnapshot, status); svci.setBucketName(bucketName); svci.setFlowName(flowName); @@ -145,15 +140,13 @@ public class StandardVersionControlInformation implements VersionControlInformat public StandardVersionControlInformation(final String registryId, final String registryName, final String bucketId, final String flowId, final int version, - final VersionedProcessGroup snapshot, final boolean modified, final boolean current, final VersionedFlowStatus status) { + final VersionedProcessGroup snapshot, final VersionedFlowStatus status) { this.registryIdentifier = registryId; this.registryName = registryName; this.bucketIdentifier = bucketId; this.flowIdentifier = flowId; this.version = version; this.flowSnapshot = snapshot; - this.modified = modified; - this.current = current; this.status = status; } @@ -214,29 +207,11 @@ public class StandardVersionControlInformation implements VersionControlInformat return version; } - @Override - public boolean isModified() { - return modified; - } - - @Override - public boolean isCurrent() { - return current; - } - @Override public VersionedProcessGroup getFlowSnapshot() { return flowSnapshot; } - public void setModified(final boolean modified) { - this.modified = modified; - } - - public void setCurrent(final boolean current) { - this.current = current; - } - public void setFlowSnapshot(final VersionedProcessGroup flowSnapshot) { this.flowSnapshot = flowSnapshot; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index 5808500f38..0b9c6f29bd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -178,9 +178,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { } }; - final Runnable checkAuthorizations = new InitializationTask(); backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + id, true); - backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 30L, 30L, TimeUnit.SECONDS); } @Override @@ -197,6 +195,9 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { logger.warn("Unable to communicate with remote instance {}", new Object[] {this, e}); } }); + + final Runnable checkAuthorizations = new InitializationTask(); + backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 0L, 60L, TimeUnit.SECONDS); } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index 78335f4f29..514cd1848a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -1442,20 +1442,6 @@ public interface NiFiServiceFacade { */ void verifyCanRevertLocalModifications(String groupId, VersionedFlowSnapshot versionedFlowSnapshot); - /** - * Updates the Process group with the given ID to match the new snapshot - * - * @param revision the revision of the Process Group - * @param groupId the ID of the Process Group - * @param versionControlInfo the Version Control information - * @param snapshot the new snapshot - * @param componentIdSeed the seed to use for generating new component ID's - * @param updateDescendantVersionedFlows if a child/descendant Process Group is under Version Control, specifies whether or not to - * update the contents of that Process Group - * @return the Process Group - */ - ProcessGroupEntity updateProcessGroup(Revision revision, String groupId, VersionControlInformationDTO versionControlInfo, VersionedFlowSnapshot snapshot, String componentIdSeed, - boolean verifyNotModified, boolean updateDescendantVersionedFlows); /** * Updates the Process group with the given ID to match the new snapshot diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index ae89ef0af0..a2d6e41746 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -97,6 +97,7 @@ import org.apache.nifi.registry.flow.VersionedConnection; import org.apache.nifi.registry.flow.VersionedFlow; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; +import org.apache.nifi.registry.flow.VersionedFlowState; import org.apache.nifi.registry.flow.VersionedProcessGroup; import org.apache.nifi.registry.flow.diff.ComparableDataFlow; import org.apache.nifi.registry.flow.diff.ConciseEvolvingDifferenceDescriptor; @@ -292,6 +293,7 @@ import java.util.UUID; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Implementation of NiFiServiceFacade that performs revision checking. @@ -1592,7 +1594,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final D dto = dtoCreation.apply(component); final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); - return new StandardRevisionUpdate(dto, lastMod); + return new StandardRevisionUpdate<>(dto, lastMod); }); } @@ -1779,7 +1781,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { controllerFacade.save(); final SnippetDTO dto = dtoFactory.createSnippetDto(snippet); - final RevisionUpdate snapshot = new StandardRevisionUpdate(dto, null); + final RevisionUpdate snapshot = new StandardRevisionUpdate<>(dto, null); return entityFactory.createSnippetEntity(snapshot.getComponent()); } @@ -2088,7 +2090,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { controllerFacade.save(); final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); - return new StandardRevisionUpdate(dto, lastMod); + return new StandardRevisionUpdate<>(dto, lastMod); }); } else { snapshot = revisionManager.updateRevision(claim, user, () -> { @@ -2098,7 +2100,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { controllerFacade.save(); final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); - return new StandardRevisionUpdate(dto, lastMod); + return new StandardRevisionUpdate<>(dto, lastMod); }); } @@ -2440,7 +2442,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final Revision updatedRevision = revisionManager.getRevision(revision.getComponentId()).incrementRevision(revision.getClientId()); final FlowModification lastModification = new FlowModification(updatedRevision, user.getIdentity()); - return new StandardRevisionUpdate(registry, lastModification); + return new StandardRevisionUpdate<>(registry, lastModification); }); final FlowRegistry updatedReg = revisionUpdate.getComponent(); @@ -2483,7 +2485,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final ReportingTaskDTO dto = dtoFactory.createReportingTaskDto(reportingTask); final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity()); - return new StandardRevisionUpdate(dto, lastMod); + return new StandardRevisionUpdate<>(dto, lastMod); }); final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskDTO.getId()); @@ -3649,6 +3651,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public VersionControlComponentMappingEntity registerFlowWithFlowRegistry(final String groupId, final StartVersionControlRequestEntity requestEntity) { final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); + final VersionControlInformation currentVci = processGroup.getVersionControlInformation(); final int expectedVersion = currentVci == null ? 1 : currentVci.getVersion() + 1; @@ -3697,15 +3700,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final VersionControlInformationDTO vci = new VersionControlInformationDTO(); vci.setBucketId(bucket.getIdentifier()); vci.setBucketName(bucket.getName()); - vci.setCurrent(true); vci.setFlowId(flow.getIdentifier()); vci.setFlowName(flow.getName()); vci.setFlowDescription(flow.getDescription()); vci.setGroupId(groupId); - vci.setModified(false); vci.setRegistryId(registryId); vci.setRegistryName(getFlowRegistryName(registryId)); vci.setVersion(registeredSnapshot.getSnapshotMetadata().getVersion()); + vci.setState(VersionedFlowState.UP_TO_DATE.name()); final Map mapping = dtoFactory.createVersionControlComponentMappingDto(versionedProcessGroup); @@ -3777,8 +3779,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, ancestorServiceIds, new ConciseEvolvingDifferenceDescriptor()); final FlowComparison flowComparison = flowComparator.compare(); - final Set differenceDtos = dtoFactory.createComponentDifferenceDtos(flowComparison, - diff -> diff.getDifferenceType() != DifferenceType.POSITION_CHANGED && diff.getDifferenceType() != DifferenceType.STYLE_CHANGED); + final Set differenceDtos = dtoFactory.createComponentDifferenceDtos(flowComparison); final FlowComparisonEntity entity = new FlowComparisonEntity(); entity.setComponentDifferences(differenceDtos); @@ -4079,30 +4080,88 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return flowRegistry == null ? flowRegistryId : flowRegistry.getName(); } - @Override - public ProcessGroupEntity updateProcessGroup(final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo, - final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified, final boolean updateDescendantVersionedFlows) { + private List getComponentRevisions(final ProcessGroup processGroup, final boolean includeGroupRevision) { + final List revisions = new ArrayList<>(); + if (includeGroupRevision) { + revisions.add(revisionManager.getRevision(processGroup.getIdentifier())); + } - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - return updateProcessGroupContents(user, revision, groupId, versionControlInfo, proposedFlowSnapshot, componentIdSeed, verifyNotModified, true, updateDescendantVersionedFlows); + processGroup.findAllConnections().stream() + .map(component -> revisionManager.getRevision(component.getIdentifier())) + .forEach(revisions::add); + processGroup.findAllControllerServices().stream() + .map(component -> revisionManager.getRevision(component.getIdentifier())) + .forEach(revisions::add); + processGroup.findAllFunnels().stream() + .map(component -> revisionManager.getRevision(component.getIdentifier())) + .forEach(revisions::add); + processGroup.findAllInputPorts().stream() + .map(component -> revisionManager.getRevision(component.getIdentifier())) + .forEach(revisions::add); + processGroup.findAllOutputPorts().stream() + .map(component -> revisionManager.getRevision(component.getIdentifier())) + .forEach(revisions::add); + processGroup.findAllLabels().stream() + .map(component -> revisionManager.getRevision(component.getIdentifier())) + .forEach(revisions::add); + processGroup.findAllProcessGroups().stream() + .map(component -> revisionManager.getRevision(component.getIdentifier())) + .forEach(revisions::add); + processGroup.findAllProcessors().stream() + .map(component -> revisionManager.getRevision(component.getIdentifier())) + .forEach(revisions::add); + processGroup.findAllRemoteProcessGroups().stream() + .map(component -> revisionManager.getRevision(component.getIdentifier())) + .forEach(revisions::add); + processGroup.findAllRemoteProcessGroups().stream() + .flatMap(rpg -> Stream.concat(rpg.getInputPorts().stream(), rpg.getOutputPorts().stream())) + .map(component -> revisionManager.getRevision(component.getIdentifier())) + .forEach(revisions::add); + + return revisions; } @Override public ProcessGroupEntity updateProcessGroupContents(final NiFiUser user, final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo, final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings, final boolean updateDescendantVersionedFlows) { - final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(groupId); - final RevisionUpdate snapshot = updateComponent(user, revision, - processGroupNode, - () -> processGroupDAO.updateProcessGroupFlow(groupId, user, proposedFlowSnapshot, versionControlInfo, componentIdSeed, verifyNotModified, updateSettings, updateDescendantVersionedFlows), - processGroup -> dtoFactory.createProcessGroupDto(processGroup)); + final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); + final List revisions = getComponentRevisions(processGroup, false); + revisions.add(revision); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode); - final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); - final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroupNode.getIdentifier())); - final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroupNode.getIdentifier())); + final RevisionClaim revisionClaim = new StandardRevisionClaim(revisions); + + final RevisionUpdate revisionUpdate = revisionManager.updateRevision(revisionClaim, user, new UpdateRevisionTask() { + @Override + public RevisionUpdate update() { + // update the Process Group + processGroupDAO.updateProcessGroupFlow(groupId, user, proposedFlowSnapshot, versionControlInfo, componentIdSeed, verifyNotModified, updateSettings, updateDescendantVersionedFlows); + + // update the revisions + final Set updatedRevisions = revisions.stream() + .map(rev -> revisionManager.getRevision(rev.getComponentId()).incrementRevision(revision.getClientId())) + .collect(Collectors.toSet()); + + // save + controllerFacade.save(); + + // gather details for response + final ProcessGroupDTO dto = dtoFactory.createProcessGroupDto(processGroup); + + final Revision updatedRevision = revisionManager.getRevision(groupId).incrementRevision(revision.getClientId()); + final FlowModification lastModification = new FlowModification(updatedRevision, user.getIdentity()); + return new StandardRevisionUpdate<>(dto, lastModification, updatedRevisions); + } + }); + + final FlowModification lastModification = revisionUpdate.getLastModification(); + + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup); + final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(lastModification); + final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroup.getIdentifier())); + final List bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroup.getIdentifier())); final List bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); - return entityFactory.createProcessGroupEntity(snapshot.getComponent(), updatedRevision, permissions, status, bulletinEntities); + return entityFactory.createProcessGroupEntity(revisionUpdate.getComponent(), updatedRevision, permissions, status, bulletinEntities); } private AuthorizationResult authorizeAction(final Action action) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java index a3bb5b2576..b3ccefb450 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java @@ -16,12 +16,32 @@ */ package org.apache.nifi.web.api; -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiParam; -import io.swagger.annotations.ApiResponse; -import io.swagger.annotations.ApiResponses; -import io.swagger.annotations.Authorization; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedHashMap; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import javax.ws.rs.core.UriBuilder; +import javax.ws.rs.core.UriInfo; +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBElement; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Unmarshaller; +import javax.xml.stream.XMLStreamReader; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.authorization.AuthorizableLookup; import org.apache.nifi.authorization.AuthorizeAccess; @@ -47,6 +67,7 @@ import org.apache.nifi.registry.client.NiFiRegistryException; import org.apache.nifi.registry.flow.FlowRegistryUtils; import org.apache.nifi.registry.flow.VersionedFlow; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; +import org.apache.nifi.registry.flow.VersionedFlowState; import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest; import org.apache.nifi.registry.variable.VariableRegistryUpdateStep; import org.apache.nifi.remote.util.SiteToSiteRestApiClient; @@ -111,31 +132,6 @@ import org.slf4j.LoggerFactory; import org.springframework.security.core.Authentication; import org.springframework.security.core.context.SecurityContextHolder; -import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.GET; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.MultivaluedHashMap; -import javax.ws.rs.core.MultivaluedMap; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.Response.Status; -import javax.ws.rs.core.UriBuilder; -import javax.ws.rs.core.UriInfo; -import javax.xml.bind.JAXBContext; -import javax.xml.bind.JAXBElement; -import javax.xml.bind.JAXBException; -import javax.xml.bind.Unmarshaller; -import javax.xml.stream.XMLStreamReader; import java.io.IOException; import java.io.InputStream; import java.net.URI; @@ -161,6 +157,13 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import io.swagger.annotations.Authorization; + /** * RESTful endpoint for managing a Group. */ @@ -1657,8 +1660,8 @@ public class ProcessGroupResource extends ApplicationResource { versionControlInfo.setFlowDescription(flow.getDescription()); versionControlInfo.setRegistryName(serviceFacade.getFlowRegistryName(versionControlInfo.getRegistryId())); - versionControlInfo.setModified(false); - versionControlInfo.setCurrent(flowSnapshot.isLatest()); + final VersionedFlowState flowState = flowSnapshot.isLatest() ? VersionedFlowState.UP_TO_DATE : VersionedFlowState.STALE; + versionControlInfo.setState(flowState.name()); // Step 3: Resolve Bundle info BundleUtils.discoverCompatibleBundles(flowSnapshot.getFlowContents()); @@ -1689,8 +1692,12 @@ public class ProcessGroupResource extends ApplicationResource { } } }, - () -> { - }, + () -> { + final VersionedFlowSnapshot versionedFlowSnapshot = requestProcessGroupEntity.getVersionedFlowSnapshot(); + if (versionedFlowSnapshot != null) { + serviceFacade.verifyComponentTypes(versionedFlowSnapshot.getFlowContents()); + } + }, processGroupEntity -> { final ProcessGroupDTO processGroup = processGroupEntity.getComponent(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java index 5dc732538d..950bd97bf1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java @@ -40,6 +40,7 @@ import org.apache.nifi.registry.flow.FlowRegistryUtils; import org.apache.nifi.registry.flow.VersionedFlow; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; +import org.apache.nifi.registry.flow.VersionedFlowState; import org.apache.nifi.registry.flow.VersionedProcessGroup; import org.apache.nifi.util.BundleUtils; import org.apache.nifi.web.NiFiServiceFacade; @@ -166,14 +167,14 @@ public class VersionsResource extends ApplicationResource { @POST - @Consumes(MediaType.WILDCARD) - @Produces(MediaType.APPLICATION_JSON) + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.TEXT_PLAIN) @Path("active-requests") @ApiOperation( value = "Creates a request so that a Process Group can be placed under Version Control or have its Version Control configuration changed. Creating this request will " + "prevent any other threads from simultaneously saving local changes to Version Control. It will not, however, actually save the local flow to the Flow Registry. A " + "POST to /versions/process-groups/{id} should be used to initiate saving of the local flow to the Flow Registry.", - response = VersionControlInformationEntity.class, + response = String.class, notes = NON_GUARANTEED_ENDPOINT) @ApiResponses(value = { @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), @@ -186,7 +187,7 @@ public class VersionsResource extends ApplicationResource { @ApiParam(value = "The versioned flow details.", required = true) final CreateActiveRequestEntity requestEntity) { if (isReplicateRequest()) { - return replicate(HttpMethod.POST); + return replicate(HttpMethod.POST, requestEntity); } if (requestEntity.getProcessGroupId() == null) { @@ -548,11 +549,14 @@ public class VersionsResource extends ApplicationResource { final CreateActiveRequestEntity activeRequestEntity = new CreateActiveRequestEntity(); activeRequestEntity.setProcessGroupId(groupId); + final Map headers = new HashMap<>(); + headers.put("content-type", MediaType.APPLICATION_JSON); + if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { - clusterResponse = getRequestReplicator().replicate(HttpMethod.POST, createRequestUri, activeRequestEntity, Collections.emptyMap()).awaitMergedResponse(); + clusterResponse = getRequestReplicator().replicate(HttpMethod.POST, createRequestUri, activeRequestEntity, headers).awaitMergedResponse(); } else { clusterResponse = getRequestReplicator().forwardToCoordinator( - getClusterCoordinatorNode(), HttpMethod.POST, createRequestUri, activeRequestEntity, Collections.emptyMap()).awaitMergedResponse(); + getClusterCoordinatorNode(), HttpMethod.POST, createRequestUri, activeRequestEntity, headers).awaitMergedResponse(); } } catch (final InterruptedException ie) { Thread.currentThread().interrupt(); @@ -761,18 +765,20 @@ public class VersionsResource extends ApplicationResource { final VersionControlInformationDTO versionControlInfoDto = new VersionControlInformationDTO(); versionControlInfoDto.setBucketId(snapshotMetadata.getBucketIdentifier()); versionControlInfoDto.setBucketName(bucket.getName()); - versionControlInfoDto.setCurrent(snapshotMetadata.getVersion() == flow.getVersionCount()); versionControlInfoDto.setFlowId(snapshotMetadata.getFlowIdentifier()); versionControlInfoDto.setFlowName(flow.getName()); versionControlInfoDto.setFlowDescription(flow.getDescription()); versionControlInfoDto.setGroupId(groupId); - versionControlInfoDto.setModified(false); versionControlInfoDto.setVersion(snapshotMetadata.getVersion()); versionControlInfoDto.setRegistryId(entity.getRegistryId()); versionControlInfoDto.setRegistryName(serviceFacade.getFlowRegistryName(entity.getRegistryId())); - final ProcessGroupEntity updatedGroup = serviceFacade.updateProcessGroup(rev, groupId, versionControlInfoDto, flowSnapshot, getIdGenerationSeed().orElse(null), false, - entity.getUpdateDescendantVersionedFlows()); + final VersionedFlowState flowState = snapshotMetadata.getVersion() == flow.getVersionCount() ? VersionedFlowState.UP_TO_DATE : VersionedFlowState.STALE; + versionControlInfoDto.setState(flowState.name()); + + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + final ProcessGroupEntity updatedGroup = serviceFacade.updateProcessGroupContents(user, rev, groupId, versionControlInfoDto, flowSnapshot, getIdGenerationSeed().orElse(null), false, + true, entity.getUpdateDescendantVersionedFlows()); final VersionControlInformationDTO updatedVci = updatedGroup.getComponent().getVersionControlInformation(); final VersionControlInformationEntity responseEntity = new VersionControlInformationEntity(); @@ -1103,7 +1109,7 @@ public class VersionsResource extends ApplicationResource { affectedComponents, user, replicateRequest, processGroupEntity, flowSnapshot, request, idGenerationSeed, true, true); vcur.markComplete(updatedVersionControlEntity); - } catch (final LifecycleManagementException e) { + } catch (final Exception e) { logger.error("Failed to update flow to new version", e); vcur.setFailureReason("Failed to update flow to new version due to " + e); } @@ -1268,7 +1274,7 @@ public class VersionsResource extends ApplicationResource { affectedComponents, user, replicateRequest, processGroupEntity, flowSnapshot, request, idGenerationSeed, false, true); vcur.markComplete(updatedVersionControlEntity); - } catch (final LifecycleManagementException e) { + } catch (final Exception e) { logger.error("Failed to update flow to new version", e); vcur.setFailureReason("Failed to update flow to new version due to " + e.getMessage()); } @@ -1403,15 +1409,14 @@ public class VersionsResource extends ApplicationResource { final VersionControlInformationDTO vci = new VersionControlInformationDTO(); vci.setBucketId(metadata.getBucketIdentifier()); vci.setBucketName(bucket.getName()); - vci.setCurrent(flowSnapshot.isLatest()); vci.setFlowDescription(flow.getDescription()); vci.setFlowId(flow.getIdentifier()); vci.setFlowName(flow.getName()); vci.setGroupId(groupId); - vci.setModified(false); vci.setRegistryId(requestVci.getRegistryId()); vci.setRegistryName(serviceFacade.getFlowRegistryName(requestVci.getRegistryId())); vci.setVersion(metadata.getVersion()); + vci.setState(flowSnapshot.isLatest() ? VersionedFlowState.UP_TO_DATE.name() : VersionedFlowState.STALE.name()); serviceFacade.updateProcessGroupContents(user, revision, groupId, vci, flowSnapshot, idGenerationSeed, verifyNotModified, false, updateDescendantVersionedFlows); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index ca781a4021..5bdb0400bf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.web.api.dto; +import javax.ws.rs.WebApplicationException; + import org.apache.commons.lang3.ClassUtils; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; @@ -114,7 +116,6 @@ import org.apache.nifi.provenance.lineage.LineageNode; import org.apache.nifi.provenance.lineage.ProvenanceEventLineageNode; import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.registry.flow.FlowRegistry; -import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.VersionedComponent; import org.apache.nifi.registry.flow.VersionedFlowState; @@ -140,7 +141,6 @@ import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.FormatUtils; -import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.FlowModification; import org.apache.nifi.web.Revision; import org.apache.nifi.web.api.dto.action.ActionDTO; @@ -192,7 +192,6 @@ import org.apache.nifi.web.api.entity.VariableEntity; import org.apache.nifi.web.controller.ControllerFacade; import org.apache.nifi.web.revision.RevisionManager; -import javax.ws.rs.WebApplicationException; import java.text.Collator; import java.util.ArrayList; import java.util.Arrays; @@ -215,7 +214,6 @@ import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.TimeUnit; import java.util.function.Function; -import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -234,8 +232,6 @@ public final class DtoFactory { private ControllerServiceProvider controllerServiceProvider; private EntityFactory entityFactory; private Authorizer authorizer; - private NiFiProperties properties; - private FlowRegistryClient flowRegistryClient; public ControllerConfigurationDTO createControllerConfigurationDto(final ControllerFacade controllerFacade) { final ControllerConfigurationDTO dto = new ControllerConfigurationDTO(); @@ -2190,23 +2186,17 @@ public final class DtoFactory { public Set createComponentDifferenceDtos(final FlowComparison comparison) { - return createComponentDifferenceDtos(comparison, null); - } - - public Set createComponentDifferenceDtos(final FlowComparison comparison, final Predicate filter) { final Map> differencesByComponent = new HashMap<>(); for (final FlowDifference difference : comparison.getDifferences()) { - if (filter == null || filter.test(difference)) { - final ComponentDifferenceDTO componentDiff = createComponentDifference(difference); - final List differences = differencesByComponent.computeIfAbsent(componentDiff, key -> new ArrayList<>()); + final ComponentDifferenceDTO componentDiff = createComponentDifference(difference); + final List differences = differencesByComponent.computeIfAbsent(componentDiff, key -> new ArrayList<>()); - final DifferenceDTO dto = new DifferenceDTO(); - dto.setDifferenceType(difference.getDifferenceType().getDescription()); - dto.setDifference(difference.getDescription()); + final DifferenceDTO dto = new DifferenceDTO(); + dto.setDifferenceType(difference.getDifferenceType().getDescription()); + dto.setDifference(difference.getDescription()); - differences.add(dto); - } + differences.add(dto); } for (final Map.Entry> entry : differencesByComponent.entrySet()) { @@ -2259,8 +2249,6 @@ public final class DtoFactory { dto.setFlowName(versionControlInfo.getFlowName()); dto.setFlowDescription(versionControlInfo.getFlowDescription()); dto.setVersion(versionControlInfo.getVersion()); - dto.setCurrent(versionControlInfo.isCurrent()); - dto.setModified(versionControlInfo.isModified()); final VersionedFlowStatus status = versionControlInfo.getStatus(); final VersionedFlowState state = status.getState(); @@ -3501,8 +3489,6 @@ public final class DtoFactory { copy.setFlowName(original.getFlowName()); copy.setFlowDescription(original.getFlowDescription()); copy.setVersion(original.getVersion()); - copy.setCurrent(original.getCurrent()); - copy.setModified(original.getModified()); copy.setState(original.getState()); copy.setStateExplanation(original.getStateExplanation()); return copy; @@ -3833,12 +3819,4 @@ public final class DtoFactory { public void setBulletinRepository(BulletinRepository bulletinRepository) { this.bulletinRepository = bulletinRepository; } - - public void setProperties(final NiFiProperties properties) { - this.properties = properties; - } - - public void setFlowRegistryClient(FlowRegistryClient flowRegistryClient) { - this.flowRegistryClient = flowRegistryClient; - } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java index d25f294dfc..52a18dc1dc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java @@ -26,6 +26,7 @@ import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.registry.flow.FlowRegistry; import org.apache.nifi.registry.flow.StandardVersionControlInformation; import org.apache.nifi.registry.flow.VersionControlInformation; @@ -234,6 +235,10 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou } if (isNotNull(processGroupDTO.getPosition())) { group.setPosition(new Position(processGroupDTO.getPosition().getX(), processGroupDTO.getPosition().getY())); + final ProcessGroup parent = group.getParent(); + if (parent != null) { + parent.onComponentModified(); + } } if (isNotNull(comments)) { group.setComments(comments); @@ -258,8 +263,6 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou final StandardVersionControlInformation vci = StandardVersionControlInformation.Builder.fromDto(versionControlInformation) .registryName(registryName) .flowSnapshot(flowSnapshot) - .modified(false) - .current(true) .build(); group.setVersionControlInformation(vci, versionedComponentMapping); @@ -281,6 +284,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou final ProcessGroup group = locateProcessGroup(flowController, groupId); group.updateFlow(proposedSnapshot, componentIdSeed, verifyNotModified, updateSettings, updateDescendantVersionedFlows); + group.findAllRemoteProcessGroups().stream().forEach(RemoteProcessGroup::initialize); final StandardVersionControlInformation svci = StandardVersionControlInformation.Builder.fromDto(versionControlInformation) .flowSnapshot(proposedSnapshot.getFlowContents()) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java index 941aae07b5..8228a7781b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java @@ -448,7 +448,10 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot } } - remoteProcessGroup.getProcessGroup().onComponentModified(); + final ProcessGroup group = remoteProcessGroup.getProcessGroup(); + if (group != null) { + group.onComponentModified(); + } return remoteProcessGroup; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml index 43843ef265..ebab24e71f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml @@ -52,8 +52,6 @@ - -