NIFI-4436: Removed isCurrent, isModified from VersionControlInformation and associated DTO. Bug fixes & code refactoring

Signed-off-by: Matt Gilman <matt.c.gilman@gmail.com>
This commit is contained in:
Mark Payne 2017-12-05 16:18:16 -05:00 committed by Bryan Bende
parent db2cc9fec1
commit fe8b30bf26
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
19 changed files with 384 additions and 309 deletions

View File

@ -32,8 +32,6 @@ public class VersionControlInformationDTO {
private String flowName;
private String flowDescription;
private Integer version;
private Boolean modified;
private Boolean current;
private String state;
private String stateExplanation;
@ -118,26 +116,6 @@ public class VersionControlInformationDTO {
this.version = version;
}
@ApiModelProperty(readOnly=true,
value = "Whether or not the flow has been modified since it was last synced to the Flow Registry. The value will be null if this information is not yet known.")
public Boolean getModified() {
return modified;
}
public void setModified(Boolean modified) {
this.modified = modified;
}
@ApiModelProperty(readOnly=true,
value = "Whether or not this is the most recent version of the flow in the Flow Registry. The value will be null if this information is not yet known.")
public Boolean getCurrent() {
return current;
}
public void setCurrent(Boolean current) {
this.current = current;
}
@ApiModelProperty(readOnly = true,
value = "The current state of the Process Group, as it relates to the Versioned Flow",
allowableValues = "LOCALLY_MODIFIED_DESCENDANT, LOCALLY_MODIFIED, STALE, LOCALLY_MODIFIED_AND_STALE, UP_TO_DATE")

View File

@ -55,7 +55,7 @@ public class ProcessGroupEntityMerger implements ComponentEntityMerger<ProcessGr
if (targetVersionControl == null) {
targetGroupDto.setVersionControlInformation(toMergeGroupDto.getVersionControlInformation());
} else if (toMergeVersionControl != null) {
targetVersionControl.setCurrent(Boolean.TRUE.equals(targetVersionControl.getCurrent()) && Boolean.TRUE.equals(toMergeVersionControl.getCurrent()));
VersionControlInformationEntityMerger.updateFlowState(targetVersionControl, toMergeVersionControl);
}
}
}

View File

@ -20,6 +20,7 @@ package org.apache.nifi.cluster.manager;
import java.util.Map;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.registry.flow.VersionedFlowState;
import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
@ -37,12 +38,54 @@ public class VersionControlInformationEntityMerger {
.forEach(entity -> {
final VersionControlInformationDTO dto = entity.getVersionControlInformation();
// We consider the flow to be current only if ALL nodes indicate that it is current
clientDto.setCurrent(Boolean.TRUE.equals(clientDto.getCurrent()) && Boolean.TRUE.equals(dto.getCurrent()));
// We consider the flow to be modified if ANY node indicates that it is modified
clientDto.setModified(Boolean.TRUE.equals(clientDto.getModified()) || Boolean.TRUE.equals(dto.getModified()));
updateFlowState(clientDto, dto);
});
}
private static boolean isCurrent(final VersionedFlowState state) {
return state == VersionedFlowState.UP_TO_DATE || state == VersionedFlowState.LOCALLY_MODIFIED;
}
private static boolean isModified(final VersionedFlowState state) {
return state == VersionedFlowState.LOCALLY_MODIFIED || state == VersionedFlowState.LOCALLY_MODIFIED_AND_STALE;
}
public static void updateFlowState(final VersionControlInformationDTO clientDto, final VersionControlInformationDTO dto) {
final VersionedFlowState clientState = VersionedFlowState.valueOf(clientDto.getState());
if (clientState == VersionedFlowState.SYNC_FAILURE) {
return;
}
final VersionedFlowState dtoState = VersionedFlowState.valueOf(dto.getState());
if (dtoState == VersionedFlowState.SYNC_FAILURE) {
clientDto.setState(dto.getState());
clientDto.setStateExplanation(dto.getStateExplanation());
return;
}
final boolean clientCurrent = isCurrent(clientState);
final boolean clientModified = isModified(clientState);
final boolean dtoCurrent = isCurrent(dtoState);
final boolean dtoModified = isModified(dtoState);
final boolean current = clientCurrent && dtoCurrent;
final boolean stale = !current;
final boolean modified = clientModified && dtoModified;
final VersionedFlowState flowState;
if (modified && stale) {
flowState = VersionedFlowState.LOCALLY_MODIFIED_AND_STALE;
} else if (modified) {
flowState = VersionedFlowState.LOCALLY_MODIFIED;
} else if (stale) {
flowState = VersionedFlowState.STALE;
} else {
flowState = VersionedFlowState.UP_TO_DATE;
}
clientDto.setState(flowState.name());
clientDto.setStateExplanation(flowState.getDescription());
}
}

View File

@ -65,17 +65,6 @@ public interface VersionControlInformation {
*/
int getVersion();
/**
* @return <code>true</code> if the flow has been modified since the last time that it was updated from the Flow Registry or saved
* to the Flow Registry; <code>false</code> if the flow is in sync with the Flow Registry.
*/
boolean isModified();
/**
* @return <code>true</code> if this version of the flow is the most recent version of the flow available in the Flow Registry, <code>false</code> otherwise.
*/
boolean isCurrent();
/**
* @return the current status of the Process Group as it relates to the associated Versioned Flow.
*/

View File

@ -22,31 +22,42 @@ public enum VersionedFlowState {
/**
* We are unable to communicate with the Flow Registry in order to determine the appropriate state
*/
SYNC_FAILURE,
SYNC_FAILURE("Failed to communicate with Flow Registry"),
/**
* This Process Group (or a child/descendant Process Group that is not itself under Version Control)
* is on the latest version of the Versioned Flow, but is different than the Versioned Flow that is
* stored in the Flow Registry.
*/
LOCALLY_MODIFIED,
LOCALLY_MODIFIED("Local changes have been made"),
/**
* This Process Group has not been modified since it was last synchronized with the Flow Registry, but
* the Flow Registry has a newer version of the flow than what is contained in this Process Group.
*/
STALE,
STALE("A newer version of this flow is available"),
/**
* This Process Group (or a child/descendant Process Group that is not itself under Version Control)
* has been modified since it was last synchronized with the Flow Registry, and the Flow Registry has
* a newer version of the flow than what is contained in this Process Group.
*/
LOCALLY_MODIFIED_AND_STALE,
LOCALLY_MODIFIED_AND_STALE("Local changes have been made and a newer version of this flow is available"),
/**
* This Process Group and all child/descendant Process Groups are on the latest version of the flow in
* the Flow Registry and have no local modifications.
*/
UP_TO_DATE;
UP_TO_DATE("Flow version is current");
private final String description;
private VersionedFlowState(final String description) {
this.description = description;
}
public String getDescription() {
return description;
}
}

View File

@ -88,6 +88,7 @@ import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.StandardVersionControlInformation;
import org.apache.nifi.registry.flow.VersionedFlowState;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
@ -1116,10 +1117,10 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
final FlowRegistry flowRegistry = controller.getFlowRegistryClient().getFlowRegistry(versionControlInfoDto.getRegistryId());
final String registryName = flowRegistry == null ? versionControlInfoDto.getRegistryId() : flowRegistry.getName();
versionControlInfoDto.setState(VersionedFlowState.SYNC_FAILURE.name());
versionControlInfoDto.setStateExplanation("Process Group has not yet been synchronized with the Flow Registry");
final StandardVersionControlInformation versionControlInformation = StandardVersionControlInformation.Builder.fromDto(versionControlInfoDto)
.registryName(registryName)
.modified(false)
.current(true)
.build();
// pass empty map for the version control mapping because the VersionedComponentId has already been set on the components

View File

@ -169,8 +169,7 @@ public final class StandardProcessGroup implements ProcessGroup {
private final Map<String, Template> templates = new HashMap<>();
private final StringEncryptor encryptor;
private final MutableVariableRegistry variableRegistry;
private final AtomicReference<StandardVersionedFlowStatus> flowStatus = new AtomicReference<>(
new StandardVersionedFlowStatus(null, "Not yet synchronized with Flow Registry", null));
private final VersionControlFields versionControlFields = new VersionControlFields();
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
@ -339,14 +338,22 @@ public final class StandardProcessGroup implements ProcessGroup {
// update the vci counts for this child group
final VersionControlInformation vci = childGroup.getVersionControlInformation();
if (vci != null) {
if (vci.isModified() && !vci.isCurrent()) {
locallyModifiedAndStale += 1;
} else if (!vci.isCurrent()) {
stale += 1;
} else if (vci.isModified()) {
locallyModified += 1;
} else {
upToDate += 1;
switch (vci.getStatus().getState()) {
case LOCALLY_MODIFIED:
locallyModified++;
break;
case LOCALLY_MODIFIED_AND_STALE:
locallyModifiedAndStale++;
break;
case STALE:
stale++;
break;
case SYNC_FAILURE:
syncFailure++;
break;
case UP_TO_DATE:
upToDate++;
break;
}
}
@ -2938,17 +2945,9 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
clearFlowDifferences();
versionControlFields.setFlowDifferences(null);
}
private void clearFlowDifferences() {
boolean updated = false;
while (!updated) {
final StandardVersionedFlowStatus status = flowStatus.get();
final StandardVersionedFlowStatus updatedStatus = new StandardVersionedFlowStatus(status.getState(), status.getStateExplanation(), null);
updated = flowStatus.compareAndSet(status, updatedStatus);
}
}
@Override
public void setVersionControlInformation(final VersionControlInformation versionControlInformation, final Map<String, String> versionedComponentIds) {
@ -2959,8 +2958,6 @@ public final class StandardProcessGroup implements ProcessGroup {
versionControlInformation.getFlowIdentifier(),
versionControlInformation.getVersion(),
stripContentsFromRemoteDescendantGroups(versionControlInformation.getFlowSnapshot(), true),
versionControlInformation.isModified(),
versionControlInformation.isCurrent(),
versionControlInformation.getStatus()) {
@Override
@ -2970,60 +2967,50 @@ public final class StandardProcessGroup implements ProcessGroup {
return registry == null ? registryId : registry.getName();
}
@Override
public boolean isModified() {
boolean updated = false;
while (true) {
final StandardVersionedFlowStatus status = flowStatus.get();
Set<FlowDifference> differences = status.getCurrentDifferences();
private boolean isModified() {
Set<FlowDifference> differences = versionControlFields.getFlowDifferences();
if (differences == null) {
differences = getModifications();
if (differences == null) {
differences = getModifications();
if (differences == null) {
return false;
}
final StandardVersionedFlowStatus updatedStatus = new StandardVersionedFlowStatus(status.getState(), status.getStateExplanation(), differences);
updated = flowStatus.compareAndSet(status, updatedStatus);
if (updated) {
return !differences.isEmpty();
}
continue;
return false;
}
return !differences.isEmpty();
versionControlFields.setFlowDifferences(differences);
}
return !differences.isEmpty();
}
@Override
public VersionedFlowStatus getStatus() {
// If current state is a sync failure, then
final StandardVersionedFlowStatus status = flowStatus.get();
final VersionedFlowState state = status.getState();
if (state == VersionedFlowState.SYNC_FAILURE) {
return status;
final String syncFailureExplanation = versionControlFields.getSyncFailureExplanation();
if (syncFailureExplanation != null) {
return new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, syncFailureExplanation);
}
final boolean modified = isModified();
if (!modified) {
final VersionControlInformation vci = StandardProcessGroup.this.versionControlInfo.get();
if (vci.getFlowSnapshot() == null) {
return new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, "Process Group has not yet been synchronized with Flow Registry", null);
return new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, "Process Group has not yet been synchronized with Flow Registry");
}
}
final boolean stale = !isCurrent();
final boolean stale = versionControlFields.isStale();
final VersionedFlowState flowState;
if (modified && stale) {
return new StandardVersionedFlowStatus(VersionedFlowState.LOCALLY_MODIFIED_AND_STALE, "Local changes have been made and a newer version of this flow is available", null);
flowState = VersionedFlowState.LOCALLY_MODIFIED_AND_STALE;
} else if (modified) {
return new StandardVersionedFlowStatus(VersionedFlowState.LOCALLY_MODIFIED, "Local changes have been made", null);
flowState = VersionedFlowState.LOCALLY_MODIFIED;
} else if (stale) {
return new StandardVersionedFlowStatus(VersionedFlowState.STALE, "A newer version of this flow is available", null);
flowState = VersionedFlowState.STALE;
} else {
return new StandardVersionedFlowStatus(VersionedFlowState.UP_TO_DATE, "Flow version is current", null);
flowState = VersionedFlowState.UP_TO_DATE;
}
return new StandardVersionedFlowStatus(flowState, flowState.getDescription());
}
};
@ -3031,11 +3018,23 @@ public final class StandardProcessGroup implements ProcessGroup {
svci.setFlowName(versionControlInformation.getFlowName());
svci.setFlowDescription(versionControlInformation.getFlowDescription());
final VersionedFlowState flowState = versionControlInformation.getStatus().getState();
versionControlFields.setStale(flowState == VersionedFlowState.STALE || flowState == VersionedFlowState.LOCALLY_MODIFIED_AND_STALE);
versionControlFields.setLocallyModified(flowState == VersionedFlowState.LOCALLY_MODIFIED || flowState == VersionedFlowState.LOCALLY_MODIFIED_AND_STALE);
versionControlFields.setSyncFailureExplanation(null);
writeLock.lock();
try {
updateVersionedComponentIds(this, versionedComponentIds);
this.versionControlInfo.set(svci);
clearFlowDifferences();
versionControlFields.setFlowDifferences(null);
final ProcessGroup parent = getParent();
if (parent != null) {
parent.onComponentModified();
}
scheduler.submitFrameworkTask(() -> synchronizeWithFlowRegistry(flowController.getFlowRegistryClient()));
} finally {
writeLock.unlock();
}
@ -3156,14 +3155,6 @@ public final class StandardProcessGroup implements ProcessGroup {
.forEach(childGroup -> applyVersionedComponentIds(childGroup, lookup));
}
private void setSyncFailedState(final String explanation) {
boolean updated = false;
while (!updated) {
final StandardVersionedFlowStatus status = flowStatus.get();
final StandardVersionedFlowStatus updatedStatus = new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, explanation, status.getCurrentDifferences());
updated = flowStatus.compareAndSet(status, updatedStatus);
}
}
@Override
public void synchronizeWithFlowRegistry(final FlowRegistryClient flowRegistryClient) {
@ -3177,7 +3168,7 @@ public final class StandardProcessGroup implements ProcessGroup {
if (flowRegistry == null) {
final String message = String.format("Unable to synchronize Process Group with Flow Registry because Process Group was placed under Version Control using Flow Registry "
+ "with identifier %s but cannot find any Flow Registry with this identifier", registryId);
setSyncFailedState(message);
versionControlFields.setSyncFailureExplanation(message);
LOG.error("Unable to synchronize {} with Flow Registry because Process Group was placed under Version Control using Flow Registry "
+ "with identifier {} but cannot find any Flow Registry with this identifier", this, registryId);
@ -3195,7 +3186,7 @@ public final class StandardProcessGroup implements ProcessGroup {
} catch (final IOException | NiFiRegistryException e) {
final String message = String.format("Failed to synchronize Process Group with Flow Registry because could not retrieve version %s of flow with identifier %s in bucket %s",
vci.getVersion(), vci.getFlowIdentifier(), vci.getBucketIdentifier());
setSyncFailedState(message);
versionControlFields.setSyncFailureExplanation(message);
LOG.error("Failed to synchronize {} with Flow Registry because could not retrieve version {} of flow with identifier {} in bucket {}",
this, vci.getVersion(), vci.getFlowIdentifier(), vci.getBucketIdentifier(), e);
@ -3213,22 +3204,17 @@ public final class StandardProcessGroup implements ProcessGroup {
if (latestVersion == vci.getVersion()) {
LOG.debug("{} is currently at the most recent version ({}) of the flow that is under Version Control", this, latestVersion);
vci.setCurrent(true);
versionControlFields.setStale(false);
} else {
vci.setCurrent(false);
LOG.info("{} is not the most recent version of the flow that is under Version Control; current version is {}; most recent version is {}",
new Object[] {this, vci.getVersion(), latestVersion});
versionControlFields.setStale(true);
}
boolean updated = false;
while (!updated) {
final StandardVersionedFlowStatus status = flowStatus.get();
final StandardVersionedFlowStatus updatedStatus = new StandardVersionedFlowStatus(null, null, status.getCurrentDifferences());
updated = flowStatus.compareAndSet(status, updatedStatus);
}
versionControlFields.setSyncFailureExplanation(null);
} catch (final IOException | NiFiRegistryException e) {
final String message = String.format("Failed to synchronize Process Group with Flow Registry : " + e.getMessage());
setSyncFailedState(message);
versionControlFields.setSyncFailureExplanation(message);
LOG.error("Failed to synchronize {} with Flow Registry because could not determine the most recent version of the Flow in the Flow Registry", this, e);
}
@ -3253,10 +3239,6 @@ public final class StandardProcessGroup implements ProcessGroup {
final Set<String> updatedVersionedComponentIds = new HashSet<>();
for (final FlowDifference diff : flowComparison.getDifferences()) {
if (diff.getDifferenceType() == DifferenceType.POSITION_CHANGED) {
continue;
}
// If this update adds a new Controller Service, then we need to check if the service already exists at a higher level
// and if so compare our VersionedControllerService to the existing service.
if (diff.getDifferenceType() == DifferenceType.COMPONENT_ADDED) {
@ -3393,6 +3375,8 @@ public final class StandardProcessGroup implements ProcessGroup {
final FlowRegistry flowRegistry = flowController.getFlowRegistryClient().getFlowRegistry(registryId);
final String registryName = flowRegistry == null ? registryId : flowRegistry.getName();
final VersionedFlowState flowState = remoteCoordinates.getLatest() ? VersionedFlowState.UP_TO_DATE : VersionedFlowState.STALE;
final VersionControlInformation vci = new StandardVersionControlInformation.Builder()
.registryId(registryId)
.registryName(registryName)
@ -3402,8 +3386,7 @@ public final class StandardProcessGroup implements ProcessGroup {
.flowName(flowId)
.version(version)
.flowSnapshot(proposed)
.modified(false)
.current(remoteCoordinates.getLatest())
.status(new StandardVersionedFlowStatus(flowState, flowState.getDescription()))
.build();
group.setVersionControlInformation(vci, Collections.emptyMap());
@ -4149,13 +4132,9 @@ public final class StandardProcessGroup implements ProcessGroup {
final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, getAncestorGroupServiceIds(), new EvolvingDifferenceDescriptor());
final FlowComparison comparison = flowComparator.compare();
final Set<FlowDifference> differences = comparison.getDifferences();
final Set<FlowDifference> functionalDifferences = differences.stream()
.filter(diff -> diff.getDifferenceType() != DifferenceType.POSITION_CHANGED)
.filter(diff -> diff.getDifferenceType() != DifferenceType.STYLE_CHANGED)
.collect(Collectors.toSet());
LOG.debug("There are {} differences between this Local Flow and the Versioned Flow: {}", differences.size(), differences);
return functionalDifferences;
return differences;
}
@ -4170,7 +4149,8 @@ public final class StandardProcessGroup implements ProcessGroup {
}
if (verifyNotDirty) {
final boolean modified = versionControlInfo.isModified();
final VersionedFlowState flowState = versionControlInfo.getStatus().getState();
final boolean modified = flowState == VersionedFlowState.LOCALLY_MODIFIED || flowState == VersionedFlowState.LOCALLY_MODIFIED_AND_STALE;
final Set<FlowDifference> modifications = getModifications();
@ -4186,7 +4166,7 @@ public final class StandardProcessGroup implements ProcessGroup {
throw new IllegalStateException("Cannot change the Version of the flow for " + this
+ " because the Process Group has been modified (" + modifications.size()
+ " modifications) since it was last synchronized with the Flow Registry. The Process Group must be"
+ " reverted to its original form before changing the version. See logs for more information on what has changed.");
+ " reverted to its original form before changing the version.");
}
}
}
@ -4393,8 +4373,8 @@ public final class StandardProcessGroup implements ProcessGroup {
if (flowId != null && flowId.equals(vci.getFlowIdentifier())) {
// Flow ID is the same. We want to publish the Process Group as the next version of the Flow.
// In order to do this, we have to ensure that the Process Group is 'current'.
final boolean current = vci.isCurrent();
if (!current) {
final VersionedFlowState state = vci.getStatus().getState();
if (state == VersionedFlowState.STALE || state == VersionedFlowState.LOCALLY_MODIFIED_AND_STALE) {
throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + getIdentifier()
+ " because the Process Group in the flow is not synchronized with the most recent version of the Flow in the Flow Registry. "
+ "In order to publish a new version of the Flow, the Process Group must first be in synch with the latest version in the Flow Registry.");
@ -4441,10 +4421,15 @@ public final class StandardProcessGroup implements ProcessGroup {
private void verifyNoDescendantsWithLocalModifications(final String action) {
for (final ProcessGroup descendant : findAllProcessGroups()) {
final VersionControlInformation descendantVci = descendant.getVersionControlInformation();
if (descendantVci != null && descendantVci.isModified()) {
throw new IllegalStateException("Process Group cannot " + action + " because it contains a child or descendant Process Group that is under Version Control and "
+ "has local modifications. Each descendant Process Group that is under Version Control must first be reverted or have its changes pushed to the Flow Registry before "
+ "this action can be performed on the parent Process Group.");
if (descendantVci != null) {
final VersionedFlowState flowState = descendantVci.getStatus().getState();
final boolean modified = flowState == VersionedFlowState.LOCALLY_MODIFIED || flowState == VersionedFlowState.LOCALLY_MODIFIED_AND_STALE;
if (modified) {
throw new IllegalStateException("Process Group cannot " + action + " because it contains a child or descendant Process Group that is under Version Control and "
+ "has local modifications. Each descendant Process Group that is under Version Control must first be reverted or have its changes pushed to the Flow Registry before "
+ "this action can be performed on the parent Process Group.");
}
}
}
}

View File

@ -17,21 +17,16 @@
package org.apache.nifi.groups;
import java.util.Set;
import org.apache.nifi.registry.flow.VersionedFlowState;
import org.apache.nifi.registry.flow.VersionedFlowStatus;
import org.apache.nifi.registry.flow.diff.FlowDifference;
class StandardVersionedFlowStatus implements VersionedFlowStatus {
private final VersionedFlowState state;
private final String explanation;
private final Set<FlowDifference> currentDifferences;
StandardVersionedFlowStatus(final VersionedFlowState state, final String explanation, final Set<FlowDifference> differences) {
StandardVersionedFlowStatus(final VersionedFlowState state, final String explanation) {
this.state = state;
this.explanation = explanation;
this.currentDifferences = differences;
}
@Override
@ -43,8 +38,4 @@ class StandardVersionedFlowStatus implements VersionedFlowStatus {
public String getStateExplanation() {
return explanation;
}
Set<FlowDifference> getCurrentDifferences() {
return currentDifferences;
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.groups;
import java.util.Set;
import org.apache.nifi.registry.flow.diff.FlowDifference;
public class VersionControlFields {
private volatile boolean locallyModified;
private volatile boolean stale;
private volatile String syncFailureExplanation = "Not yet synchronized with Flow Registry";
private volatile Set<FlowDifference> flowDifferences;
boolean isLocallyModified() {
return locallyModified;
}
void setLocallyModified(final boolean locallyModified) {
this.locallyModified = locallyModified;
}
boolean isStale() {
return stale;
}
void setStale(final boolean stale) {
this.stale = stale;
}
String getSyncFailureExplanation() {
return syncFailureExplanation;
}
void setSyncFailureExplanation(final String syncFailureExplanation) {
this.syncFailureExplanation = syncFailureExplanation;
}
Set<FlowDifference> getFlowDifferences() {
return flowDifferences;
}
void setFlowDifferences(final Set<FlowDifference> flowDifferences) {
this.flowDifferences = flowDifferences;
}
}

View File

@ -32,8 +32,6 @@ public class StandardVersionControlInformation implements VersionControlInformat
private volatile String flowDescription;
private final int version;
private volatile VersionedProcessGroup flowSnapshot;
private volatile boolean modified;
private volatile boolean current;
private final VersionedFlowStatus status;
public static class Builder {
@ -46,8 +44,6 @@ public class StandardVersionControlInformation implements VersionControlInformat
private String flowDescription;
private int version;
private VersionedProcessGroup flowSnapshot;
private Boolean modified = null;
private Boolean current = null;
private VersionedFlowStatus status;
public Builder registryId(String registryId) {
@ -90,16 +86,6 @@ public class StandardVersionControlInformation implements VersionControlInformat
return this;
}
public Builder modified(boolean modified) {
this.modified = modified;
return this;
}
public Builder current(boolean current) {
this.current = current;
return this;
}
public Builder flowSnapshot(VersionedProcessGroup snapshot) {
this.flowSnapshot = snapshot;
return this;
@ -119,8 +105,17 @@ public class StandardVersionControlInformation implements VersionControlInformat
.flowId(dto.getFlowId())
.flowName(dto.getFlowName())
.flowDescription(dto.getFlowDescription())
.current(dto.getCurrent() == null ? true : dto.getCurrent())
.modified(dto.getModified() == null ? false : dto.getModified())
.status(new VersionedFlowStatus() {
@Override
public VersionedFlowState getState() {
return VersionedFlowState.valueOf(dto.getState());
}
@Override
public String getStateExplanation() {
return dto.getStateExplanation();
}
})
.version(dto.getVersion());
return builder;
@ -133,7 +128,7 @@ public class StandardVersionControlInformation implements VersionControlInformat
Objects.requireNonNull(version, "Version must be specified");
final StandardVersionControlInformation svci = new StandardVersionControlInformation(registryIdentifier, registryName,
bucketIdentifier, flowIdentifier, version, flowSnapshot, modified, current, status);
bucketIdentifier, flowIdentifier, version, flowSnapshot, status);
svci.setBucketName(bucketName);
svci.setFlowName(flowName);
@ -145,15 +140,13 @@ public class StandardVersionControlInformation implements VersionControlInformat
public StandardVersionControlInformation(final String registryId, final String registryName, final String bucketId, final String flowId, final int version,
final VersionedProcessGroup snapshot, final boolean modified, final boolean current, final VersionedFlowStatus status) {
final VersionedProcessGroup snapshot, final VersionedFlowStatus status) {
this.registryIdentifier = registryId;
this.registryName = registryName;
this.bucketIdentifier = bucketId;
this.flowIdentifier = flowId;
this.version = version;
this.flowSnapshot = snapshot;
this.modified = modified;
this.current = current;
this.status = status;
}
@ -214,29 +207,11 @@ public class StandardVersionControlInformation implements VersionControlInformat
return version;
}
@Override
public boolean isModified() {
return modified;
}
@Override
public boolean isCurrent() {
return current;
}
@Override
public VersionedProcessGroup getFlowSnapshot() {
return flowSnapshot;
}
public void setModified(final boolean modified) {
this.modified = modified;
}
public void setCurrent(final boolean current) {
this.current = current;
}
public void setFlowSnapshot(final VersionedProcessGroup flowSnapshot) {
this.flowSnapshot = flowSnapshot;
}

View File

@ -178,9 +178,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
};
final Runnable checkAuthorizations = new InitializationTask();
backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + id, true);
backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 30L, 30L, TimeUnit.SECONDS);
}
@Override
@ -197,6 +195,9 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
logger.warn("Unable to communicate with remote instance {}", new Object[] {this, e});
}
});
final Runnable checkAuthorizations = new InitializationTask();
backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 0L, 60L, TimeUnit.SECONDS);
}
@Override

View File

@ -1442,20 +1442,6 @@ public interface NiFiServiceFacade {
*/
void verifyCanRevertLocalModifications(String groupId, VersionedFlowSnapshot versionedFlowSnapshot);
/**
* Updates the Process group with the given ID to match the new snapshot
*
* @param revision the revision of the Process Group
* @param groupId the ID of the Process Group
* @param versionControlInfo the Version Control information
* @param snapshot the new snapshot
* @param componentIdSeed the seed to use for generating new component ID's
* @param updateDescendantVersionedFlows if a child/descendant Process Group is under Version Control, specifies whether or not to
* update the contents of that Process Group
* @return the Process Group
*/
ProcessGroupEntity updateProcessGroup(Revision revision, String groupId, VersionControlInformationDTO versionControlInfo, VersionedFlowSnapshot snapshot, String componentIdSeed,
boolean verifyNotModified, boolean updateDescendantVersionedFlows);
/**
* Updates the Process group with the given ID to match the new snapshot

View File

@ -97,6 +97,7 @@ import org.apache.nifi.registry.flow.VersionedConnection;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
import org.apache.nifi.registry.flow.VersionedFlowState;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
import org.apache.nifi.registry.flow.diff.ConciseEvolvingDifferenceDescriptor;
@ -292,6 +293,7 @@ import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Implementation of NiFiServiceFacade that performs revision checking.
@ -1592,7 +1594,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final D dto = dtoCreation.apply(component);
final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity());
return new StandardRevisionUpdate<D>(dto, lastMod);
return new StandardRevisionUpdate<>(dto, lastMod);
});
}
@ -1779,7 +1781,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
controllerFacade.save();
final SnippetDTO dto = dtoFactory.createSnippetDto(snippet);
final RevisionUpdate<SnippetDTO> snapshot = new StandardRevisionUpdate<SnippetDTO>(dto, null);
final RevisionUpdate<SnippetDTO> snapshot = new StandardRevisionUpdate<>(dto, null);
return entityFactory.createSnippetEntity(snapshot.getComponent());
}
@ -2088,7 +2090,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
controllerFacade.save();
final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity());
return new StandardRevisionUpdate<ControllerServiceDTO>(dto, lastMod);
return new StandardRevisionUpdate<>(dto, lastMod);
});
} else {
snapshot = revisionManager.updateRevision(claim, user, () -> {
@ -2098,7 +2100,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
controllerFacade.save();
final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity());
return new StandardRevisionUpdate<ControllerServiceDTO>(dto, lastMod);
return new StandardRevisionUpdate<>(dto, lastMod);
});
}
@ -2440,7 +2442,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final Revision updatedRevision = revisionManager.getRevision(revision.getComponentId()).incrementRevision(revision.getClientId());
final FlowModification lastModification = new FlowModification(updatedRevision, user.getIdentity());
return new StandardRevisionUpdate<FlowRegistry>(registry, lastModification);
return new StandardRevisionUpdate<>(registry, lastModification);
});
final FlowRegistry updatedReg = revisionUpdate.getComponent();
@ -2483,7 +2485,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final ReportingTaskDTO dto = dtoFactory.createReportingTaskDto(reportingTask);
final FlowModification lastMod = new FlowModification(revision.incrementRevision(revision.getClientId()), user.getIdentity());
return new StandardRevisionUpdate<ReportingTaskDTO>(dto, lastMod);
return new StandardRevisionUpdate<>(dto, lastMod);
});
final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskDTO.getId());
@ -3649,6 +3651,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public VersionControlComponentMappingEntity registerFlowWithFlowRegistry(final String groupId, final StartVersionControlRequestEntity requestEntity) {
final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
final VersionControlInformation currentVci = processGroup.getVersionControlInformation();
final int expectedVersion = currentVci == null ? 1 : currentVci.getVersion() + 1;
@ -3697,15 +3700,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final VersionControlInformationDTO vci = new VersionControlInformationDTO();
vci.setBucketId(bucket.getIdentifier());
vci.setBucketName(bucket.getName());
vci.setCurrent(true);
vci.setFlowId(flow.getIdentifier());
vci.setFlowName(flow.getName());
vci.setFlowDescription(flow.getDescription());
vci.setGroupId(groupId);
vci.setModified(false);
vci.setRegistryId(registryId);
vci.setRegistryName(getFlowRegistryName(registryId));
vci.setVersion(registeredSnapshot.getSnapshotMetadata().getVersion());
vci.setState(VersionedFlowState.UP_TO_DATE.name());
final Map<String, String> mapping = dtoFactory.createVersionControlComponentMappingDto(versionedProcessGroup);
@ -3777,8 +3779,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, ancestorServiceIds, new ConciseEvolvingDifferenceDescriptor());
final FlowComparison flowComparison = flowComparator.compare();
final Set<ComponentDifferenceDTO> differenceDtos = dtoFactory.createComponentDifferenceDtos(flowComparison,
diff -> diff.getDifferenceType() != DifferenceType.POSITION_CHANGED && diff.getDifferenceType() != DifferenceType.STYLE_CHANGED);
final Set<ComponentDifferenceDTO> differenceDtos = dtoFactory.createComponentDifferenceDtos(flowComparison);
final FlowComparisonEntity entity = new FlowComparisonEntity();
entity.setComponentDifferences(differenceDtos);
@ -4079,30 +4080,88 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
return flowRegistry == null ? flowRegistryId : flowRegistry.getName();
}
@Override
public ProcessGroupEntity updateProcessGroup(final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo,
final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified, final boolean updateDescendantVersionedFlows) {
private List<Revision> getComponentRevisions(final ProcessGroup processGroup, final boolean includeGroupRevision) {
final List<Revision> revisions = new ArrayList<>();
if (includeGroupRevision) {
revisions.add(revisionManager.getRevision(processGroup.getIdentifier()));
}
final NiFiUser user = NiFiUserUtils.getNiFiUser();
return updateProcessGroupContents(user, revision, groupId, versionControlInfo, proposedFlowSnapshot, componentIdSeed, verifyNotModified, true, updateDescendantVersionedFlows);
processGroup.findAllConnections().stream()
.map(component -> revisionManager.getRevision(component.getIdentifier()))
.forEach(revisions::add);
processGroup.findAllControllerServices().stream()
.map(component -> revisionManager.getRevision(component.getIdentifier()))
.forEach(revisions::add);
processGroup.findAllFunnels().stream()
.map(component -> revisionManager.getRevision(component.getIdentifier()))
.forEach(revisions::add);
processGroup.findAllInputPorts().stream()
.map(component -> revisionManager.getRevision(component.getIdentifier()))
.forEach(revisions::add);
processGroup.findAllOutputPorts().stream()
.map(component -> revisionManager.getRevision(component.getIdentifier()))
.forEach(revisions::add);
processGroup.findAllLabels().stream()
.map(component -> revisionManager.getRevision(component.getIdentifier()))
.forEach(revisions::add);
processGroup.findAllProcessGroups().stream()
.map(component -> revisionManager.getRevision(component.getIdentifier()))
.forEach(revisions::add);
processGroup.findAllProcessors().stream()
.map(component -> revisionManager.getRevision(component.getIdentifier()))
.forEach(revisions::add);
processGroup.findAllRemoteProcessGroups().stream()
.map(component -> revisionManager.getRevision(component.getIdentifier()))
.forEach(revisions::add);
processGroup.findAllRemoteProcessGroups().stream()
.flatMap(rpg -> Stream.concat(rpg.getInputPorts().stream(), rpg.getOutputPorts().stream()))
.map(component -> revisionManager.getRevision(component.getIdentifier()))
.forEach(revisions::add);
return revisions;
}
@Override
public ProcessGroupEntity updateProcessGroupContents(final NiFiUser user, final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo,
final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings, final boolean updateDescendantVersionedFlows) {
final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(groupId);
final RevisionUpdate<ProcessGroupDTO> snapshot = updateComponent(user, revision,
processGroupNode,
() -> processGroupDAO.updateProcessGroupFlow(groupId, user, proposedFlowSnapshot, versionControlInfo, componentIdSeed, verifyNotModified, updateSettings, updateDescendantVersionedFlows),
processGroup -> dtoFactory.createProcessGroupDto(processGroup));
final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
final List<Revision> revisions = getComponentRevisions(processGroup, false);
revisions.add(revision);
final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode);
final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification());
final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroupNode.getIdentifier()));
final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroupNode.getIdentifier()));
final RevisionClaim revisionClaim = new StandardRevisionClaim(revisions);
final RevisionUpdate<ProcessGroupDTO> revisionUpdate = revisionManager.updateRevision(revisionClaim, user, new UpdateRevisionTask<ProcessGroupDTO>() {
@Override
public RevisionUpdate<ProcessGroupDTO> update() {
// update the Process Group
processGroupDAO.updateProcessGroupFlow(groupId, user, proposedFlowSnapshot, versionControlInfo, componentIdSeed, verifyNotModified, updateSettings, updateDescendantVersionedFlows);
// update the revisions
final Set<Revision> updatedRevisions = revisions.stream()
.map(rev -> revisionManager.getRevision(rev.getComponentId()).incrementRevision(revision.getClientId()))
.collect(Collectors.toSet());
// save
controllerFacade.save();
// gather details for response
final ProcessGroupDTO dto = dtoFactory.createProcessGroupDto(processGroup);
final Revision updatedRevision = revisionManager.getRevision(groupId).incrementRevision(revision.getClientId());
final FlowModification lastModification = new FlowModification(updatedRevision, user.getIdentity());
return new StandardRevisionUpdate<>(dto, lastModification, updatedRevisions);
}
});
final FlowModification lastModification = revisionUpdate.getLastModification();
final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(lastModification);
final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroup.getIdentifier()));
final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroup.getIdentifier()));
final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
return entityFactory.createProcessGroupEntity(snapshot.getComponent(), updatedRevision, permissions, status, bulletinEntities);
return entityFactory.createProcessGroupEntity(revisionUpdate.getComponent(), updatedRevision, permissions, status, bulletinEntities);
}
private AuthorizationResult authorizeAction(final Action action) {

View File

@ -16,12 +16,32 @@
*/
package org.apache.nifi.web.api;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import javax.xml.stream.XMLStreamReader;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AuthorizableLookup;
import org.apache.nifi.authorization.AuthorizeAccess;
@ -47,6 +67,7 @@ import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.FlowRegistryUtils;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowState;
import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest;
import org.apache.nifi.registry.variable.VariableRegistryUpdateStep;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
@ -111,31 +132,6 @@ import org.slf4j.LoggerFactory;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import javax.xml.stream.XMLStreamReader;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
@ -161,6 +157,13 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
/**
* RESTful endpoint for managing a Group.
*/
@ -1657,8 +1660,8 @@ public class ProcessGroupResource extends ApplicationResource {
versionControlInfo.setFlowDescription(flow.getDescription());
versionControlInfo.setRegistryName(serviceFacade.getFlowRegistryName(versionControlInfo.getRegistryId()));
versionControlInfo.setModified(false);
versionControlInfo.setCurrent(flowSnapshot.isLatest());
final VersionedFlowState flowState = flowSnapshot.isLatest() ? VersionedFlowState.UP_TO_DATE : VersionedFlowState.STALE;
versionControlInfo.setState(flowState.name());
// Step 3: Resolve Bundle info
BundleUtils.discoverCompatibleBundles(flowSnapshot.getFlowContents());
@ -1689,8 +1692,12 @@ public class ProcessGroupResource extends ApplicationResource {
}
}
},
() -> {
},
() -> {
final VersionedFlowSnapshot versionedFlowSnapshot = requestProcessGroupEntity.getVersionedFlowSnapshot();
if (versionedFlowSnapshot != null) {
serviceFacade.verifyComponentTypes(versionedFlowSnapshot.getFlowContents());
}
},
processGroupEntity -> {
final ProcessGroupDTO processGroup = processGroupEntity.getComponent();

View File

@ -40,6 +40,7 @@ import org.apache.nifi.registry.flow.FlowRegistryUtils;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
import org.apache.nifi.registry.flow.VersionedFlowState;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.util.BundleUtils;
import org.apache.nifi.web.NiFiServiceFacade;
@ -166,14 +167,14 @@ public class VersionsResource extends ApplicationResource {
@POST
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.TEXT_PLAIN)
@Path("active-requests")
@ApiOperation(
value = "Creates a request so that a Process Group can be placed under Version Control or have its Version Control configuration changed. Creating this request will "
+ "prevent any other threads from simultaneously saving local changes to Version Control. It will not, however, actually save the local flow to the Flow Registry. A "
+ "POST to /versions/process-groups/{id} should be used to initiate saving of the local flow to the Flow Registry.",
response = VersionControlInformationEntity.class,
response = String.class,
notes = NON_GUARANTEED_ENDPOINT)
@ApiResponses(value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ -186,7 +187,7 @@ public class VersionsResource extends ApplicationResource {
@ApiParam(value = "The versioned flow details.", required = true) final CreateActiveRequestEntity requestEntity) {
if (isReplicateRequest()) {
return replicate(HttpMethod.POST);
return replicate(HttpMethod.POST, requestEntity);
}
if (requestEntity.getProcessGroupId() == null) {
@ -548,11 +549,14 @@ public class VersionsResource extends ApplicationResource {
final CreateActiveRequestEntity activeRequestEntity = new CreateActiveRequestEntity();
activeRequestEntity.setProcessGroupId(groupId);
final Map<String, String> headers = new HashMap<>();
headers.put("content-type", MediaType.APPLICATION_JSON);
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
clusterResponse = getRequestReplicator().replicate(HttpMethod.POST, createRequestUri, activeRequestEntity, Collections.emptyMap()).awaitMergedResponse();
clusterResponse = getRequestReplicator().replicate(HttpMethod.POST, createRequestUri, activeRequestEntity, headers).awaitMergedResponse();
} else {
clusterResponse = getRequestReplicator().forwardToCoordinator(
getClusterCoordinatorNode(), HttpMethod.POST, createRequestUri, activeRequestEntity, Collections.emptyMap()).awaitMergedResponse();
getClusterCoordinatorNode(), HttpMethod.POST, createRequestUri, activeRequestEntity, headers).awaitMergedResponse();
}
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
@ -761,18 +765,20 @@ public class VersionsResource extends ApplicationResource {
final VersionControlInformationDTO versionControlInfoDto = new VersionControlInformationDTO();
versionControlInfoDto.setBucketId(snapshotMetadata.getBucketIdentifier());
versionControlInfoDto.setBucketName(bucket.getName());
versionControlInfoDto.setCurrent(snapshotMetadata.getVersion() == flow.getVersionCount());
versionControlInfoDto.setFlowId(snapshotMetadata.getFlowIdentifier());
versionControlInfoDto.setFlowName(flow.getName());
versionControlInfoDto.setFlowDescription(flow.getDescription());
versionControlInfoDto.setGroupId(groupId);
versionControlInfoDto.setModified(false);
versionControlInfoDto.setVersion(snapshotMetadata.getVersion());
versionControlInfoDto.setRegistryId(entity.getRegistryId());
versionControlInfoDto.setRegistryName(serviceFacade.getFlowRegistryName(entity.getRegistryId()));
final ProcessGroupEntity updatedGroup = serviceFacade.updateProcessGroup(rev, groupId, versionControlInfoDto, flowSnapshot, getIdGenerationSeed().orElse(null), false,
entity.getUpdateDescendantVersionedFlows());
final VersionedFlowState flowState = snapshotMetadata.getVersion() == flow.getVersionCount() ? VersionedFlowState.UP_TO_DATE : VersionedFlowState.STALE;
versionControlInfoDto.setState(flowState.name());
final NiFiUser user = NiFiUserUtils.getNiFiUser();
final ProcessGroupEntity updatedGroup = serviceFacade.updateProcessGroupContents(user, rev, groupId, versionControlInfoDto, flowSnapshot, getIdGenerationSeed().orElse(null), false,
true, entity.getUpdateDescendantVersionedFlows());
final VersionControlInformationDTO updatedVci = updatedGroup.getComponent().getVersionControlInformation();
final VersionControlInformationEntity responseEntity = new VersionControlInformationEntity();
@ -1103,7 +1109,7 @@ public class VersionsResource extends ApplicationResource {
affectedComponents, user, replicateRequest, processGroupEntity, flowSnapshot, request, idGenerationSeed, true, true);
vcur.markComplete(updatedVersionControlEntity);
} catch (final LifecycleManagementException e) {
} catch (final Exception e) {
logger.error("Failed to update flow to new version", e);
vcur.setFailureReason("Failed to update flow to new version due to " + e);
}
@ -1268,7 +1274,7 @@ public class VersionsResource extends ApplicationResource {
affectedComponents, user, replicateRequest, processGroupEntity, flowSnapshot, request, idGenerationSeed, false, true);
vcur.markComplete(updatedVersionControlEntity);
} catch (final LifecycleManagementException e) {
} catch (final Exception e) {
logger.error("Failed to update flow to new version", e);
vcur.setFailureReason("Failed to update flow to new version due to " + e.getMessage());
}
@ -1403,15 +1409,14 @@ public class VersionsResource extends ApplicationResource {
final VersionControlInformationDTO vci = new VersionControlInformationDTO();
vci.setBucketId(metadata.getBucketIdentifier());
vci.setBucketName(bucket.getName());
vci.setCurrent(flowSnapshot.isLatest());
vci.setFlowDescription(flow.getDescription());
vci.setFlowId(flow.getIdentifier());
vci.setFlowName(flow.getName());
vci.setGroupId(groupId);
vci.setModified(false);
vci.setRegistryId(requestVci.getRegistryId());
vci.setRegistryName(serviceFacade.getFlowRegistryName(requestVci.getRegistryId()));
vci.setVersion(metadata.getVersion());
vci.setState(flowSnapshot.isLatest() ? VersionedFlowState.UP_TO_DATE.name() : VersionedFlowState.STALE.name());
serviceFacade.updateProcessGroupContents(user, revision, groupId, vci, flowSnapshot, idGenerationSeed, verifyNotModified, false, updateDescendantVersionedFlows);
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.nifi.web.api.dto;
import javax.ws.rs.WebApplicationException;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.action.Action;
@ -114,7 +116,6 @@ import org.apache.nifi.provenance.lineage.LineageNode;
import org.apache.nifi.provenance.lineage.ProvenanceEventLineageNode;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedComponent;
import org.apache.nifi.registry.flow.VersionedFlowState;
@ -140,7 +141,6 @@ import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.FlowModification;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.dto.action.ActionDTO;
@ -192,7 +192,6 @@ import org.apache.nifi.web.api.entity.VariableEntity;
import org.apache.nifi.web.controller.ControllerFacade;
import org.apache.nifi.web.revision.RevisionManager;
import javax.ws.rs.WebApplicationException;
import java.text.Collator;
import java.util.ArrayList;
import java.util.Arrays;
@ -215,7 +214,6 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -234,8 +232,6 @@ public final class DtoFactory {
private ControllerServiceProvider controllerServiceProvider;
private EntityFactory entityFactory;
private Authorizer authorizer;
private NiFiProperties properties;
private FlowRegistryClient flowRegistryClient;
public ControllerConfigurationDTO createControllerConfigurationDto(final ControllerFacade controllerFacade) {
final ControllerConfigurationDTO dto = new ControllerConfigurationDTO();
@ -2190,23 +2186,17 @@ public final class DtoFactory {
public Set<ComponentDifferenceDTO> createComponentDifferenceDtos(final FlowComparison comparison) {
return createComponentDifferenceDtos(comparison, null);
}
public Set<ComponentDifferenceDTO> createComponentDifferenceDtos(final FlowComparison comparison, final Predicate<FlowDifference> filter) {
final Map<ComponentDifferenceDTO, List<DifferenceDTO>> differencesByComponent = new HashMap<>();
for (final FlowDifference difference : comparison.getDifferences()) {
if (filter == null || filter.test(difference)) {
final ComponentDifferenceDTO componentDiff = createComponentDifference(difference);
final List<DifferenceDTO> differences = differencesByComponent.computeIfAbsent(componentDiff, key -> new ArrayList<>());
final ComponentDifferenceDTO componentDiff = createComponentDifference(difference);
final List<DifferenceDTO> differences = differencesByComponent.computeIfAbsent(componentDiff, key -> new ArrayList<>());
final DifferenceDTO dto = new DifferenceDTO();
dto.setDifferenceType(difference.getDifferenceType().getDescription());
dto.setDifference(difference.getDescription());
final DifferenceDTO dto = new DifferenceDTO();
dto.setDifferenceType(difference.getDifferenceType().getDescription());
dto.setDifference(difference.getDescription());
differences.add(dto);
}
differences.add(dto);
}
for (final Map.Entry<ComponentDifferenceDTO, List<DifferenceDTO>> entry : differencesByComponent.entrySet()) {
@ -2259,8 +2249,6 @@ public final class DtoFactory {
dto.setFlowName(versionControlInfo.getFlowName());
dto.setFlowDescription(versionControlInfo.getFlowDescription());
dto.setVersion(versionControlInfo.getVersion());
dto.setCurrent(versionControlInfo.isCurrent());
dto.setModified(versionControlInfo.isModified());
final VersionedFlowStatus status = versionControlInfo.getStatus();
final VersionedFlowState state = status.getState();
@ -3501,8 +3489,6 @@ public final class DtoFactory {
copy.setFlowName(original.getFlowName());
copy.setFlowDescription(original.getFlowDescription());
copy.setVersion(original.getVersion());
copy.setCurrent(original.getCurrent());
copy.setModified(original.getModified());
copy.setState(original.getState());
copy.setStateExplanation(original.getStateExplanation());
return copy;
@ -3833,12 +3819,4 @@ public final class DtoFactory {
public void setBulletinRepository(BulletinRepository bulletinRepository) {
this.bulletinRepository = bulletinRepository;
}
public void setProperties(final NiFiProperties properties) {
this.properties = properties;
}
public void setFlowRegistryClient(FlowRegistryClient flowRegistryClient) {
this.flowRegistryClient = flowRegistryClient;
}
}

View File

@ -26,6 +26,7 @@ import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.StandardVersionControlInformation;
import org.apache.nifi.registry.flow.VersionControlInformation;
@ -234,6 +235,10 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
}
if (isNotNull(processGroupDTO.getPosition())) {
group.setPosition(new Position(processGroupDTO.getPosition().getX(), processGroupDTO.getPosition().getY()));
final ProcessGroup parent = group.getParent();
if (parent != null) {
parent.onComponentModified();
}
}
if (isNotNull(comments)) {
group.setComments(comments);
@ -258,8 +263,6 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
final StandardVersionControlInformation vci = StandardVersionControlInformation.Builder.fromDto(versionControlInformation)
.registryName(registryName)
.flowSnapshot(flowSnapshot)
.modified(false)
.current(true)
.build();
group.setVersionControlInformation(vci, versionedComponentMapping);
@ -281,6 +284,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
final ProcessGroup group = locateProcessGroup(flowController, groupId);
group.updateFlow(proposedSnapshot, componentIdSeed, verifyNotModified, updateSettings, updateDescendantVersionedFlows);
group.findAllRemoteProcessGroups().stream().forEach(RemoteProcessGroup::initialize);
final StandardVersionControlInformation svci = StandardVersionControlInformation.Builder.fromDto(versionControlInformation)
.flowSnapshot(proposedSnapshot.getFlowContents())

View File

@ -448,7 +448,10 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot
}
}
remoteProcessGroup.getProcessGroup().onComponentModified();
final ProcessGroup group = remoteProcessGroup.getProcessGroup();
if (group != null) {
group.onComponentModified();
}
return remoteProcessGroup;
}

View File

@ -52,8 +52,6 @@
<property name="entityFactory" ref="entityFactory"/>
<property name="authorizer" ref="authorizer"/>
<property name="bulletinRepository" ref="bulletinRepository"/>
<property name="properties" ref="nifiProperties"/>
<property name="flowRegistryClient" ref="flowRegistryClient" />
</bean>
<!-- snippet utils -->