NIFI-6873: Added support for replacing a process group

- decoupled flow update request behavior from VersionsResource into new abstract FlowUpdateResource
 - added replace process group functionality in ProcessGroupResource
 - parameterized FlowUpdateResource and created entity hierarchies to allow for maximum code sharing across different update types
 - refactored flow update methods to make use of commonality across different update types whenever possible
 - fixed issues in StandardProcessGroup verify update methods where same components existed in different ancestry chains but were considered a match when they shouldn't be
 - improved StandardProcessGroup to properly match up components on update using generated versioned component ids, when necessary to allow for update flow to efficiently match common components on flow import

This closes #4023.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Joe Ferner 2019-12-20 12:57:36 -05:00 committed by Mark Payne
parent a80b2475f7
commit 62606ff89a
25 changed files with 1758 additions and 1111 deletions

View File

@ -23,7 +23,8 @@ public interface VersionedComponent {
/**
* @return the unique identifier that maps this component to a component that is versioned
* in a Flow Registry, or <code>Optional.empty</code> if this component has not been saved to a Flow Registry.
* in a Flow Registry or has been imported, or <code>Optional.empty</code> if this component has not
* been saved to a Flow Registry or imported.
*/
Optional<String> getVersionedComponentId();

View File

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto;
import io.swagger.annotations.ApiModelProperty;
import org.apache.nifi.web.api.dto.util.TimestampAdapter;
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
import java.util.Date;
public abstract class FlowUpdateRequestDTO {
protected String requestId;
protected String processGroupId;
protected String uri;
protected Date lastUpdated;
protected boolean complete = false;
protected String failureReason;
protected int percentCompleted;
protected String state;
@ApiModelProperty("The unique ID of the Process Group being updated")
public String getProcessGroupId() {
return processGroupId;
}
public void setProcessGroupId(String processGroupId) {
this.processGroupId = processGroupId;
}
@ApiModelProperty(value = "The unique ID of this request.", readOnly = true)
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
@ApiModelProperty(value = "The URI for future requests to this drop request.", readOnly = true)
public String getUri() {
return uri;
}
public void setUri(String uri) {
this.uri = uri;
}
@XmlJavaTypeAdapter(TimestampAdapter.class)
@ApiModelProperty(value = "The last time this request was updated.", dataType = "string", readOnly = true)
public Date getLastUpdated() {
return lastUpdated;
}
public void setLastUpdated(Date lastUpdated) {
this.lastUpdated = lastUpdated;
}
@ApiModelProperty(value = "Whether or not this request has completed", readOnly = true)
public boolean isComplete() {
return complete;
}
public void setComplete(boolean complete) {
this.complete = complete;
}
@ApiModelProperty(value = "An explanation of why this request failed, or null if this request has not failed", readOnly = true)
public String getFailureReason() {
return failureReason;
}
public void setFailureReason(String reason) {
this.failureReason = reason;
}
@ApiModelProperty(value = "The state of the request", readOnly = true)
public String getState() {
return state;
}
public void setState(String state) {
this.state = state;
}
@ApiModelProperty(value = "The percentage complete for the request, between 0 and 100", readOnly = true)
public int getPercentCompleted() {
return percentCompleted;
}
public void setPercentCompleted(int percentCompleted) {
this.percentCompleted = percentCompleted;
}
}

View File

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto;
import javax.xml.bind.annotation.XmlType;
@XmlType(name = "processGroupReplaceRequest")
public class ProcessGroupReplaceRequestDTO extends FlowUpdateRequestDTO {
}

View File

@ -18,79 +18,13 @@
package org.apache.nifi.web.api.dto;
import io.swagger.annotations.ApiModelProperty;
import org.apache.nifi.web.api.dto.util.TimestampAdapter;
import javax.xml.bind.annotation.XmlType;
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
import java.util.Date;
@XmlType(name = "versionedFlowUpdateRequest")
public class VersionedFlowUpdateRequestDTO {
private String requestId;
private String processGroupId;
private String uri;
private Date lastUpdated;
private boolean complete = false;
private String failureReason;
private int percentCompleted;
private String state;
public class VersionedFlowUpdateRequestDTO extends FlowUpdateRequestDTO {
private VersionControlInformationDTO versionControlInformation;
@ApiModelProperty("The unique ID of the Process Group that the variable registry belongs to")
public String getProcessGroupId() {
return processGroupId;
}
public void setProcessGroupId(String processGroupId) {
this.processGroupId = processGroupId;
}
@ApiModelProperty(value = "The unique ID of this request.", readOnly = true)
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
@ApiModelProperty(value = "The URI for future requests to this drop request.", readOnly = true)
public String getUri() {
return uri;
}
public void setUri(String uri) {
this.uri = uri;
}
@XmlJavaTypeAdapter(TimestampAdapter.class)
@ApiModelProperty(value = "The last time this request was updated.", dataType = "string", readOnly = true)
public Date getLastUpdated() {
return lastUpdated;
}
public void setLastUpdated(Date lastUpdated) {
this.lastUpdated = lastUpdated;
}
@ApiModelProperty(value = "Whether or not this request has completed", readOnly = true)
public boolean isComplete() {
return complete;
}
public void setComplete(boolean complete) {
this.complete = complete;
}
@ApiModelProperty(value = "An explanation of why this request failed, or null if this request has not failed", readOnly = true)
public String getFailureReason() {
return failureReason;
}
public void setFailureReason(String reason) {
this.failureReason = reason;
}
@ApiModelProperty(value = "The VersionControlInformation that describes where the Versioned Flow is located; this may not be populated until the request is completed.", readOnly = true)
public VersionControlInformationDTO getVersionControlInformation() {
return versionControlInformation;
@ -99,22 +33,4 @@ public class VersionedFlowUpdateRequestDTO {
public void setVersionControlInformation(VersionControlInformationDTO versionControlInformation) {
this.versionControlInformation = versionControlInformation;
}
@ApiModelProperty(value = "The state of the request", readOnly = true)
public String getState() {
return state;
}
public void setState(String state) {
this.state = state;
}
@ApiModelProperty(value = "The percentage complete for the request, between 0 and 100", readOnly = true)
public int getPercentCompleted() {
return percentCompleted;
}
public void setPercentCompleted(int percentCompleted) {
this.percentCompleted = percentCompleted;
}
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.entity;
import io.swagger.annotations.ApiModelProperty;
import org.apache.nifi.web.api.dto.FlowUpdateRequestDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
public abstract class FlowUpdateRequestEntity<T extends FlowUpdateRequestDTO> extends Entity {
protected RevisionDTO processGroupRevision;
protected T request;
@ApiModelProperty("The revision for the Process Group being updated.")
public RevisionDTO getProcessGroupRevision() {
return processGroupRevision;
}
public void setProcessGroupRevision(RevisionDTO revision) {
this.processGroupRevision = revision;
}
@ApiModelProperty("The Process Group Update Request")
public abstract T getRequest();
public abstract void setRequest(T request);
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.entity;
import io.swagger.annotations.ApiModelProperty;
import org.apache.nifi.web.api.dto.RevisionDTO;
/**
* Common abstract entity shared by VersionControlInformationEntity and ProcessGroupImportEntity for
* generically processing/replicating process group update requests
*/
public abstract class ProcessGroupDescriptorEntity extends Entity {
private RevisionDTO processGroupRevision;
private Boolean disconnectedNodeAcknowledged;
@ApiModelProperty("The Revision for the Process Group")
public RevisionDTO getProcessGroupRevision() {
return processGroupRevision;
}
public void setProcessGroupRevision(RevisionDTO revision) {
this.processGroupRevision = revision;
}
@ApiModelProperty(
value = "Acknowledges that this node is disconnected to allow for mutable requests to proceed."
)
public Boolean isDisconnectedNodeAcknowledged() {
return disconnectedNodeAcknowledged;
}
public void setDisconnectedNodeAcknowledged(Boolean disconnectedNodeAcknowledged) {
this.disconnectedNodeAcknowledged = disconnectedNodeAcknowledged;
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.entity;
import io.swagger.annotations.ApiModelProperty;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import javax.xml.bind.annotation.XmlRootElement;
/**
* Entity for importing a process group that has been previously downloaded
*/
@XmlRootElement(name = "processGroupImportEntity")
public class ProcessGroupImportEntity extends ProcessGroupDescriptorEntity {
private VersionedFlowSnapshot versionedFlowSnapshot;
@ApiModelProperty("The Versioned Flow Snapshot to import")
public VersionedFlowSnapshot getVersionedFlowSnapshot() {
return versionedFlowSnapshot;
}
public void setVersionedFlowSnapshot(VersionedFlowSnapshot versionedFlowSnapshot) {
this.versionedFlowSnapshot = versionedFlowSnapshot;
}
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.entity;
import io.swagger.annotations.ApiModelProperty;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.web.api.dto.ProcessGroupReplaceRequestDTO;
import javax.xml.bind.annotation.XmlRootElement;
/**
* Entity for capturing the status of a Process Group replace request
*/
@XmlRootElement(name = "processGroupReplaceRequestEntity")
public class ProcessGroupReplaceRequestEntity extends FlowUpdateRequestEntity<ProcessGroupReplaceRequestDTO> {
private VersionedFlowSnapshot versionedFlowSnapshot;
@ApiModelProperty(value = "Returns the Versioned Flow to replace with", readOnly = true)
public VersionedFlowSnapshot getVersionedFlowSnapshot() {
return versionedFlowSnapshot;
}
public void setVersionedFlowSnapshot(VersionedFlowSnapshot versionedFlowSnapshot) {
this.versionedFlowSnapshot = versionedFlowSnapshot;
}
@ApiModelProperty("The Process Group Change Request")
@Override
public ProcessGroupReplaceRequestDTO getRequest() {
if (request == null) {
request = new ProcessGroupReplaceRequestDTO();
}
return request;
}
public void setRequest(ProcessGroupReplaceRequestDTO request) {
this.request = request;
}
}

View File

@ -18,16 +18,13 @@
package org.apache.nifi.web.api.entity;
import io.swagger.annotations.ApiModelProperty;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
import javax.xml.bind.annotation.XmlRootElement;
@XmlRootElement(name = "versionControlInformationEntity")
public class VersionControlInformationEntity extends Entity {
public class VersionControlInformationEntity extends ProcessGroupDescriptorEntity {
private VersionControlInformationDTO versionControlInformation;
private RevisionDTO processGroupRevision;
private Boolean disconnectedNodeAcknowledged;
@ApiModelProperty("The Version Control information")
public VersionControlInformationDTO getVersionControlInformation() {
@ -37,24 +34,4 @@ public class VersionControlInformationEntity extends Entity {
public void setVersionControlInformation(VersionControlInformationDTO versionControlDto) {
this.versionControlInformation = versionControlDto;
}
@ApiModelProperty("The Revision for the Process Group")
public RevisionDTO getProcessGroupRevision() {
return processGroupRevision;
}
public void setProcessGroupRevision(RevisionDTO revision) {
this.processGroupRevision = revision;
}
@ApiModelProperty(
value = "Acknowledges that this node is disconnected to allow for mutable requests to proceed."
)
public Boolean isDisconnectedNodeAcknowledged() {
return disconnectedNodeAcknowledged;
}
public void setDisconnectedNodeAcknowledged(Boolean disconnectedNodeAcknowledged) {
this.disconnectedNodeAcknowledged = disconnectedNodeAcknowledged;
}
}

View File

@ -18,27 +18,18 @@
package org.apache.nifi.web.api.entity;
import io.swagger.annotations.ApiModelProperty;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.dto.VersionedFlowUpdateRequestDTO;
import javax.xml.bind.annotation.XmlRootElement;
@XmlRootElement(name = "versionedFlowUpdateRequestEntity")
public class VersionedFlowUpdateRequestEntity extends Entity {
private VersionedFlowUpdateRequestDTO request;
private RevisionDTO processGroupRevision;
public class VersionedFlowUpdateRequestEntity extends FlowUpdateRequestEntity<VersionedFlowUpdateRequestDTO> {
@ApiModelProperty("The revision for the Process Group that owns this variable registry.")
public RevisionDTO getProcessGroupRevision() {
return processGroupRevision;
}
public void setProcessGroupRevision(RevisionDTO revision) {
this.processGroupRevision = revision;
}
@ApiModelProperty("The Versioned Flow Update Request")
@ApiModelProperty("The Flow Update Request")
public VersionedFlowUpdateRequestDTO getRequest() {
if (request == null) {
request = new VersionedFlowUpdateRequestDTO();
}
return request;
}

View File

@ -491,6 +491,13 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
*/
Funnel findFunnel(String id);
/**
* Gets a collection of identifiers representing all ancestor controller services
*
* @return collection of ancestor controller service identifiers
*/
Set<String> getAncestorServiceIds();
/**
* @param id of the Controller Service
* @param includeDescendantGroups whether or not to include descendant process groups
@ -881,12 +888,13 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
void verifyCanUpdateVariables(Map<String, String> updatedVariables);
/**
* Ensures that the contents of the Process Group can be update to match the given new flow
* Ensures that the contents of the Process Group can be updated to match the given new flow
*
* @param updatedFlow the updated version of the flow
* @param updatedFlow the proposed updated flow
* @param verifyConnectionRemoval whether or not to verify that connections that are not present in the updated flow can be removed
* @param verifyNotDirty whether or not to verify that the Process Group is not 'dirty'. If <code>true</code> and the Process Group has been changed since
* it was last synchronized with the FlowRegistry, then this method will throw an IllegalStateException
* @param verifyNotDirty for versioned flows only, whether or not to verify that the Process Group is not 'dirty'. If <code>true</code>
* and the Process Group has been changed since it was last synchronized with the FlowRegistry, then this method will throw
* an IllegalStateException
*
* @throws IllegalStateException if the Process Group is not in a state that will allow the update
*/

View File

@ -2054,7 +2054,7 @@ public final class StandardProcessGroup implements ProcessGroup {
return findAllControllerServices(this);
}
public Set<ControllerServiceNode> findAllControllerServices(ProcessGroup start) {
private Set<ControllerServiceNode> findAllControllerServices(ProcessGroup start) {
final Set<ControllerServiceNode> services = start.getControllerServices(false);
for (final ProcessGroup group : start.getProcessGroups()) {
services.addAll(findAllControllerServices(group));
@ -3534,10 +3534,10 @@ public final class StandardProcessGroup implements ProcessGroup {
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(flowController.getExtensionManager());
final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, controllerServiceProvider, flowController.getFlowRegistryClient(), true);
final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", versionedGroup);
final ComparableDataFlow remoteFlow = new StandardComparableDataFlow("Remote Flow", proposedSnapshot.getFlowContents());
final ComparableDataFlow localFlow = new StandardComparableDataFlow("Current Flow", versionedGroup);
final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("New Flow", proposedSnapshot.getFlowContents());
final FlowComparator flowComparator = new StandardFlowComparator(remoteFlow, localFlow, getAncestorGroupServiceIds(), new StaticDifferenceDescriptor());
final FlowComparator flowComparator = new StandardFlowComparator(proposedFlow, localFlow, getAncestorServiceIds(), new StaticDifferenceDescriptor());
final FlowComparison flowComparison = flowComparator.compare();
final Set<String> updatedVersionedComponentIds = new HashSet<>();
@ -3581,7 +3581,11 @@ public final class StandardProcessGroup implements ProcessGroup {
.map(FlowDifference::toString)
.collect(Collectors.joining("\n"));
LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", this, proposedSnapshot, flowComparison.getDifferences().size(), differencesByLine);
// TODO: Until we move to NiFi Registry 0.6.0, avoid using proposedSnapshot.toString() because it throws a NullPointerException
final String proposedSnapshotDetails = "VersionedFlowSnapshot[flowContentsId=" + proposedSnapshot.getFlowContents().getIdentifier()
+ ", flowContentsName=" + proposedSnapshot.getFlowContents().getName() + ", NoMetadataAvailable]";
LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", this, proposedSnapshotDetails,
flowComparison.getDifferences().size(), differencesByLine);
}
final Set<String> knownVariables = getKnownVariableNames();
@ -3610,25 +3614,20 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
private Set<String> getAncestorGroupServiceIds() {
@Override
public Set<String> getAncestorServiceIds() {
final Set<String> ancestorServiceIds;
ProcessGroup parentGroup = getParent();
if (parentGroup == null) {
ancestorServiceIds = Collections.emptySet();
} else {
// We want to map the Controller Service to its Versioned Component ID, if it has one.
// If it does not have one, we want to generate it in the same way that our Flow Mapper does
// because this allows us to find the Controller Service when doing a Flow Diff.
ancestorServiceIds = parentGroup.getControllerServices(true).stream()
.map(cs -> {
// We want to map the Controller Service to its Versioned Component ID, if it has one.
// If it does not have one, we want to generate it in the same way that our Flow Mapper does
// because this allows us to find the Controller Service when doing a Flow Diff.
final Optional<String> versionedId = cs.getVersionedComponentId();
if (versionedId.isPresent()) {
return versionedId.get();
}
return UUID.nameUUIDFromBytes(cs.getIdentifier().getBytes(StandardCharsets.UTF_8)).toString();
})
.map(cs -> cs.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(cs.getIdentifier())))
.collect(Collectors.toSet());
}
@ -3641,7 +3640,9 @@ public final class StandardProcessGroup implements ProcessGroup {
}
for (final ControllerServiceNode serviceNode : group.getControllerServices(false)) {
if (serviceNode.getVersionedComponentId().isPresent() && serviceNode.getVersionedComponentId().get().equals(versionedComponentId)) {
final String serviceNodeVersionedComponentId = serviceNode.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(serviceNode.getIdentifier()));
if (serviceNodeVersionedComponentId.equals(versionedComponentId)) {
return serviceNode;
}
}
@ -3729,7 +3730,8 @@ public final class StandardProcessGroup implements ProcessGroup {
// Controller Service. This way, we ensure that all services have been created before setting the properties. This allows us to
// properly obtain the correct mapping of Controller Service VersionedComponentID to Controller Service instance id.
final Map<String, ControllerServiceNode> servicesByVersionedId = group.getControllerServices(false).stream()
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier())), Function.identity()));
final Set<String> controllerServicesRemoved = new HashSet<>(servicesByVersionedId.keySet());
@ -3775,7 +3777,8 @@ public final class StandardProcessGroup implements ProcessGroup {
// Child groups
final Map<String, ProcessGroup> childGroupsByVersionedId = group.getProcessGroups().stream()
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier())), Function.identity()));
final Set<String> childGroupsRemoved = new HashSet<>(childGroupsByVersionedId.keySet());
for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) {
@ -3805,7 +3808,8 @@ public final class StandardProcessGroup implements ProcessGroup {
// Funnels
final Map<String, Funnel> funnelsByVersionedId = group.getFunnels().stream()
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier())), Function.identity()));
final Set<String> funnelsRemoved = new HashSet<>(funnelsByVersionedId.keySet());
for (final VersionedFunnel proposedFunnel : proposed.getFunnels()) {
@ -3827,7 +3831,8 @@ public final class StandardProcessGroup implements ProcessGroup {
// Input Ports
final Map<String, Port> inputPortsByVersionedId = group.getInputPorts().stream()
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier())), Function.identity()));
final Set<String> inputPortsRemoved = new HashSet<>(inputPortsByVersionedId.keySet());
for (final VersionedPort proposedPort : proposed.getInputPorts()) {
@ -3852,7 +3857,8 @@ public final class StandardProcessGroup implements ProcessGroup {
// Output Ports
final Map<String, Port> outputPortsByVersionedId = group.getOutputPorts().stream()
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier())), Function.identity()));
final Set<String> outputPortsRemoved = new HashSet<>(outputPortsByVersionedId.keySet());
for (final VersionedPort proposedPort : proposed.getOutputPorts()) {
@ -3878,7 +3884,8 @@ public final class StandardProcessGroup implements ProcessGroup {
// Labels
final Map<String, Label> labelsByVersionedId = group.getLabels().stream()
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier())), Function.identity()));
final Set<String> labelsRemoved = new HashSet<>(labelsByVersionedId.keySet());
for (final VersionedLabel proposedLabel : proposed.getLabels()) {
@ -3899,7 +3906,8 @@ public final class StandardProcessGroup implements ProcessGroup {
// Processors
final Map<String, ProcessorNode> processorsByVersionedId = group.getProcessors().stream()
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier())), Function.identity()));
final Set<String> processorsRemoved = new HashSet<>(processorsByVersionedId.keySet());
final Map<ProcessorNode, Set<Relationship>> autoTerminatedRelationships = new HashMap<>();
@ -3938,7 +3946,8 @@ public final class StandardProcessGroup implements ProcessGroup {
// Remote Groups
final Map<String, RemoteProcessGroup> rpgsByVersionedId = group.getRemoteProcessGroups().stream()
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier())), Function.identity()));
final Set<String> rpgsRemoved = new HashSet<>(rpgsByVersionedId.keySet());
for (final VersionedRemoteProcessGroup proposedRpg : proposed.getRemoteProcessGroups()) {
@ -3959,7 +3968,8 @@ public final class StandardProcessGroup implements ProcessGroup {
// Connections
final Map<String, Connection> connectionsByVersionedId = group.getConnections().stream()
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier())), Function.identity()));
final Set<String> connectionsRemoved = new HashSet<>(connectionsByVersionedId.keySet());
for (final VersionedConnection proposedConnection : proposed.getConnections()) {
@ -4336,14 +4346,14 @@ public final class StandardProcessGroup implements ProcessGroup {
switch (connectableComponent.getType()) {
case FUNNEL:
return group.getFunnels().stream()
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.filter(component -> id.equals(component.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier()))))
.findAny()
.orElse(null);
case INPUT_PORT: {
final Optional<Port> port = group.getInputPorts().stream()
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.filter(component -> id.equals(component.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier()))))
.findAny();
if (port.isPresent()) {
@ -4352,15 +4362,15 @@ public final class StandardProcessGroup implements ProcessGroup {
// Attempt to locate child group by versioned component id
final Optional<ProcessGroup> optionalSpecifiedGroup = group.getProcessGroups().stream()
.filter(child -> child.getVersionedComponentId().isPresent())
.filter(child -> child.getVersionedComponentId().get().equals(connectableComponent.getGroupId()))
.filter(child -> child.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(child.getIdentifier())).equals(connectableComponent.getGroupId()))
.findFirst();
if (optionalSpecifiedGroup.isPresent()) {
final ProcessGroup specifiedGroup = optionalSpecifiedGroup.get();
return specifiedGroup.getInputPorts().stream()
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.filter(component -> id.equals(component.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier()))))
.findAny()
.orElse(null);
}
@ -4370,15 +4380,15 @@ public final class StandardProcessGroup implements ProcessGroup {
// if the flow doesn't contain the properly mapped group id, we need to search all child groups.
return group.getProcessGroups().stream()
.flatMap(gr -> gr.getInputPorts().stream())
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.filter(component -> id.equals(component.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier()))))
.findAny()
.orElse(null);
}
case OUTPUT_PORT: {
final Optional<Port> port = group.getOutputPorts().stream()
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.filter(component -> id.equals(component.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier()))))
.findAny();
if (port.isPresent()) {
@ -4387,15 +4397,15 @@ public final class StandardProcessGroup implements ProcessGroup {
// Attempt to locate child group by versioned component id
final Optional<ProcessGroup> optionalSpecifiedGroup = group.getProcessGroups().stream()
.filter(child -> child.getVersionedComponentId().isPresent())
.filter(child -> child.getVersionedComponentId().get().equals(connectableComponent.getGroupId()))
.filter(child -> child.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(child.getIdentifier())).equals(connectableComponent.getGroupId()))
.findFirst();
if (optionalSpecifiedGroup.isPresent()) {
final ProcessGroup specifiedGroup = optionalSpecifiedGroup.get();
return specifiedGroup.getOutputPorts().stream()
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.filter(component -> id.equals(component.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier()))))
.findAny()
.orElse(null);
}
@ -4405,22 +4415,22 @@ public final class StandardProcessGroup implements ProcessGroup {
// if the flow doesn't contain the properly mapped group id, we need to search all child groups.
return group.getProcessGroups().stream()
.flatMap(gr -> gr.getOutputPorts().stream())
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.filter(component -> id.equals(component.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier()))))
.findAny()
.orElse(null);
}
case PROCESSOR:
return group.getProcessors().stream()
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.filter(component -> id.equals(component.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier()))))
.findAny()
.orElse(null);
case REMOTE_INPUT_PORT: {
final String rpgId = connectableComponent.getGroupId();
final Optional<RemoteProcessGroup> rpgOption = group.getRemoteProcessGroups().stream()
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> rpgId.equals(component.getVersionedComponentId().get()))
.filter(component -> rpgId.equals(component.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier()))))
.findAny();
if (!rpgOption.isPresent()) {
@ -4430,8 +4440,8 @@ public final class StandardProcessGroup implements ProcessGroup {
final RemoteProcessGroup rpg = rpgOption.get();
final Optional<RemoteGroupPort> portByIdOption = rpg.getInputPorts().stream()
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.filter(component -> id.equals(component.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier()))))
.findAny();
if (portByIdOption.isPresent()) {
@ -4446,8 +4456,8 @@ public final class StandardProcessGroup implements ProcessGroup {
case REMOTE_OUTPUT_PORT: {
final String rpgId = connectableComponent.getGroupId();
final Optional<RemoteProcessGroup> rpgOption = group.getRemoteProcessGroups().stream()
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> rpgId.equals(component.getVersionedComponentId().get()))
.filter(component -> rpgId.equals(component.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier()))))
.findAny();
if (!rpgOption.isPresent()) {
@ -4457,8 +4467,8 @@ public final class StandardProcessGroup implements ProcessGroup {
final RemoteProcessGroup rpg = rpgOption.get();
final Optional<RemoteGroupPort> portByIdOption = rpg.getOutputPorts().stream()
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.filter(component -> id.equals(component.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier()))))
.findAny();
if (portByIdOption.isPresent()) {
@ -4738,8 +4748,8 @@ public final class StandardProcessGroup implements ProcessGroup {
private String getServiceInstanceId(final String serviceVersionedComponentId, final ProcessGroup group) {
for (final ControllerServiceNode serviceNode : group.getControllerServices(false)) {
final Optional<String> optionalVersionedId = serviceNode.getVersionedComponentId();
final String versionedId = optionalVersionedId.orElseGet(() -> UUID.nameUUIDFromBytes(serviceNode.getIdentifier().getBytes(StandardCharsets.UTF_8)).toString());
final String versionedId = serviceNode.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(serviceNode.getIdentifier()));
if (versionedId.equals(serviceVersionedComponentId)) {
return serviceNode.getIdentifier();
}
@ -4832,7 +4842,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final ComparableDataFlow currentFlow = new StandardComparableDataFlow("Local Flow", versionedGroup);
final ComparableDataFlow snapshotFlow = new StandardComparableDataFlow("Versioned Flow", vci.getFlowSnapshot());
final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, getAncestorGroupServiceIds(), new EvolvingDifferenceDescriptor());
final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, getAncestorServiceIds(), new EvolvingDifferenceDescriptor());
final FlowComparison comparison = flowComparator.compare();
final Set<FlowDifference> differences = comparison.getDifferences().stream()
.filter(difference -> difference.getDifferenceType() != DifferenceType.BUNDLE_CHANGED)
@ -4853,6 +4863,7 @@ public final class StandardProcessGroup implements ProcessGroup {
public void verifyCanUpdate(final VersionedFlowSnapshot updatedFlow, final boolean verifyConnectionRemoval, final boolean verifyNotDirty) {
readLock.lock();
try {
// flow id match and not dirty check concepts are only applicable to versioned flows
final VersionControlInformation versionControlInfo = getVersionControlInformation();
if (versionControlInfo != null) {
if (!versionControlInfo.getFlowIdentifier().equals(updatedFlow.getSnapshotMetadata().getFlowIdentifier())) {
@ -4885,40 +4896,18 @@ public final class StandardProcessGroup implements ProcessGroup {
}
final VersionedProcessGroup flowContents = updatedFlow.getFlowContents();
if (verifyConnectionRemoval) {
// Determine which Connections have been removed.
final Map<String, Connection> removedConnectionByVersionedId = new HashMap<>();
// Populate the 'removedConnectionByVersionId' map with all Connections. We key off of the connection's VersionedComponentID
// if it is populated. Otherwise, we key off of its actual ID. We do this because it allows us to then remove from this Map
// any connection that does exist in the proposed flow. This results in us having a Map whose values are those Connections
// that were removed. We can then check for any connections that have data in them. If any Connection is to be removed but
// has data, then we should throw an IllegalStateException.
findAllConnections().forEach(conn -> removedConnectionByVersionedId.put(conn.getVersionedComponentId().orElse(conn.getIdentifier()), conn));
final Set<String> proposedFlowConnectionIds = new HashSet<>();
findAllConnectionIds(flowContents, proposedFlowConnectionIds);
for (final String proposedConnectionId : proposedFlowConnectionIds) {
removedConnectionByVersionedId.remove(proposedConnectionId);
}
// If any connection that was removed has data in it, throw an IllegalStateException
for (final Connection connection : removedConnectionByVersionedId.values()) {
final FlowFileQueue flowFileQueue = connection.getFlowFileQueue();
if (!flowFileQueue.isEmpty()) {
throw new IllegalStateException(this + " cannot be updated to the proposed version of the flow because the "
+ "proposed version does not contain "
+ connection + " and the connection currently has data in the queue.");
}
}
}
// Ensure no deleted child process groups contain templates and optionally no deleted connections contain data
// in their queue. Note that this check enforces ancestry among the group components to avoid a scenario where
// a component is matched by id, but it does not exist in the same hierarchy and thus will be removed and
// re-added when the update is performed
verifyCanRemoveMissingComponents(this, flowContents, verifyConnectionRemoval);
// Determine which input ports were removed from this process group
final Map<String, Port> removedInputPortsByVersionId = new HashMap<>();
getInputPorts().stream()
.filter(port -> port.getVersionedComponentId().isPresent())
.forEach(port -> removedInputPortsByVersionId.put(port.getVersionedComponentId().get(), port));
.forEach(port -> removedInputPortsByVersionId.put(port.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(port.getIdentifier())), port));
flowContents.getInputPorts().stream()
.map(VersionedPort::getIdentifier)
.forEach(removedInputPortsByVersionId::remove);
@ -4927,16 +4916,16 @@ public final class StandardProcessGroup implements ProcessGroup {
for (final Port inputPort : removedInputPortsByVersionId.values()) {
final List<Connection> incomingConnections = inputPort.getIncomingConnections();
if (!incomingConnections.isEmpty()) {
throw new IllegalStateException(this + " cannot be updated to the proposed version of the flow because the proposed version does not contain the Input Port "
+ inputPort + " and the Input Port currently has an incoming connections");
throw new IllegalStateException(this + " cannot be updated to the proposed flow because the proposed flow "
+ "does not contain the Input Port " + inputPort + " and the Input Port currently has an incoming connection");
}
}
// Determine which output ports were removed from this process group
final Map<String, Port> removedOutputPortsByVersionId = new HashMap<>();
getOutputPorts().stream()
.filter(port -> port.getVersionedComponentId().isPresent())
.forEach(port -> removedOutputPortsByVersionId.put(port.getVersionedComponentId().get(), port));
.forEach(port -> removedOutputPortsByVersionId.put(port.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(port.getIdentifier())), port));
flowContents.getOutputPorts().stream()
.map(VersionedPort::getIdentifier)
.forEach(removedOutputPortsByVersionId::remove);
@ -4945,42 +4934,18 @@ public final class StandardProcessGroup implements ProcessGroup {
for (final Port outputPort : removedOutputPortsByVersionId.values()) {
final Set<Connection> outgoingConnections = outputPort.getConnections();
if (!outgoingConnections.isEmpty()) {
throw new IllegalStateException(this + " cannot be updated to the proposed version of the flow because the proposed version does not contain the Output Port "
+ outputPort + " and the Output Port currently has an outgoing connections");
}
}
// Find any Process Groups that may have been deleted. If we find any Process Group that was deleted, and that Process Group
// has Templates, then we fail because the Templates have to be removed first.
final Map<String, VersionedProcessGroup> proposedProcessGroups = new HashMap<>();
findAllProcessGroups(updatedFlow.getFlowContents(), proposedProcessGroups);
for (final ProcessGroup childGroup : findAllProcessGroups()) {
if (childGroup.getTemplates().isEmpty()) {
continue;
}
final Optional<String> versionedIdOption = childGroup.getVersionedComponentId();
if (!versionedIdOption.isPresent()) {
continue;
}
final String versionedId = versionedIdOption.get();
if (!proposedProcessGroups.containsKey(versionedId)) {
// Process Group was removed.
throw new IllegalStateException(this + " cannot be updated to the proposed version of the flow because the child " + childGroup
+ " that exists locally has one or more Templates, and the proposed flow does not contain these templates. "
+ "A Process Group cannot be deleted while it contains Templates. Please remove the Templates before attempting to change the version of the flow.");
throw new IllegalStateException(this + " cannot be updated to the proposed flow because the proposed flow "
+ "does not contain the Output Port " + outputPort + " and the Output Port currently has an outgoing connection");
}
}
// Ensure that all Processors are instantiable
final Map<String, VersionedProcessor> proposedProcessors = new HashMap<>();
findAllProcessors(updatedFlow.getFlowContents(), proposedProcessors);
findAllProcessors(flowContents, proposedProcessors);
findAllProcessors().stream()
.filter(proc -> proc.getVersionedComponentId().isPresent())
.forEach(proc -> proposedProcessors.remove(proc.getVersionedComponentId().get()));
.forEach(proc -> proposedProcessors.remove(proc.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(proc.getIdentifier()))));
for (final VersionedProcessor processorToAdd : proposedProcessors.values()) {
final String processorToAddClass = processorToAdd.getType();
@ -4996,11 +4961,11 @@ public final class StandardProcessGroup implements ProcessGroup {
// Ensure that all Controller Services are instantiable
final Map<String, VersionedControllerService> proposedServices = new HashMap<>();
findAllControllerServices(updatedFlow.getFlowContents(), proposedServices);
findAllControllerServices(flowContents, proposedServices);
findAllControllerServices().stream()
.filter(service -> service.getVersionedComponentId().isPresent())
.forEach(service -> proposedServices.remove(service.getVersionedComponentId().get()));
.forEach(service -> proposedServices.remove(service.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(service.getIdentifier()))));
for (final VersionedControllerService serviceToAdd : proposedServices.values()) {
final String serviceToAddClass = serviceToAdd.getType();
@ -5015,12 +4980,15 @@ public final class StandardProcessGroup implements ProcessGroup {
}
// Ensure that all Prioritizers are instantiate-able and that any load balancing configuration is correct
// Enforcing ancestry on connection matching here is not important because all we're interested in is locating
// new prioritizers and load balance strategy types so if a matching connection existed anywhere in the current
// flow, then its prioritizer and load balance strategy are already validated
final Map<String, VersionedConnection> proposedConnections = new HashMap<>();
findAllConnections(updatedFlow.getFlowContents(), proposedConnections);
findAllConnections(flowContents, proposedConnections);
findAllConnections().stream()
.filter(conn -> conn.getVersionedComponentId().isPresent())
.forEach(conn -> proposedConnections.remove(conn.getVersionedComponentId().get()));
.forEach(conn -> proposedConnections.remove(conn.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(conn.getIdentifier()))));
for (final VersionedConnection connectionToAdd : proposedConnections.values()) {
if (connectionToAdd.getPrioritizers() != null) {
@ -5048,16 +5016,6 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
private void findAllConnectionIds(final VersionedProcessGroup group, final Set<String> ids) {
for (final VersionedConnection connection : group.getConnections()) {
ids.add(connection.getIdentifier());
}
for (final VersionedProcessGroup childGroup : group.getProcessGroups()) {
findAllConnectionIds(childGroup, ids);
}
}
private void findAllProcessors(final VersionedProcessGroup group, final Map<String, VersionedProcessor> map) {
for (final VersionedProcessor processor : group.getProcessors()) {
map.put(processor.getIdentifier(), processor);
@ -5088,11 +5046,67 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
private void findAllProcessGroups(final VersionedProcessGroup group, final Map<String, VersionedProcessGroup> map) {
map.put(group.getIdentifier(), group);
/**
* Match components of the given process group to the proposed versioned process group and verify missing components
* are in a state that they can be safely removed. Specifically, check for removed child process groups and descendants.
* Disallow removal of groups with attached templates. Optionally also check for removed connections with data in their
* queue, either because the connections were removed from a matched process group or their group itself was removed.
*
* @param processGroup the current process group to examine
* @param proposedGroup the proposed versioned process group to match with
* @param verifyConnectionRemoval whether or not to verify that connections that are not present in the proposed flow can be removed
*/
private void verifyCanRemoveMissingComponents(final ProcessGroup processGroup, final VersionedProcessGroup proposedGroup,
final boolean verifyConnectionRemoval) {
if (verifyConnectionRemoval) {
final Map<String, VersionedConnection> proposedConnectionsByVersionedId = proposedGroup.getConnections().stream()
.collect(Collectors.toMap(component -> component.getIdentifier(), Function.identity()));
for (final VersionedProcessGroup child : group.getProcessGroups()) {
findAllProcessGroups(child, map);
// match group's current connections to proposed connections to determine if they've been removed
for (final Connection connection : processGroup.getConnections()) {
final String versionedId = connection.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(connection.getIdentifier()));
final VersionedConnection proposedConnection = proposedConnectionsByVersionedId.get(versionedId);
if (proposedConnection == null) {
// connection doesn't exist in proposed connections, make sure it doesn't have any data in it
final FlowFileQueue flowFileQueue = connection.getFlowFileQueue();
if (!flowFileQueue.isEmpty()) {
throw new IllegalStateException(this + " cannot be updated to the proposed flow because the proposed flow "
+ "does not contain a match for " + connection + " and the connection currently has data in the queue.");
}
}
}
}
final Map<String, VersionedProcessGroup> proposedGroupsByVersionedId = proposedGroup.getProcessGroups().stream()
.collect(Collectors.toMap(component -> component.getIdentifier(), Function.identity()));
// match current child groups to proposed child groups to determine if they've been removed
for (final ProcessGroup childGroup : processGroup.getProcessGroups()) {
final String versionedId = childGroup.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(childGroup.getIdentifier()));
final VersionedProcessGroup proposedChildGroup = proposedGroupsByVersionedId.get(versionedId);
if (proposedChildGroup == null) {
// child group will be removed, check group and descendants for attached templates
final Template removedTemplate = findAllTemplates(childGroup).stream().findFirst().orElse(null);
if (removedTemplate != null) {
throw new IllegalStateException(this + " cannot be updated to the proposed flow because the child " + removedTemplate.getProcessGroup()
+ " that exists locally has one or more Templates, and the proposed flow does not contain these templates. "
+ "A Process Group cannot be deleted while it contains Templates. Please remove the Templates before re-attempting.");
}
if (verifyConnectionRemoval) {
// check removed group and its descendants for connections with data in the queue
final Connection removedConnection = findAllConnections(childGroup).stream()
.filter(connection -> !connection.getFlowFileQueue().isEmpty()).findFirst().orElse(null);
if (removedConnection != null) {
throw new IllegalStateException(this + " cannot be updated to the proposed flow because the proposed flow "
+ "does not contain a match for " + removedConnection + " and the connection currently has data in the queue.");
}
}
} else {
// child group successfully matched, recurse into verification of its contents
verifyCanRemoveMissingComponents(childGroup, proposedChildGroup, verifyConnectionRemoval);
}
}
}

View File

@ -293,13 +293,24 @@ public class NiFiRegistryFlowMapper {
if (currentVersionedId.isPresent()) {
versionedId = currentVersionedId.get();
} else {
versionedId = UUID.nameUUIDFromBytes(componentId.getBytes(StandardCharsets.UTF_8)).toString();
versionedId = generateVersionedComponentId(componentId);
}
versionedComponentIds.put(componentId, versionedId);
return versionedId;
}
/**
* Generate a versioned component identifier based on the given component identifier. The result for a given
* component identifier is deterministic.
*
* @param componentId the component identifier to generate a versioned component identifier for
* @return a deterministic versioned component identifier
*/
public static String generateVersionedComponentId(final String componentId) {
return UUID.nameUUIDFromBytes(componentId.getBytes(StandardCharsets.UTF_8)).toString();
}
private <E extends Exception> String getIdOrThrow(final Optional<String> currentVersionedId, final String componentId, final Supplier<E> exceptionSupplier) throws E {
if (currentVersionedId.isPresent()) {
return currentVersionedId.get();

View File

@ -353,6 +353,11 @@ public class MockProcessGroup implements ProcessGroup {
return null;
}
@Override
public Set<String> getAncestorServiceIds() {
return null;
}
@Override
public ControllerServiceNode findControllerService(final String id, final boolean includeDescendants, final boolean includeAncestors) {
return serviceMap.get(id);

View File

@ -57,16 +57,13 @@ import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.util.FlowDifferenceFilters;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
@ -398,7 +395,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localGroup);
final ComparableDataFlow registryFlow = new StandardComparableDataFlow("Versioned Flow", registryGroup);
final Set<String> ancestorServiceIds = getAncestorGroupServiceIds(processGroup);
final Set<String> ancestorServiceIds = processGroup.getAncestorServiceIds();
final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, ancestorServiceIds, new ConciseEvolvingDifferenceDescriptor());
final FlowComparison flowComparison = flowComparator.compare();
final Set<FlowDifference> differences = flowComparison.getDifferences().stream()
@ -411,32 +408,6 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
return differences;
}
private Set<String> getAncestorGroupServiceIds(final ProcessGroup processGroup) {
final Set<String> ancestorServiceIds;
ProcessGroup parentGroup = processGroup.getParent();
if (parentGroup == null) {
ancestorServiceIds = Collections.emptySet();
} else {
ancestorServiceIds = parentGroup.getControllerServices(true).stream()
.map(cs -> {
// We want to map the Controller Service to its Versioned Component ID, if it has one.
// If it does not have one, we want to generate it in the same way that our Flow Mapper does
// because this allows us to find the Controller Service when doing a Flow Diff.
final Optional<String> versionedId = cs.getVersionedComponentId();
if (versionedId.isPresent()) {
return versionedId.get();
}
return UUID.nameUUIDFromBytes(cs.getIdentifier().getBytes(StandardCharsets.UTF_8)).toString();
})
.collect(Collectors.toSet());
}
return ancestorServiceIds;
}
private VersionedFlowSnapshot createFlowSnapshot(final List<ControllerServiceNode> controllerServices, final List<ProcessorNode> processors, final Set<Parameter> parameters) {
final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata();
snapshotMetadata.setAuthor("unit-test");

View File

@ -134,7 +134,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* Defines the NiFiServiceFacade interface.
@ -1407,6 +1406,14 @@ public interface NiFiServiceFacade {
*/
FlowComparisonEntity getLocalModifications(String processGroupId);
/**
* Determines whether the process group with the given id or any of its descendants are under version control.
*
* @param groupId the ID of the Process Group
* @return <code>true</code> if any process group in the hierarchy is under version control, <code>false</code> otherwise.
*/
boolean isAnyProcessGroupUnderVersionControl(final String groupId);
/**
* Returns the Version Control information for the Process Group with the given ID
*
@ -1416,7 +1423,6 @@ public interface NiFiServiceFacade {
*/
VersionControlInformationEntity getVersionControlInformation(String processGroupId);
/**
* Adds the given Versioned Flow to the registry specified by the given ID
*
@ -1538,7 +1544,7 @@ public interface NiFiServiceFacade {
* @param updatedSnapshot the snapshot to update the Process Group to
* @return the set of all components that would be affected by updating the Process Group
*/
Set<AffectedComponentEntity> getComponentsAffectedByVersionChange(String processGroupId, VersionedFlowSnapshot updatedSnapshot);
Set<AffectedComponentEntity> getComponentsAffectedByFlowUpdate(String processGroupId, VersionedFlowSnapshot updatedSnapshot);
/**
* Verifies that the Process Group with the given identifier can be updated to the proposed flow
@ -1575,7 +1581,6 @@ public interface NiFiServiceFacade {
*/
void verifyCanRevertLocalModifications(String groupId, VersionedFlowSnapshot versionedFlowSnapshot);
/**
* Updates the Process group with the given ID to match the new snapshot
*
@ -1587,12 +1592,10 @@ public interface NiFiServiceFacade {
* @param updateSettings whether or not the process group's name and position should be updated
* @param updateDescendantVersionedFlows if a child/descendant Process Group is under Version Control, specifies whether or not to
* update the contents of that Process Group
* @param idGenerator the id generator
* @return the Process Group
*/
ProcessGroupEntity updateProcessGroupContents(Revision revision, String groupId, VersionControlInformationDTO versionControlInfo, VersionedFlowSnapshot snapshot,
String componentIdSeed, boolean verifyNotModified, boolean updateSettings, boolean updateDescendantVersionedFlows, Supplier<String> idGenerator);
String componentIdSeed, boolean verifyNotModified, boolean updateSettings, boolean updateDescendantVersionedFlows);
/**
* Returns a Set representing all components that will be affected by updating the Parameter Context that is represented by the given DTO.

View File

@ -4399,7 +4399,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public VersionedFlowSnapshot getCurrentFlowSnapshotByGroupId(final String processGroupId) {
final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId);
final VersionControlInformation versionControlInfo = processGroup.getVersionControlInformation();
// Create a complete (include descendant flows) VersionedProcessGroup snapshot of the flow as it is
// currently without any registry related fields populated, even if the flow is currently versioned.
@ -4480,6 +4479,23 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
}
@Override
public boolean isAnyProcessGroupUnderVersionControl(final String groupId) {
return isProcessGroupUnderVersionControl(processGroupDAO.getProcessGroup(groupId));
}
private boolean isProcessGroupUnderVersionControl(final ProcessGroup processGroup) {
if (processGroup.getVersionControlInformation() != null) {
return true;
}
final Set<ProcessGroup> childGroups = processGroup.getProcessGroups();
if (childGroups != null) {
return childGroups.stream()
.anyMatch(childGroup -> isProcessGroupUnderVersionControl(childGroup));
}
return false;
}
@Override
public VersionControlInformationEntity getVersionControlInformation(final String groupId) {
final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
@ -4534,7 +4550,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localGroup);
final ComparableDataFlow registryFlow = new StandardComparableDataFlow("Versioned Flow", registryGroup);
final Set<String> ancestorServiceIds = getAncestorGroupServiceIds(processGroup);
final Set<String> ancestorServiceIds = processGroup.getAncestorServiceIds();
final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, ancestorServiceIds, new ConciseEvolvingDifferenceDescriptor());
final FlowComparison flowComparison = flowComparator.compare();
@ -4545,31 +4561,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
return entity;
}
private Set<String> getAncestorGroupServiceIds(final ProcessGroup group) {
final Set<String> ancestorServiceIds;
ProcessGroup parentGroup = group.getParent();
if (parentGroup == null) {
ancestorServiceIds = Collections.emptySet();
} else {
ancestorServiceIds = parentGroup.getControllerServices(true).stream()
.map(cs -> {
// We want to map the Controller Service to its Versioned Component ID, if it has one.
// If it does not have one, we want to generate it in the same way that our Flow Mapper does
// because this allows us to find the Controller Service when doing a Flow Diff.
final Optional<String> versionedId = cs.getVersionedComponentId();
if (versionedId.isPresent()) {
return versionedId.get();
}
return UUID.nameUUIDFromBytes(cs.getIdentifier().getBytes(StandardCharsets.UTF_8)).toString();
})
.collect(Collectors.toSet());
}
return ancestorServiceIds;
}
@Override
public VersionedFlow registerVersionedFlow(final String registryId, final VersionedFlow flow) {
final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
@ -4660,17 +4651,17 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
public Set<AffectedComponentEntity> getComponentsAffectedByVersionChange(final String processGroupId, final VersionedFlowSnapshot updatedSnapshot) {
public Set<AffectedComponentEntity> getComponentsAffectedByFlowUpdate(final String processGroupId, final VersionedFlowSnapshot updatedSnapshot) {
final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId);
final NiFiRegistryFlowMapper mapper = makeNiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
final VersionedProcessGroup localContents = mapper.mapProcessGroup(group, controllerFacade.getControllerServiceProvider(), flowRegistryClient, true);
final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localContents);
final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("Versioned Flow", updatedSnapshot.getFlowContents());
final ComparableDataFlow localFlow = new StandardComparableDataFlow("Current Flow", localContents);
final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("New Flow", updatedSnapshot.getFlowContents());
final Set<String> ancestorGroupServiceIds = getAncestorGroupServiceIds(group);
final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow, ancestorGroupServiceIds, new StaticDifferenceDescriptor());
final Set<String> ancestorServiceIds = group.getAncestorServiceIds();
final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow, ancestorServiceIds, new StaticDifferenceDescriptor());
final FlowComparison comparison = flowComparator.compare();
final FlowManager flowManager = controllerFacade.getFlowManager();
@ -4847,7 +4838,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
if (versionedIdOption.isPresent()) {
versionedId = versionedIdOption.get();
} else {
versionedId = UUID.nameUUIDFromBytes(connectable.getIdentifier().getBytes(StandardCharsets.UTF_8)).toString();
versionedId = NiFiRegistryFlowMapper.generateVersionedComponentId(connectable.getIdentifier());
}
final List<Connectable> byVersionedId = destination.computeIfAbsent(versionedId, key -> new ArrayList<>());
@ -5027,7 +5018,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public ProcessGroupEntity updateProcessGroupContents(final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo,
final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified,
final boolean updateSettings, final boolean updateDescendantVersionedFlows, final Supplier<String> idGenerator) {
final boolean updateSettings, final boolean updateDescendantVersionedFlows) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();

View File

@ -0,0 +1,708 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api;
import org.apache.nifi.authorization.AuthorizableLookup;
import org.apache.nifi.authorization.AuthorizeParameterReference;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.ComponentAuthorizable;
import org.apache.nifi.authorization.ProcessGroupAuthorizable;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.registry.flow.FlowRegistryUtils;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.ResumeFlowException;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.concurrent.AsyncRequestManager;
import org.apache.nifi.web.api.concurrent.AsynchronousWebRequest;
import org.apache.nifi.web.api.concurrent.RequestManager;
import org.apache.nifi.web.api.concurrent.StandardAsynchronousWebRequest;
import org.apache.nifi.web.api.concurrent.StandardUpdateStep;
import org.apache.nifi.web.api.concurrent.UpdateStep;
import org.apache.nifi.web.api.dto.AffectedComponentDTO;
import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.FlowUpdateRequestDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.entity.AffectedComponentEntity;
import org.apache.nifi.web.api.entity.Entity;
import org.apache.nifi.web.api.entity.FlowUpdateRequestEntity;
import org.apache.nifi.web.api.entity.ProcessGroupDescriptorEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.util.AffectedComponentUtils;
import org.apache.nifi.web.util.CancellableTimedPause;
import org.apache.nifi.web.util.ComponentLifecycle;
import org.apache.nifi.web.util.InvalidComponentAction;
import org.apache.nifi.web.util.LifecycleManagementException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
* Parameterized abstract resource for use in updating flows.
*
* @param <T> Entity to use for describing a process group for update purposes
* @param <U> Entity to capture the status and result of an update request
*/
public abstract class FlowUpdateResource<T extends ProcessGroupDescriptorEntity, U extends FlowUpdateRequestEntity> extends ApplicationResource {
private static final Logger logger = LoggerFactory.getLogger(FlowUpdateResource.class);
protected NiFiServiceFacade serviceFacade;
protected Authorizer authorizer;
protected DtoFactory dtoFactory;
protected ComponentLifecycle clusterComponentLifecycle;
protected ComponentLifecycle localComponentLifecycle;
protected RequestManager<T, T> requestManager =
new AsyncRequestManager<>(100, TimeUnit.MINUTES.toMillis(1L),
"Process Group Update Thread");
/**
* Perform actual flow update
*/
protected abstract ProcessGroupEntity performUpdateFlow(final String groupId, final Revision revision, final T requestEntity,
final VersionedFlowSnapshot flowSnapshot, final String idGenerationSeed,
final boolean verifyNotModified, final boolean updateDescendantVersionedFlows);
/**
* Create the entity that is passed for update flow replication
*/
protected abstract Entity createReplicateUpdateFlowEntity(final Revision revision, final T requestEntity,
final VersionedFlowSnapshot flowSnapshot);
/**
* Create the entity that captures the status and result of an update request
*/
protected abstract U createUpdateRequestEntity();
/**
* Perform additional logic to finalize an update request entity
*/
protected abstract void finalizeCompletedUpdateRequest(U updateRequestEntity);
/**
* Initiate a flow update. Return a response containing an entity that reflects the status of the async request.
*
* This is used by both import-based flow updates and registry-based flow updates.
*
* @param groupId the id of the process group to update
* @param requestEntity the entity containing the request, either versioning info or the flow contents
* @param allowDirtyFlowUpdate allow updating a flow with versioned changes present
* @param requestType the type of request ("replace-requests" or "update-requests")
* @param replicateUriPath the uri path to use for replicating the request (differs from initial request uri)
* @param flowSnapshotSupplier provides access to the flow snapshot to be used for replacement
* @return response containing status of the async request
*/
protected Response initiateFlowUpdate(final String groupId, final T requestEntity, final boolean allowDirtyFlowUpdate,
final String requestType, final String replicateUriPath,
final Supplier<VersionedFlowSnapshot> flowSnapshotSupplier) {
// Verify the request
final RevisionDTO revisionDto = requestEntity.getProcessGroupRevision();
if (revisionDto == null) {
throw new IllegalArgumentException("Process Group Revision must be specified");
}
if (isDisconnectedFromCluster()) {
verifyDisconnectedNodeModification(requestEntity.isDisconnectedNodeAcknowledged());
}
// We will perform the updating of the flow in a background thread because it can be a long-running process.
// In order to do this, we will need some parameters that are only available as Thread-Local variables to the current
// thread, so we will gather the values for these parameters up front.
final boolean replicateRequest = isReplicateRequest();
final ComponentLifecycle componentLifecycle = replicateRequest ? clusterComponentLifecycle : localComponentLifecycle;
final NiFiUser user = NiFiUserUtils.getNiFiUser();
// Workflow for this process:
// 0. Obtain the versioned flow snapshot to use for the update
// a. Retrieve flow snapshot from request entity (import) or from registry (version change)
// 1. Determine which components would be affected (and are enabled/running)
// a. Component itself is modified in some way, other than position changing.
// b. Source and Destination of any Connection that is modified.
// c. Any Processor or Controller Service that references a Controller Service that is modified.
// 2. Verify READ and WRITE permissions for user, for every component.
// 3. Verify that all components in the snapshot exist on all nodes (i.e., the NAR exists)?
// 4: Verify that Process Group can be updated. Only versioned flows care about the verifyNotDirty flag.
// 5. Stop all Processors, Funnels, Ports that are affected.
// 6. Wait for all of the components to finish stopping.
// 7. Disable all Controller Services that are affected.
// 8. Wait for all Controller Services to finish disabling.
// 9. Ensure that if any connection was deleted, that it has no data in it. Ensure that no Input Port
// was removed, unless it currently has no incoming connections. Ensure that no Output Port was removed,
// unless it currently has no outgoing connections. Checking ports & connections could be done before
// stopping everything, but removal of Connections cannot.
// 10. Update variable registry to include new variables
// (only new variables so don't have to worry about affected components? Or do we need to in case a processor
// is already referencing the variable? In which case we need to include the affected components above in the
// set of affected components before stopping/disabling.).
// 11. Update components in the Process Group; update Version Control Information (registry version change only).
// 12. Re-Enable all affected Controller Services that were not removed.
// 13. Re-Start all Processors, Funnels, Ports that are affected and not removed.
// Step 0: Obtain the versioned flow snapshot to use for the update
final VersionedFlowSnapshot flowSnapshot = flowSnapshotSupplier.get();
// The new flow may not contain the same versions of components in existing flow. As a result, we need to update
// the flow snapshot to contain compatible bundles.
serviceFacade.discoverCompatibleBundles(flowSnapshot.getFlowContents());
// If there are any Controller Services referenced that are inherited from the parent group, resolve those to point to the appropriate Controller Service, if we are able to.
serviceFacade.resolveInheritedControllerServices(flowSnapshot, groupId, user);
// Step 1: Determine which components will be affected by updating the flow
final Set<AffectedComponentEntity> affectedComponents = serviceFacade.getComponentsAffectedByFlowUpdate(groupId, flowSnapshot);
// build a request wrapper
final InitiateUpdateFlowRequestWrapper requestWrapper =
new InitiateUpdateFlowRequestWrapper(requestEntity, componentLifecycle, requestType, getAbsolutePath(), replicateUriPath,
affectedComponents, replicateRequest, flowSnapshot);
final Revision requestRevision = getRevision(revisionDto, groupId);
return withWriteLock(
serviceFacade,
requestWrapper,
requestRevision,
lookup -> authorizeFlowUpdate(lookup, user, groupId, flowSnapshot),
() -> {
// Step 3: Verify that all components in the snapshot exist on all nodes
// Step 4: Verify that Process Group can be updated. Only versioned flows care about the verifyNotDirty flag
serviceFacade.verifyCanUpdate(groupId, flowSnapshot, false, !allowDirtyFlowUpdate);
},
(revision, wrapper) -> submitFlowUpdateRequest(user, groupId, revision, wrapper, allowDirtyFlowUpdate)
);
}
/**
* Authorize read/write permissions for the given user on every component of the given flow in support of flow update.
*
* @param lookup A lookup instance to use for retrieving components for authorization purposes
* @param user the user to authorize
* @param groupId the id of the process group being evaluated
* @param flowSnapshot the new flow contents to examine for restricted components
*/
protected void authorizeFlowUpdate(final AuthorizableLookup lookup, final NiFiUser user, final String groupId,
final VersionedFlowSnapshot flowSnapshot) {
// Step 2: Verify READ and WRITE permissions for user, for every component.
final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId);
authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true,
false, true, true, true);
authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.WRITE, true,
false, true, true, false);
final VersionedProcessGroup groupContents = flowSnapshot.getFlowContents();
final Set<ConfigurableComponent> restrictedComponents = FlowRegistryUtils.getRestrictedComponents(groupContents, serviceFacade);
restrictedComponents.forEach(restrictedComponent -> {
final ComponentAuthorizable restrictedComponentAuthorizable = lookup.getConfigurableComponent(restrictedComponent);
authorizeRestrictions(authorizer, restrictedComponentAuthorizable);
});
final Map<String, VersionedParameterContext> parameterContexts = flowSnapshot.getParameterContexts();
if (parameterContexts != null) {
parameterContexts.values().forEach(
context -> AuthorizeParameterReference.authorizeParameterContextAddition(context, serviceFacade, authorizer, lookup, user)
);
}
}
/**
* Create and submit the flow update request. Return response containing an entity reflecting the status of the async request.
*
* This is used by import-based flow replacements, registry-based flow updates and registry-based flow reverts
*
* @param user the user that submitted the update request
* @param groupId the id of the process group to update
* @param revision a revision object representing a unique request to update a specific process group
* @param wrapper wrapper object containing many variables needed for performing the flow update
* @param allowDirtyFlowUpdate allow updating a flow with versioned changes present
* @return response containing status of the update flow request
*/
protected Response submitFlowUpdateRequest(final NiFiUser user, final String groupId, final Revision revision,
final InitiateUpdateFlowRequestWrapper wrapper, final boolean allowDirtyFlowUpdate) {
final String requestType = wrapper.getRequestType();
final String idGenerationSeed = getIdGenerationSeed().orElse(null);
// Steps 5+ occur asynchronously
// Create an asynchronous request that will occur in the background, because this request may
// result in stopping components, which can take an indeterminate amount of time.
final String requestId = UUID.randomUUID().toString();
final AsynchronousWebRequest<T, T> request =
new StandardAsynchronousWebRequest<>(requestId, wrapper.getRequestEntity(), groupId, user, getUpdateFlowSteps());
// Submit the request to be performed in the background
final Consumer<AsynchronousWebRequest<T, T>> updateTask =
vcur -> {
try {
updateFlow(groupId, wrapper.getComponentLifecycle(), wrapper.getRequestUri(),
wrapper.getAffectedComponents(), wrapper.isReplicateRequest(), wrapper.getReplicateUriPath(),
revision, wrapper.getRequestEntity(), wrapper.getFlowSnapshot(), request,
idGenerationSeed, allowDirtyFlowUpdate);
// no need to store any result of above flow update because it's not used
vcur.markStepComplete();
} catch (final ResumeFlowException rfe) {
// Treat ResumeFlowException differently because we don't want to include a message that we couldn't update the flow
// since in this case the flow was successfully updated - we just couldn't re-enable the components.
logger.warn(rfe.getMessage(), rfe);
vcur.fail(rfe.getMessage());
} catch (final Exception e) {
logger.error("Failed to perform update flow request ", e);
vcur.fail("Failed to perform update flow request due to " + e.getMessage());
}
};
requestManager.submitRequest(requestType, requestId, request, updateTask);
return createUpdateRequestResponse(requestType, requestId, request, false);
}
/**
* Perform the specified flow update
*/
private void updateFlow(final String groupId, final ComponentLifecycle componentLifecycle, final URI requestUri,
final Set<AffectedComponentEntity> affectedComponents, final boolean replicateRequest,
final String replicateUriPath, final Revision revision, final T requestEntity,
final VersionedFlowSnapshot flowSnapshot, final AsynchronousWebRequest<T, T> asyncRequest,
final String idGenerationSeed, final boolean allowDirtyFlowUpdate)
throws LifecycleManagementException, ResumeFlowException {
// Steps 5-6: Determine which components must be stopped and stop them.
final Set<String> stoppableReferenceTypes = new HashSet<>();
stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR);
stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT);
stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_REMOTE_OUTPUT_PORT);
stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_INPUT_PORT);
stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_OUTPUT_PORT);
final Set<AffectedComponentEntity> runningComponents = affectedComponents.stream()
.filter(dto -> stoppableReferenceTypes.contains(dto.getComponent().getReferenceType()))
.filter(dto -> "Running".equalsIgnoreCase(dto.getComponent().getState()))
.collect(Collectors.toSet());
logger.info("Stopping {} Processors", runningComponents.size());
final CancellableTimedPause stopComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
asyncRequest.setCancelCallback(stopComponentsPause::cancel);
componentLifecycle.scheduleComponents(requestUri, groupId, runningComponents, ScheduledState.STOPPED, stopComponentsPause, InvalidComponentAction.SKIP);
if (asyncRequest.isCancelled()) {
return;
}
asyncRequest.markStepComplete();
// Steps 7-8. Disable enabled controller services that are affected
final Set<AffectedComponentEntity> enabledServices = affectedComponents.stream()
.filter(dto -> AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE.equals(dto.getComponent().getReferenceType()))
.filter(dto -> "Enabled".equalsIgnoreCase(dto.getComponent().getState()))
.collect(Collectors.toSet());
logger.info("Disabling {} Controller Services", enabledServices.size());
final CancellableTimedPause disableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
asyncRequest.setCancelCallback(disableServicesPause::cancel);
componentLifecycle.activateControllerServices(requestUri, groupId, enabledServices, ControllerServiceState.DISABLED, disableServicesPause, InvalidComponentAction.SKIP);
if (asyncRequest.isCancelled()) {
return;
}
asyncRequest.markStepComplete();
try {
if (replicateRequest) {
// If replicating request, steps 9-11 are performed on each node individually
final NiFiUser user = NiFiUserUtils.getNiFiUser();
URI replicateUri = null;
try {
replicateUri = new URI(requestUri.getScheme(), requestUri.getUserInfo(), requestUri.getHost(), requestUri.getPort(),
replicateUriPath, null, requestUri.getFragment());
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
final Map<String, String> headers = new HashMap<>();
headers.put("content-type", MediaType.APPLICATION_JSON);
// each concrete class creates its own type of entity for replication
final Entity replicateEntity = createReplicateUpdateFlowEntity(revision, requestEntity, flowSnapshot);
final NodeResponse clusterResponse;
try {
logger.debug("Replicating PUT request to {} for user {}", replicateUri, user);
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
clusterResponse = getRequestReplicator().replicate(user, HttpMethod.PUT, replicateUri, replicateEntity, headers).awaitMergedResponse();
} else {
clusterResponse = getRequestReplicator().forwardToCoordinator(
getClusterCoordinatorNode(), user, HttpMethod.PUT, replicateUri, replicateEntity, headers).awaitMergedResponse();
}
} catch (final InterruptedException ie) {
logger.warn("Interrupted while replicating PUT request to {} for user {}", replicateUri, user);
Thread.currentThread().interrupt();
throw new LifecycleManagementException("Interrupted while updating flows across cluster", ie);
}
final int updateFlowStatus = clusterResponse.getStatus();
if (updateFlowStatus != Status.OK.getStatusCode()) {
final String explanation = getResponseEntity(clusterResponse, String.class);
logger.error("Failed to update flow across cluster when replicating PUT request to {} for user {}. Received {} response with explanation: {}",
replicateUri, user, updateFlowStatus, explanation);
throw new LifecycleManagementException("Failed to update Flow on all nodes in cluster due to " + explanation);
}
} else {
// Step 9: Ensure that if any connection exists in the flow and does not exist in the proposed snapshot,
// that it has no data in it. Ensure that no Input Port was removed, unless it currently has no incoming connections.
// Ensure that no Output Port was removed, unless it currently has no outgoing connections.
serviceFacade.verifyCanUpdate(groupId, flowSnapshot, true, !allowDirtyFlowUpdate);
// Step 10-11. Update Process Group to the new flow and update variable registry with any Variables that were added or removed.
// Each concrete class defines its own update flow functionality
performUpdateFlow(groupId, revision, requestEntity, flowSnapshot, idGenerationSeed, !allowDirtyFlowUpdate, true);
}
} finally {
if (!asyncRequest.isCancelled()) {
if (logger.isDebugEnabled()) {
logger.debug("Re-Enabling {} Controller Services: {}", enabledServices.size(), enabledServices);
}
asyncRequest.markStepComplete();
// Step 12. Re-enable all disabled controller services
final CancellableTimedPause enableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
asyncRequest.setCancelCallback(enableServicesPause::cancel);
final Set<AffectedComponentEntity> servicesToEnable = getUpdatedEntities(enabledServices);
logger.info("Successfully updated flow; re-enabling {} Controller Services", servicesToEnable.size());
try {
componentLifecycle.activateControllerServices(requestUri, groupId, servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause, InvalidComponentAction.SKIP);
} catch (final IllegalStateException ise) {
// Component Lifecycle will re-enable the Controller Services only if they are valid. If IllegalStateException gets thrown, we need to provide
// a more intelligent error message as to exactly what happened, rather than indicate that the flow could not be updated.
throw new ResumeFlowException("Successfully updated flow but could not re-enable all Controller Services because " + ise.getMessage(), ise);
}
}
if (!asyncRequest.isCancelled()) {
if (logger.isDebugEnabled()) {
logger.debug("Restart {} Processors: {}", runningComponents.size(), runningComponents);
}
asyncRequest.markStepComplete();
// Step 13. Restart all components
final Set<AffectedComponentEntity> componentsToStart = getUpdatedEntities(runningComponents);
// If there are any Remote Group Ports that are supposed to be started and have no connections, we want to remove those from our Set.
// This will happen if the Remote Group Port is transmitting when the flow change happens but the new flow does not have a connection
// to the port. In such a case, the Port still is included in the Updated Entities because we do not remove them when updating the flow
// (they are removed in the background).
final Set<AffectedComponentEntity> avoidStarting = new HashSet<>();
for (final AffectedComponentEntity componentEntity : componentsToStart) {
final AffectedComponentDTO componentDto = componentEntity.getComponent();
final String referenceType = componentDto.getReferenceType();
if (!AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT.equals(referenceType)
&& !AffectedComponentDTO.COMPONENT_TYPE_REMOTE_OUTPUT_PORT.equals(referenceType)) {
continue;
}
boolean startComponent;
try {
startComponent = serviceFacade.isRemoteGroupPortConnected(componentDto.getProcessGroupId(), componentDto.getId());
} catch (final ResourceNotFoundException rnfe) {
// Could occur if RPG is refreshed at just the right time.
startComponent = false;
}
// We must add the components to avoid starting to a separate Set and then remove them below,
// rather than removing the component here, because doing so would result in a ConcurrentModificationException.
if (!startComponent) {
avoidStarting.add(componentEntity);
}
}
componentsToStart.removeAll(avoidStarting);
final CancellableTimedPause startComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
asyncRequest.setCancelCallback(startComponentsPause::cancel);
logger.info("Restarting {} Processors", componentsToStart.size());
try {
componentLifecycle.scheduleComponents(requestUri, groupId, componentsToStart, ScheduledState.RUNNING, startComponentsPause, InvalidComponentAction.SKIP);
} catch (final IllegalStateException ise) {
// Component Lifecycle will restart the Processors only if they are valid. If IllegalStateException gets thrown, we need to provide
// a more intelligent error message as to exactly what happened, rather than indicate that the flow could not be updated.
throw new ResumeFlowException("Successfully updated flow but could not restart all Processors because " + ise.getMessage(), ise);
}
}
}
asyncRequest.setCancelCallback(null);
}
/**
* Get a list of steps to perform for upload flow
*/
private static List<UpdateStep> getUpdateFlowSteps() {
final List<UpdateStep> updateSteps = new ArrayList<>();
updateSteps.add(new StandardUpdateStep("Stopping Affected Processors"));
updateSteps.add(new StandardUpdateStep("Disabling Affected Controller Services"));
updateSteps.add(new StandardUpdateStep("Updating Flow"));
updateSteps.add(new StandardUpdateStep("Re-Enabling Controller Services"));
updateSteps.add(new StandardUpdateStep("Restarting Affected Processors"));
return updateSteps;
}
/**
* Extracts the response entity from the specified node response.
*
* @param nodeResponse node response
* @param clazz class
* @param <T> type of class
* @return the response entity
*/
@SuppressWarnings("unchecked")
protected <T> T getResponseEntity(final NodeResponse nodeResponse, final Class<T> clazz) {
T entity = (T) nodeResponse.getUpdatedEntity();
if (entity == null) {
entity = nodeResponse.getClientResponse().readEntity(clazz);
}
return entity;
}
protected Set<AffectedComponentEntity> getUpdatedEntities(final Set<AffectedComponentEntity> originalEntities) {
final Set<AffectedComponentEntity> entities = new LinkedHashSet<>();
for (final AffectedComponentEntity original : originalEntities) {
try {
final AffectedComponentEntity updatedEntity = AffectedComponentUtils.updateEntity(original, serviceFacade, dtoFactory);
if (updatedEntity != null) {
entities.add(updatedEntity);
}
} catch (final ResourceNotFoundException rnfe) {
// Component was removed. Just continue on without adding anything to the entities.
// We do this because the intent is to get updated versions of the entities with current
// Revisions so that we can change the states of the components. If the component was removed,
// then we can just drop the entity, since there is no need to change its state.
}
}
return entities;
}
/**
* Process a request to retrieve an existing flow update request.
*
* This is used by import-based flow replacements, registry-based flow updates and registry-based flow reverts
*
* @param requestType the type of request ("replace-requests", "update-requests" or "revert-requests")
* @param requestId the unique identifier for the update request
* @return response containing the requested flow update request
*/
protected Response retrieveFlowUpdateRequest(final String requestType, final String requestId) {
if (requestId == null) {
throw new IllegalArgumentException("Request ID must be specified.");
}
final NiFiUser user = NiFiUserUtils.getNiFiUser();
// request manager will ensure that the current is the user that submitted this request
final AsynchronousWebRequest<T, T> asyncRequest = requestManager.getRequest(requestType, requestId, user);
return createUpdateRequestResponse(requestType, requestId, asyncRequest, true);
}
/**
* Process a request to cancel/delete an existing flow update request.
*
* This is used by import-based flow replacements, registry-based flow updates and registry-based flow reverts
*
* @param requestType the type of request ("replace-requests", "update-requests" or "revert-requests")
* @param requestId the unique identifier for the update request
* @param disconnectedNodeAcknowledged acknowledges that this node is disconnected to allow for mutable requests to proceed
* @return response containing the deleted flow update request
*/
protected Response deleteFlowUpdateRequest(final String requestType, final String requestId,
final boolean disconnectedNodeAcknowledged) {
if (requestId == null) {
throw new IllegalArgumentException("Request ID must be specified.");
}
if (isDisconnectedFromCluster()) {
verifyDisconnectedNodeModification(disconnectedNodeAcknowledged);
}
final NiFiUser user = NiFiUserUtils.getNiFiUser();
// request manager will ensure that the current is the user that submitted this request
final AsynchronousWebRequest<T, T> asyncRequest = requestManager.removeRequest(requestType, requestId, user);
if (!asyncRequest.isComplete()) {
asyncRequest.cancel();
}
return createUpdateRequestResponse(requestType, requestId, asyncRequest, true);
}
/**
* Create response containing entity that reflects the status of the update request
*
* @param requestType the type of request ("replace-requests", "update-requests" or "revert-requests")
* @param requestId the unique identifier for the update request
* @param asyncRequest async request object
* @param finalizeCompletedRequest if true, perform additional custom operations to finalize the update request
* @return response containing entity that reflects the status of the update request
*/
protected Response createUpdateRequestResponse(final String requestType, final String requestId,
final AsynchronousWebRequest<T,T> asyncRequest,
final boolean finalizeCompletedRequest) {
final String groupId = asyncRequest.getComponentId();
final U updateRequestEntity = createUpdateRequestEntity();
final RevisionDTO groupRevision = serviceFacade.getProcessGroup(groupId).getRevision();
updateRequestEntity.setProcessGroupRevision(groupRevision);
final FlowUpdateRequestDTO updateRequestDto = updateRequestEntity.getRequest();
updateRequestDto.setComplete(asyncRequest.isComplete());
updateRequestDto.setFailureReason(asyncRequest.getFailureReason());
updateRequestDto.setLastUpdated(asyncRequest.getLastUpdated());
updateRequestDto.setProcessGroupId(groupId);
updateRequestDto.setRequestId(requestId);
updateRequestDto.setUri(generateResourceUri(getRequestPathFirstSegment(), requestType, requestId));
updateRequestDto.setPercentCompleted(asyncRequest.getPercentComplete());
updateRequestDto.setState(asyncRequest.getState());
if (finalizeCompletedRequest) {
// perform additional custom operations to finalize the update request
finalizeCompletedUpdateRequest(updateRequestEntity);
}
return generateOkResponse(updateRequestEntity).build();
}
/**
* Access the current request URI's first path segment (i.e., "versions" or "process-groups").
*
* This avoids having to hardcode the value as an argument to an update flow request.
*/
protected String getRequestPathFirstSegment() {
return uriInfo.getPathSegments().get(0).getPath();
}
protected class InitiateUpdateFlowRequestWrapper extends Entity {
private final T requestEntity;
private final ComponentLifecycle componentLifecycle;
private final String requestType;
private final URI requestUri;
private final String replicateUriPath;
private final Set<AffectedComponentEntity> affectedComponents;
private final boolean replicateRequest;
private final VersionedFlowSnapshot flowSnapshot;
public InitiateUpdateFlowRequestWrapper(final T requestEntity, final ComponentLifecycle componentLifecycle,
final String requestType, final URI requestUri, final String replicateUriPath,
final Set<AffectedComponentEntity> affectedComponents,
final boolean replicateRequest, final VersionedFlowSnapshot flowSnapshot) {
this.requestEntity = requestEntity;
this.componentLifecycle = componentLifecycle;
this.requestType = requestType;
this.requestUri = requestUri;
this.replicateUriPath = replicateUriPath;
this.affectedComponents = affectedComponents;
this.replicateRequest = replicateRequest;
this.flowSnapshot = flowSnapshot;
}
public T getRequestEntity() {
return requestEntity;
}
public ComponentLifecycle getComponentLifecycle() {
return componentLifecycle;
}
public String getRequestType() {
return requestType;
}
public URI getRequestUri() {
return requestUri;
}
public String getReplicateUriPath() {
return replicateUriPath;
}
public Set<AffectedComponentEntity> getAffectedComponents() {
return affectedComponents;
}
public boolean isReplicateRequest() {
return replicateRequest;
}
public VersionedFlowSnapshot getFlowSnapshot() {
return flowSnapshot;
}
}
// setters
public void setServiceFacade(NiFiServiceFacade serviceFacade) {
this.serviceFacade = serviceFacade;
}
public void setAuthorizer(Authorizer authorizer) {
this.authorizer = authorizer;
}
public void setDtoFactory(DtoFactory dtoFactory) {
this.dtoFactory = dtoFactory;
}
public void setClusterComponentLifecycle(ComponentLifecycle componentLifecycle) {
this.clusterComponentLifecycle = componentLifecycle;
}
public void setLocalComponentLifecycle(ComponentLifecycle componentLifecycle) {
this.localComponentLifecycle = componentLifecycle;
}
}

View File

@ -24,63 +24,12 @@ import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import javax.xml.stream.XMLStreamReader;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.StringEscapeUtils;
import org.apache.nifi.authorization.AuthorizableLookup;
import org.apache.nifi.authorization.AuthorizeAccess;
import org.apache.nifi.authorization.AuthorizeControllerServiceReference;
import org.apache.nifi.authorization.AuthorizeParameterReference;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.ComponentAuthorizable;
import org.apache.nifi.authorization.ProcessGroupAuthorizable;
import org.apache.nifi.authorization.RequestAction;
@ -110,18 +59,17 @@ import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest;
import org.apache.nifi.registry.variable.VariableRegistryUpdateStep;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.security.xml.XmlUtils;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.dto.AffectedComponentDTO;
import org.apache.nifi.web.api.dto.BundleDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.api.dto.PositionDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.ProcessGroupReplaceRequestDTO;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
@ -152,6 +100,8 @@ import org.apache.nifi.web.api.entity.OutputPortsEntity;
import org.apache.nifi.web.api.entity.ParameterContextReferenceEntity;
import org.apache.nifi.web.api.entity.PortEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessGroupImportEntity;
import org.apache.nifi.web.api.entity.ProcessGroupReplaceRequestEntity;
import org.apache.nifi.web.api.entity.ProcessGroupsEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ProcessorsEntity;
@ -171,6 +121,57 @@ import org.slf4j.LoggerFactory;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import javax.xml.stream.XMLStreamReader;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* RESTful endpoint for managing a Group.
*/
@ -179,13 +180,10 @@ import org.springframework.security.core.context.SecurityContextHolder;
value = "/process-groups",
description = "Endpoint for managing a Process Group."
)
public class ProcessGroupResource extends ApplicationResource {
public class ProcessGroupResource extends FlowUpdateResource<ProcessGroupImportEntity, ProcessGroupReplaceRequestEntity> {
private static final Logger logger = LoggerFactory.getLogger(ProcessGroupResource.class);
private NiFiServiceFacade serviceFacade;
private Authorizer authorizer;
private ProcessorResource processorResource;
private InputPortResource inputPortResource;
private OutputPortResource outputPortResource;
@ -195,7 +193,6 @@ public class ProcessGroupResource extends ApplicationResource {
private ConnectionResource connectionResource;
private TemplateResource templateResource;
private ControllerServiceResource controllerServiceResource;
private DtoFactory dtoFactory;
private final ConcurrentMap<String, VariableRegistryUpdateRequest> varRegistryUpdateRequests = new ConcurrentHashMap<>();
private static final int MAX_VARIABLE_REGISTRY_UPDATE_REQUESTS = 100;
@ -1588,7 +1585,7 @@ public class ProcessGroupResource extends ApplicationResource {
* @return the response entity
*/
@SuppressWarnings("unchecked")
private <T> T getResponseEntity(final NodeResponse nodeResponse, final Class<T> clazz) {
protected <T> T getResponseEntity(final NodeResponse nodeResponse, final Class<T> clazz) {
T entity = (T) nodeResponse.getUpdatedEntity();
if (entity == null) {
entity = nodeResponse.getClientResponse().readEntity(clazz);
@ -1866,7 +1863,7 @@ public class ProcessGroupResource extends ApplicationResource {
// To accomplish this, we call updateProcessGroupContents() passing 'true' for the updateSettings flag but null out the position.
flowSnapshot.getFlowContents().setPosition(null);
entity = serviceFacade.updateProcessGroupContents(newGroupRevision, newGroupId, versionControlInfo, flowSnapshot,
getIdGenerationSeed().orElse(null), false, true, true, this::generateUuid);
getIdGenerationSeed().orElse(null), false, true, true);
}
populateRemainingProcessGroupEntityContent(entity);
@ -1878,8 +1875,6 @@ public class ProcessGroupResource extends ApplicationResource {
);
}
private VersionedFlowSnapshot getFlowFromRegistry(final VersionControlInformationDTO versionControlInfo) {
final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(versionControlInfo, true);
final Bucket bucket = flowSnapshot.getBucket();
@ -1898,9 +1893,10 @@ public class ProcessGroupResource extends ApplicationResource {
/**
* Retrieves all the processors in this NiFi.
* Retrieves all the child process groups of the process group with the given id.
*
* @return A processorsEntity.
* @param groupId the parent process group id
* @return An entity containing all the child process group entities.
*/
@GET
@Consumes(MediaType.WILDCARD)
@ -3894,6 +3890,278 @@ public class ProcessGroupResource extends ApplicationResource {
);
}
/**
* Initiates the request to replace the Process Group with the given ID with the Process Group in the given import entity
*
* @param groupId The id of the process group to replace
* @param importEntity A request entity containing revision info and the process group to replace with
* @return A ProcessGroupReplaceRequestEntity.
*/
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@Path("{id}/replace-requests")
@ApiOperation(
value = "Initiate the Replace Request of a Process Group with the given ID",
response = ProcessGroupReplaceRequestEntity.class,
notes = "This will initiate the action of replacing a process group with the given process group. This can be a lengthy "
+ "process, as it will stop any Processors and disable any Controller Services necessary to perform the action and then restart them. As a result, "
+ "the endpoint will immediately return a ProcessGroupReplaceRequestEntity, and the process of replacing the flow will occur "
+ "asynchronously in the background. The client may then periodically poll the status of the request by issuing a GET request to "
+ "/process-groups/replace-requests/{requestId}. Once the request is completed, the client is expected to issue a DELETE request to "
+ "/process-groups/replace-requests/{requestId}. " + NON_GUARANTEED_ENDPOINT,
authorizations = {
@Authorization(value = "Read - /process-groups/{uuid}"),
@Authorization(value = "Write - /process-groups/{uuid}"),
@Authorization(value = "Read - /{component-type}/{uuid} - For all encapsulated components"),
@Authorization(value = "Write - /{component-type}/{uuid} - For all encapsulated components"),
@Authorization(value = "Write - if the template contains any restricted components - /restricted-components"),
@Authorization(value = "Read - /parameter-contexts/{uuid} - For any Parameter Context that is referenced by a Property that is changed, added, or removed")
}
)
@ApiResponses(value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
})
public Response initiateReplaceProcessGroup(@ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId,
@ApiParam(value = "The process group replace request entity", required = true) final ProcessGroupImportEntity importEntity) {
if (importEntity == null) {
throw new IllegalArgumentException("Process Group Import Entity is required");
}
// replacing a flow under version control is not permitted via import. Versioned flows have additional requirements to allow
// them only to be replaced by a different version of the same flow.
if (serviceFacade.isAnyProcessGroupUnderVersionControl(groupId)) {
throw new IllegalStateException("Cannot replace a Process Group via import while it or its descendants are under Version Control.");
}
final VersionedFlowSnapshot versionedFlowSnapshot = importEntity.getVersionedFlowSnapshot();
if (versionedFlowSnapshot == null) {
throw new IllegalArgumentException("Versioned Flow Snapshot must be supplied");
}
// remove any registry-specific versioning content which could be present if the flow was exported from registry
versionedFlowSnapshot.setFlow(null);
versionedFlowSnapshot.setBucket(null);
versionedFlowSnapshot.setSnapshotMetadata(null);
sanitizeRegistryInfo(versionedFlowSnapshot.getFlowContents());
return initiateFlowUpdate(groupId, importEntity, true, "replace-requests",
"/nifi-api/process-groups/" + groupId + "/flow-contents", importEntity::getVersionedFlowSnapshot);
}
/**
* Recursively clear the registry info in the given versioned process group and all nested versioned process groups
*
* @param versionedProcessGroup the process group to sanitize
*/
private void sanitizeRegistryInfo(final VersionedProcessGroup versionedProcessGroup) {
versionedProcessGroup.setVersionedFlowCoordinates(null);
for (final VersionedProcessGroup innerVersionedProcessGroup : versionedProcessGroup.getProcessGroups()) {
sanitizeRegistryInfo(innerVersionedProcessGroup);
}
}
/**
* Replace the Process Group contents with the given ID with the specified Process Group contents.
*
* This is the endpoint used in a cluster update replication scenario.
*
* @param groupId The id of the process group to replace
* @param importEntity A request entity containing revision info and the process group to replace with
* @return A ProcessGroupImportEntity.
*/
@PUT
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@Path("{id}/flow-contents")
@ApiOperation(
value = "Replace Process Group contents with the given ID with the specified Process Group contents",
response = ProcessGroupImportEntity.class,
notes = "This endpoint is used for replication within a cluster, when replacing a flow with a new flow. It expects that the flow being"
+ "replaced is not under version control and that the given snapshot will not modify any Processor that is currently running "
+ "or any Controller Service that is enabled. "
+ NON_GUARANTEED_ENDPOINT,
authorizations = {
@Authorization(value = "Read - /process-groups/{uuid}"),
@Authorization(value = "Write - /process-groups/{uuid}")
})
@ApiResponses(value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
})
public Response replaceProcessGroup(@ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId,
@ApiParam(value = "The process group replace request entity.", required = true) final ProcessGroupImportEntity importEntity) {
// Verify the request
if (importEntity == null) {
throw new IllegalArgumentException("Process Group Import Entity is required");
}
final RevisionDTO revisionDto = importEntity.getProcessGroupRevision();
if (revisionDto == null) {
throw new IllegalArgumentException("Process Group Revision must be specified.");
}
final VersionedFlowSnapshot requestFlowSnapshot = importEntity.getVersionedFlowSnapshot();
if (requestFlowSnapshot == null) {
throw new IllegalArgumentException("Versioned Flow Snapshot must be supplied.");
}
// Perform the request
if (isReplicateRequest()) {
return replicate(HttpMethod.PUT, importEntity);
} else if (isDisconnectedFromCluster()) {
verifyDisconnectedNodeModification(importEntity.isDisconnectedNodeAcknowledged());
}
final Revision requestRevision = getRevision(importEntity.getProcessGroupRevision(), groupId);
return withWriteLock(
serviceFacade,
importEntity,
requestRevision,
lookup -> {
final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId);
final Authorizable processGroup = groupAuthorizable.getAuthorizable();
processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
() -> {
// We do not enforce that the Process Group is 'not dirty' because at this point,
// the client has explicitly indicated the dataflow that the Process Group should
// provide and provided the Revision to ensure that they have the most up-to-date
// view of the Process Group.
serviceFacade.verifyCanUpdate(groupId, requestFlowSnapshot, true, false);
},
(revision, entity) -> {
final ProcessGroupEntity updatedGroup =
performUpdateFlow(groupId, revision, importEntity, entity.getVersionedFlowSnapshot(),
getIdGenerationSeed().orElse(null), false, true);
// response to replication request is an entity with revision info but no versioned flow snapshot
final ProcessGroupImportEntity responseEntity = new ProcessGroupImportEntity();
responseEntity.setProcessGroupRevision(updatedGroup.getRevision());
return generateOkResponse(responseEntity).build();
});
}
/**
* Retrieve a request to replace a Process Group by request ID.
*
* @param replaceRequestId The ID of the replace request
* @return A ProcessGroupReplaceRequestEntity.
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("replace-requests/{id}")
@ApiOperation(
value = "Returns the Replace Request with the given ID",
response = ProcessGroupReplaceRequestEntity.class,
notes = "Returns the Replace Request with the given ID. Once a Replace Request has been created by performing a POST to /process-groups/{id}/replace-requests, "
+ "that request can subsequently be retrieved via this endpoint, and the request that is fetched will contain the updated state, such as percent complete, the "
+ "current state of the request, and any failures. "
+ NON_GUARANTEED_ENDPOINT,
authorizations = {
@Authorization(value = "Only the user that submitted the request can get it")
})
@ApiResponses(value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
})
public Response getReplaceProcessGroupRequest(
@ApiParam("The ID of the Replace Request") @PathParam("id") final String replaceRequestId) {
return retrieveFlowUpdateRequest("replace-requests", replaceRequestId);
}
@DELETE
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("replace-requests/{id}")
@ApiOperation(
value = "Deletes the Replace Request with the given ID",
response = ProcessGroupReplaceRequestEntity.class,
notes = "Deletes the Replace Request with the given ID. After a request is created via a POST to /process-groups/{id}/replace-requests, it is expected "
+ "that the client will properly clean up the request by DELETE'ing it, once the Replace process has completed. If the request is deleted before the request "
+ "completes, then the Replace request will finish the step that it is currently performing and then will cancel any subsequent steps. "
+ NON_GUARANTEED_ENDPOINT,
authorizations = {
@Authorization(value = "Only the user that submitted the request can remove it")
})
@ApiResponses(value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
})
public Response deleteReplaceProcessGroupRequest(
@ApiParam(value = "Acknowledges that this node is disconnected to allow for mutable requests to proceed.", required = false)
@QueryParam(DISCONNECTED_NODE_ACKNOWLEDGED) @DefaultValue("false") final Boolean disconnectedNodeAcknowledged,
@ApiParam("The ID of the Update Request") @PathParam("id") final String replaceRequestId) {
return deleteFlowUpdateRequest("replace-requests", replaceRequestId, disconnectedNodeAcknowledged.booleanValue());
}
/**
* Perform actual flow update of the specified flow. This is used for the initial flow update and replication updates.
*/
@Override
protected ProcessGroupEntity performUpdateFlow(final String groupId, final Revision revision, final ProcessGroupImportEntity requestEntity,
final VersionedFlowSnapshot flowSnapshot, final String idGenerationSeed,
final boolean verifyNotModified, final boolean updateDescendantVersionedFlows) {
logger.info("Replacing Process Group with ID {} with imported Process Group with ID {}", groupId, flowSnapshot.getFlowContents().getIdentifier());
// Update Process Group to the new flow (including name) and update variable registry with any Variables that were added or removed
return serviceFacade.updateProcessGroupContents(revision, groupId, null, flowSnapshot, idGenerationSeed, verifyNotModified,
true, updateDescendantVersionedFlows);
}
/**
* Create the entity that is used for update flow replication. The initial replace request entity can be re-used for the replication request.
*/
@Override
protected Entity createReplicateUpdateFlowEntity(final Revision revision, final ProcessGroupImportEntity requestEntity,
final VersionedFlowSnapshot flowSnapshot) {
return requestEntity;
}
/**
* Create the entity that captures the status and result of a replace request
*
* @return a new instance of a ProcessGroupReplaceRequestEntity
*/
@Override
protected ProcessGroupReplaceRequestEntity createUpdateRequestEntity() {
return new ProcessGroupReplaceRequestEntity();
}
/**
* Finalize a completed update request for an existing replace request. This is used when retrieving and deleting a replace request.
*
* A completed request will contain the updated VersionedFlowSnapshot
*
* @param requestEntity the request entity to finalize
*/
@Override
protected void finalizeCompletedUpdateRequest(final ProcessGroupReplaceRequestEntity requestEntity) {
final ProcessGroupReplaceRequestDTO updateRequestDto = requestEntity.getRequest();
if (updateRequestDto.isComplete()) {
final VersionedFlowSnapshot versionedFlowSnapshot =
serviceFacade.getCurrentFlowSnapshotByGroupId(updateRequestDto.getProcessGroupId());
requestEntity.setVersionedFlowSnapshot(versionedFlowSnapshot);
}
}
private static class UpdateVariableRegistryRequestWrapper extends Entity {
private final Set<AffectedComponentEntity> allAffectedComponents;
private final List<AffectedComponentDTO> activeAffectedProcessors;
@ -3928,10 +4196,6 @@ public class ProcessGroupResource extends ApplicationResource {
// setters
public void setServiceFacade(NiFiServiceFacade serviceFacade) {
this.serviceFacade = serviceFacade;
}
public void setProcessorResource(ProcessorResource processorResource) {
this.processorResource = processorResource;
}
@ -3967,12 +4231,4 @@ public class ProcessGroupResource extends ApplicationResource {
public void setControllerServiceResource(ControllerServiceResource controllerServiceResource) {
this.controllerServiceResource = controllerServiceResource;
}
public void setAuthorizer(Authorizer authorizer) {
this.authorizer = authorizer;
}
public void setDtoFactory(DtoFactory dtoFactory) {
this.dtoFactory = dtoFactory;
}
}

View File

@ -25,39 +25,20 @@ import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.AuthorizeParameterReference;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.ComponentAuthorizable;
import org.apache.nifi.authorization.ProcessGroupAuthorizable;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.registry.bucket.Bucket;
import org.apache.nifi.registry.flow.FlowRegistryUtils;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
import org.apache.nifi.registry.flow.VersionedFlowState;
import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.ResumeFlowException;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.concurrent.AsyncRequestManager;
import org.apache.nifi.web.api.concurrent.AsynchronousWebRequest;
import org.apache.nifi.web.api.concurrent.RequestManager;
import org.apache.nifi.web.api.concurrent.StandardAsynchronousWebRequest;
import org.apache.nifi.web.api.concurrent.StandardUpdateStep;
import org.apache.nifi.web.api.concurrent.UpdateStep;
import org.apache.nifi.web.api.dto.AffectedComponentDTO;
import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
import org.apache.nifi.web.api.dto.VersionedFlowDTO;
@ -73,11 +54,7 @@ import org.apache.nifi.web.api.entity.VersionedFlowSnapshotEntity;
import org.apache.nifi.web.api.entity.VersionedFlowUpdateRequestEntity;
import org.apache.nifi.web.api.request.ClientIdParameter;
import org.apache.nifi.web.api.request.LongParameter;
import org.apache.nifi.web.util.AffectedComponentUtils;
import org.apache.nifi.web.util.CancellableTimedPause;
import org.apache.nifi.web.util.ComponentLifecycle;
import org.apache.nifi.web.util.InvalidComponentAction;
import org.apache.nifi.web.util.LifecycleManagementException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -99,33 +76,17 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@Path("/versions")
@Api(value = "/versions", description = "Endpoint for managing version control for a flow")
public class VersionsResource extends ApplicationResource {
public class VersionsResource extends FlowUpdateResource<VersionControlInformationEntity, VersionedFlowUpdateRequestEntity> {
private static final Logger logger = LoggerFactory.getLogger(VersionsResource.class);
private NiFiServiceFacade serviceFacade;
private Authorizer authorizer;
private ComponentLifecycle clusterComponentLifecycle;
private ComponentLifecycle localComponentLifecycle;
private DtoFactory dtoFactory;
private RequestManager<VersionControlInformationEntity, VersionControlInformationEntity> requestManager = new AsyncRequestManager<>(100, TimeUnit.MINUTES.toMillis(1L),
"Version Control Update Thread");
// We need to ensure that only a single Version Control Request can occur throughout the flow.
// Otherwise, User 1 could log into Node 1 and choose to Version Control Group A.
// At the same time, User 2 could log into Node 2 and choose to Version Control Group B, which is a child of Group A.
@ -872,32 +833,21 @@ public class VersionsResource extends ApplicationResource {
// view of the Process Group.
serviceFacade.verifyCanUpdate(groupId, requestFlowSnapshot, true, false);
},
(rev, entity) -> {
final VersionedFlowSnapshot flowSnapshot = entity.getVersionedFlowSnapshot();
final VersionedFlowSnapshotMetadata snapshotMetadata = flowSnapshot.getSnapshotMetadata();
final Bucket bucket = flowSnapshot.getBucket();
final VersionedFlow flow = flowSnapshot.getFlow();
// Update the Process Group to match the proposed flow snapshot
(revision, entity) -> {
// prepare an entity similar to initial request to pass registry id to performUpdateFlow
final VersionControlInformationDTO versionControlInfoDto = new VersionControlInformationDTO();
versionControlInfoDto.setBucketId(snapshotMetadata.getBucketIdentifier());
versionControlInfoDto.setBucketName(bucket.getName());
versionControlInfoDto.setFlowId(snapshotMetadata.getFlowIdentifier());
versionControlInfoDto.setFlowName(flow.getName());
versionControlInfoDto.setFlowDescription(flow.getDescription());
versionControlInfoDto.setGroupId(groupId);
versionControlInfoDto.setVersion(snapshotMetadata.getVersion());
versionControlInfoDto.setRegistryId(entity.getRegistryId());
versionControlInfoDto.setRegistryName(serviceFacade.getFlowRegistryName(entity.getRegistryId()));
final VersionControlInformationEntity versionControlInfo = new VersionControlInformationEntity();
versionControlInfo.setVersionControlInformation(versionControlInfoDto);
final VersionedFlowState flowState = snapshotMetadata.getVersion() == flow.getVersionCount() ? VersionedFlowState.UP_TO_DATE : VersionedFlowState.STALE;
versionControlInfoDto.setState(flowState.name());
final ProcessGroupEntity updatedGroup =
performUpdateFlow(groupId, revision, versionControlInfo, entity.getVersionedFlowSnapshot(),
getIdGenerationSeed().orElse(null), false,
entity.getUpdateDescendantVersionedFlows());
final ProcessGroupEntity updatedGroup = serviceFacade.updateProcessGroupContents(rev, groupId, versionControlInfoDto, flowSnapshot, getIdGenerationSeed().orElse(null), false,
false, entity.getUpdateDescendantVersionedFlows(), this::generateUuid);
final VersionControlInformationDTO updatedVci = updatedGroup.getComponent().getVersionControlInformation();
// response to replication request is a version control entity with revision and versioning info
final VersionControlInformationEntity responseEntity = new VersionControlInformationEntity();
responseEntity.setProcessGroupRevision(updatedGroup.getRevision());
responseEntity.setVersionControlInformation(updatedVci);
@ -929,7 +879,7 @@ public class VersionsResource extends ApplicationResource {
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
})
public Response getUpdateRequest(@ApiParam("The ID of the Update Request") @PathParam("id") final String updateRequestId) {
return retrieveRequest("update-requests", updateRequestId);
return retrieveFlowUpdateRequest("update-requests", updateRequestId);
}
@GET
@ -954,41 +904,7 @@ public class VersionsResource extends ApplicationResource {
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
})
public Response getRevertRequest(@ApiParam("The ID of the Revert Request") @PathParam("id") final String revertRequestId) {
return retrieveRequest("revert-requests", revertRequestId);
}
private Response retrieveRequest(final String requestType, final String requestId) {
if (requestId == null) {
throw new IllegalArgumentException("Request ID must be specified.");
}
final NiFiUser user = NiFiUserUtils.getNiFiUser();
// request manager will ensure that the current is the user that submitted this request
final AsynchronousWebRequest<VersionControlInformationEntity, VersionControlInformationEntity> asyncRequest = requestManager.getRequest(requestType, requestId, user);
final VersionedFlowUpdateRequestDTO updateRequestDto = new VersionedFlowUpdateRequestDTO();
updateRequestDto.setComplete(asyncRequest.isComplete());
updateRequestDto.setFailureReason(asyncRequest.getFailureReason());
updateRequestDto.setLastUpdated(asyncRequest.getLastUpdated());
updateRequestDto.setProcessGroupId(asyncRequest.getComponentId());
updateRequestDto.setRequestId(requestId);
updateRequestDto.setUri(generateResourceUri("versions", requestType, requestId));
updateRequestDto.setState(asyncRequest.getState());
updateRequestDto.setPercentCompleted(asyncRequest.getPercentComplete());
if (updateRequestDto.isComplete()) {
final VersionControlInformationEntity vciEntity = serviceFacade.getVersionControlInformation(asyncRequest.getComponentId());
updateRequestDto.setVersionControlInformation(vciEntity == null ? null : vciEntity.getVersionControlInformation());
}
final RevisionDTO groupRevision = serviceFacade.getProcessGroup(asyncRequest.getComponentId()).getRevision();
final VersionedFlowUpdateRequestEntity updateRequestEntity = new VersionedFlowUpdateRequestEntity();
updateRequestEntity.setProcessGroupRevision(groupRevision);
updateRequestEntity.setRequest(updateRequestDto);
return generateOkResponse(updateRequestEntity).build();
return retrieveFlowUpdateRequest("revert-requests", revertRequestId);
}
@DELETE
@ -1020,7 +936,7 @@ public class VersionsResource extends ApplicationResource {
@QueryParam(DISCONNECTED_NODE_ACKNOWLEDGED) @DefaultValue("false") final Boolean disconnectedNodeAcknowledged,
@ApiParam("The ID of the Update Request") @PathParam("id") final String updateRequestId) {
return deleteRequest("update-requests", updateRequestId, disconnectedNodeAcknowledged.booleanValue());
return deleteFlowUpdateRequest("update-requests", updateRequestId, disconnectedNodeAcknowledged.booleanValue());
}
@DELETE
@ -1052,57 +968,9 @@ public class VersionsResource extends ApplicationResource {
@QueryParam(DISCONNECTED_NODE_ACKNOWLEDGED) @DefaultValue("false") final Boolean disconnectedNodeAcknowledged,
@ApiParam("The ID of the Revert Request") @PathParam("id") final String revertRequestId) {
return deleteRequest("revert-requests", revertRequestId, disconnectedNodeAcknowledged.booleanValue());
return deleteFlowUpdateRequest("revert-requests", revertRequestId, disconnectedNodeAcknowledged.booleanValue());
}
private Response deleteRequest(final String requestType, final String requestId, final boolean disconnectedNodeAcknowledged) {
if (requestId == null) {
throw new IllegalArgumentException("Request ID must be specified.");
}
if (isDisconnectedFromCluster()) {
verifyDisconnectedNodeModification(disconnectedNodeAcknowledged);
}
final NiFiUser user = NiFiUserUtils.getNiFiUser();
// request manager will ensure that the current is the user that submitted this request
final AsynchronousWebRequest<VersionControlInformationEntity, VersionControlInformationEntity> asyncRequest = requestManager.removeRequest(requestType, requestId, user);
if (asyncRequest == null) {
throw new ResourceNotFoundException("Could not find request of type " + requestType + " with ID " + requestId);
}
if (!asyncRequest.isComplete()) {
asyncRequest.cancel();
}
final VersionedFlowUpdateRequestDTO updateRequestDto = new VersionedFlowUpdateRequestDTO();
updateRequestDto.setComplete(asyncRequest.isComplete());
updateRequestDto.setFailureReason(asyncRequest.getFailureReason());
updateRequestDto.setLastUpdated(asyncRequest.getLastUpdated());
updateRequestDto.setProcessGroupId(asyncRequest.getComponentId());
updateRequestDto.setRequestId(requestId);
updateRequestDto.setUri(generateResourceUri("versions", requestType, requestId));
updateRequestDto.setPercentCompleted(asyncRequest.getPercentComplete());
updateRequestDto.setState(asyncRequest.getState());
if (updateRequestDto.isComplete()) {
final VersionControlInformationEntity vciEntity = serviceFacade.getVersionControlInformation(asyncRequest.getComponentId());
updateRequestDto.setVersionControlInformation(vciEntity == null ? null : vciEntity.getVersionControlInformation());
}
final RevisionDTO groupRevision = serviceFacade.getProcessGroup(asyncRequest.getComponentId()).getRevision();
final VersionedFlowUpdateRequestEntity updateRequestEntity = new VersionedFlowUpdateRequestEntity();
updateRequestEntity.setProcessGroupRevision(groupRevision);
updateRequestEntity.setRequest(updateRequestDto);
return generateOkResponse(updateRequestEntity).build();
}
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@ -1136,12 +1004,7 @@ public class VersionsResource extends ApplicationResource {
@ApiParam("The process group id.") @PathParam("id") final String groupId,
@ApiParam(value = "The controller service configuration details.", required = true) final VersionControlInformationEntity requestEntity) {
// Verify the request
final RevisionDTO revisionDto = requestEntity.getProcessGroupRevision();
if (revisionDto == null) {
throw new IllegalArgumentException("Process Group Revision must be specified");
}
// validate version control info
final VersionControlInformationDTO requestVersionControlInfoDto = requestEntity.getVersionControlInformation();
if (requestVersionControlInfoDto == null) {
throw new IllegalArgumentException("Version Control Information must be supplied.");
@ -1165,154 +1028,11 @@ public class VersionsResource extends ApplicationResource {
throw new IllegalArgumentException("The Version of the flow must be supplied.");
}
if (isDisconnectedFromCluster()) {
verifyDisconnectedNodeModification(requestEntity.isDisconnectedNodeAcknowledged());
}
// We will perform the updating of the Versioned Flow in a background thread because it can be a long-running process.
// In order to do this, we will need some parameters that are only available as Thread-Local variables to the current
// thread, so we will gather the values for these parameters up front.
final boolean replicateRequest = isReplicateRequest();
final ComponentLifecycle componentLifecycle = replicateRequest ? clusterComponentLifecycle : localComponentLifecycle;
final NiFiUser user = NiFiUserUtils.getNiFiUser();
// Workflow for this process:
// 0. Obtain the versioned flow snapshot to use for the update
// a. Contact registry to download the desired version.
// b. Get Variable Registry of this Process Group and all ancestor groups
// c. Perform diff to find any new variables
// d. Get Variable Registry of any child Process Group in the versioned flow
// e. Perform diff to find any new variables
// f. Prompt user to fill in values for all new variables
// 1. Determine which components would be affected (and are enabled/running)
// a. Component itself is modified in some way, other than position changing.
// b. Source and Destination of any Connection that is modified.
// c. Any Processor or Controller Service that references a Controller Service that is modified.
// 2. Verify READ and WRITE permissions for user, for every component.
// 3. Verify that all components in the snapshot exist on all nodes (i.e., the NAR exists)?
// 4. Verify that Process Group is already under version control. If not, must start Version Control instead of updateFlow
// 5. Verify that Process Group is not 'dirty'.
// 6. Stop all Processors, Funnels, Ports that are affected.
// 7. Wait for all of the components to finish stopping.
// 8. Disable all Controller Services that are affected.
// 9. Wait for all Controller Services to finish disabling.
// 10. Ensure that if any connection was deleted, that it has no data in it. Ensure that no Input Port
// was removed, unless it currently has no incoming connections. Ensure that no Output Port was removed,
// unless it currently has no outgoing connections. Checking ports & connections could be done before
// stopping everything, but removal of Connections cannot.
// 11. Update variable registry to include new variables
// (only new variables so don't have to worry about affected components? Or do we need to in case a processor
// is already referencing the variable? In which case we need to include the affected components above in the
// Set of affected components before stopping/disabling.).
// 12. Update components in the Process Group; update Version Control Information.
// 13. Re-Enable all affected Controller Services that were not removed.
// 14. Re-Start all Processors, Funnels, Ports that are affected and not removed.
// Step 0: Get the Versioned Flow Snapshot from the Flow Registry
final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation(), true);
// The flow in the registry may not contain the same versions of components that we have in our flow. As a result, we need to update
// the flow snapshot to contain compatible bundles.
serviceFacade.discoverCompatibleBundles(flowSnapshot.getFlowContents());
// If there are any Controller Services referenced that are inherited from the parent group, resolve those to point to the appropriate Controller Service, if we are able to.
serviceFacade.resolveInheritedControllerServices(flowSnapshot, groupId, NiFiUserUtils.getNiFiUser());
// Step 1: Determine which components will be affected by updating the version
final Set<AffectedComponentEntity> affectedComponents = serviceFacade.getComponentsAffectedByVersionChange(groupId, flowSnapshot);
// build a request wrapper
final InitiateChangeFlowVersionRequestWrapper requestWrapper = new InitiateChangeFlowVersionRequestWrapper(requestEntity, componentLifecycle, getAbsolutePath(), affectedComponents,
replicateRequest, flowSnapshot);
final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId);
return withWriteLock(
serviceFacade,
requestWrapper,
requestRevision,
lookup -> {
// Step 2: Verify READ and WRITE permissions for user, for every component.
final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId);
authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true, false, true, true, true);
authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.WRITE, true, false, true, true, false);
final VersionedProcessGroup groupContents = flowSnapshot.getFlowContents();
final Set<ConfigurableComponent> restrictedComponents = FlowRegistryUtils.getRestrictedComponents(groupContents, serviceFacade);
restrictedComponents.forEach(restrictedComponent -> {
final ComponentAuthorizable restrictedComponentAuthorizable = lookup.getConfigurableComponent(restrictedComponent);
authorizeRestrictions(authorizer, restrictedComponentAuthorizable);
});
final Map<String, VersionedParameterContext> parameterContexts = flowSnapshot.getParameterContexts();
if (parameterContexts != null) {
parameterContexts.values().forEach(context -> AuthorizeParameterReference.authorizeParameterContextAddition(context, serviceFacade, authorizer, lookup, user));
}
},
() -> {
// Step 3: Verify that all components in the snapshot exist on all nodes
// Step 4: Verify that Process Group is already under version control. If not, must start Version Control instead of updating flow
// Step 5: Verify that Process Group is not 'dirty'
serviceFacade.verifyCanUpdate(groupId, flowSnapshot, false, true);
},
(revision, wrapper) -> {
final String idGenerationSeed = getIdGenerationSeed().orElse(null);
// Create an asynchronous request that will occur in the background, because this request may
// result in stopping components, which can take an indeterminate amount of time.
final String requestId = UUID.randomUUID().toString();
final AsynchronousWebRequest<VersionControlInformationEntity, VersionControlInformationEntity> request = new StandardAsynchronousWebRequest<>(requestId, requestEntity, groupId, user,
getUpdateSteps());
// Submit the request to be performed in the background
final Consumer<AsynchronousWebRequest<VersionControlInformationEntity, VersionControlInformationEntity>> updateTask = vcur -> {
try {
final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, wrapper.getComponentLifecycle(), wrapper.getExampleUri(),
wrapper.getAffectedComponents(), wrapper.isReplicateRequest(), revision, wrapper.getVersionControlInformationEntity(), wrapper.getFlowSnapshot(), request,
idGenerationSeed, true, true);
vcur.markStepComplete(updatedVersionControlEntity);
} catch (final ResumeFlowException rfe) {
// Treat ResumeFlowException differently because we don't want to include a message that we couldn't update the flow
// since in this case the flow was successfully updated - we just couldn't re-enable the components.
logger.warn(rfe.getMessage(), rfe);
vcur.fail(rfe.getMessage());
} catch (final Exception e) {
logger.error("Failed to update flow to new version", e);
vcur.fail("Failed to update flow to new version due to " + e);
}
};
requestManager.submitRequest("update-requests", requestId, request, updateTask);
// Generate the response.
final VersionedFlowUpdateRequestDTO updateRequestDto = new VersionedFlowUpdateRequestDTO();
updateRequestDto.setComplete(request.isComplete());
updateRequestDto.setFailureReason(request.getFailureReason());
updateRequestDto.setLastUpdated(request.getLastUpdated());
updateRequestDto.setProcessGroupId(groupId);
updateRequestDto.setRequestId(requestId);
updateRequestDto.setUri(generateResourceUri("versions", "update-requests", requestId));
updateRequestDto.setPercentCompleted(request.getPercentComplete());
updateRequestDto.setState(request.getState());
final VersionedFlowUpdateRequestEntity updateRequestEntity = new VersionedFlowUpdateRequestEntity();
final RevisionDTO groupRevision = serviceFacade.getProcessGroup(groupId).getRevision();
updateRequestEntity.setProcessGroupRevision(groupRevision);
updateRequestEntity.setRequest(updateRequestDto);
return generateOkResponse(updateRequestEntity).build();
});
}
private List<UpdateStep> getUpdateSteps() {
final List<UpdateStep> updateSteps = new ArrayList<>();
updateSteps.add(new StandardUpdateStep("Stopping Affected Processors"));
updateSteps.add(new StandardUpdateStep("Disabling Affected Controller Services"));
updateSteps.add(new StandardUpdateStep("Updating Flow"));
updateSteps.add(new StandardUpdateStep("Re-Enabling Controller Services"));
updateSteps.add(new StandardUpdateStep("Restarting Affected Processors"));
return updateSteps;
// supplier retrieves Versioned Flow Snapshot from the Flow Registry
return initiateFlowUpdate(groupId, requestEntity, false,"update-requests",
"/nifi-api/versions/process-groups/" + groupId,
() -> serviceFacade.getVersionedFlowSnapshot(requestVersionControlInfoDto, true)
);
}
@POST
@ -1399,42 +1119,26 @@ public class VersionsResource extends ApplicationResource {
serviceFacade.resolveInheritedControllerServices(flowSnapshot, groupId, NiFiUserUtils.getNiFiUser());
// Step 1: Determine which components will be affected by updating the version
final Set<AffectedComponentEntity> affectedComponents = serviceFacade.getComponentsAffectedByVersionChange(groupId, flowSnapshot);
final Set<AffectedComponentEntity> affectedComponents = serviceFacade.getComponentsAffectedByFlowUpdate(groupId, flowSnapshot);
// build a request wrapper
final InitiateChangeFlowVersionRequestWrapper requestWrapper = new InitiateChangeFlowVersionRequestWrapper(requestEntity, componentLifecycle, getAbsolutePath(), affectedComponents,
replicateRequest, flowSnapshot);
final InitiateUpdateFlowRequestWrapper requestWrapper =
new InitiateUpdateFlowRequestWrapper(requestEntity, componentLifecycle, "revert-requests", getAbsolutePath(),
"/nifi-api/versions/process-groups/" + groupId, affectedComponents, replicateRequest, flowSnapshot);
final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId);
return withWriteLock(
serviceFacade,
requestWrapper,
requestRevision,
lookup -> {
// Step 2: Verify READ and WRITE permissions for user, for every component.
final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId);
authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true, false, true, true, true);
authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.WRITE, true, false, true, true, false);
final VersionedProcessGroup groupContents = flowSnapshot.getFlowContents();
final Set<ConfigurableComponent> restrictedComponents = FlowRegistryUtils.getRestrictedComponents(groupContents, serviceFacade);
restrictedComponents.forEach(restrictedComponent -> {
final ComponentAuthorizable restrictedComponentAuthorizable = lookup.getConfigurableComponent(restrictedComponent);
authorizeRestrictions(authorizer, restrictedComponentAuthorizable);
});
final Map<String, VersionedParameterContext> parameterContexts = flowSnapshot.getParameterContexts();
if (parameterContexts != null) {
parameterContexts.values().forEach(context -> AuthorizeParameterReference.authorizeParameterContextAddition(context, serviceFacade, authorizer, lookup, user));
}
},
lookup -> authorizeFlowUpdate(lookup, user, groupId, flowSnapshot),
() -> {
// Step 3: Verify that all components in the snapshot exist on all nodes
// Step 4: Verify that Process Group is already under version control. If not, must start Version Control instead of updating flow
serviceFacade.verifyCanRevertLocalModifications(groupId, flowSnapshot);
},
(revision, wrapper) -> {
final VersionControlInformationEntity versionControlInformationEntity = wrapper.getVersionControlInformationEntity();
final VersionControlInformationEntity versionControlInformationEntity = wrapper.getRequestEntity();
final VersionControlInformationDTO versionControlInformationDTO = versionControlInformationEntity.getVersionControlInformation();
// Ensure that the information passed in is correct
@ -1457,322 +1161,83 @@ public class VersionsResource extends ApplicationResource {
throw new IllegalArgumentException("The Version Control Information provided does not match the flow that the Process Group is currently synchronized with.");
}
final String idGenerationSeed = getIdGenerationSeed().orElse(null);
// Create an asynchronous request that will occur in the background, because this request may
// result in stopping components, which can take an indeterminate amount of time.
final String requestId = UUID.randomUUID().toString();
final AsynchronousWebRequest<VersionControlInformationEntity, VersionControlInformationEntity> request = new StandardAsynchronousWebRequest<>(requestId, requestEntity, groupId, user,
getUpdateSteps());
// Submit the request to be performed in the background
final Consumer<AsynchronousWebRequest<VersionControlInformationEntity, VersionControlInformationEntity>> updateTask = vcur -> {
try {
final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, wrapper.getComponentLifecycle(), wrapper.getExampleUri(),
wrapper.getAffectedComponents(), wrapper.isReplicateRequest(), revision, versionControlInformationEntity, wrapper.getFlowSnapshot(), request,
idGenerationSeed, false, true);
vcur.markStepComplete(updatedVersionControlEntity);
} catch (final ResumeFlowException rfe) {
// Treat ResumeFlowException differently because we don't want to include a message that we couldn't update the flow
// since in this case the flow was successfully updated - we just couldn't re-enable the components.
logger.warn(rfe.getMessage(), rfe);
vcur.fail(rfe.getMessage());
} catch (final Exception e) {
logger.error("Failed to update flow to new version", e);
vcur.fail("Failed to update flow to new version due to " + e.getMessage());
}
};
requestManager.submitRequest("revert-requests", requestId, request, updateTask);
// Generate the response.
final VersionedFlowUpdateRequestDTO updateRequestDto = new VersionedFlowUpdateRequestDTO();
updateRequestDto.setComplete(request.isComplete());
updateRequestDto.setFailureReason(request.getFailureReason());
updateRequestDto.setLastUpdated(request.getLastUpdated());
updateRequestDto.setProcessGroupId(groupId);
updateRequestDto.setRequestId(requestId);
updateRequestDto.setState(request.getState());
updateRequestDto.setPercentCompleted(request.getPercentComplete());
updateRequestDto.setUri(generateResourceUri("versions", "revert-requests", requestId));
final VersionedFlowUpdateRequestEntity updateRequestEntity = new VersionedFlowUpdateRequestEntity();
final RevisionDTO groupRevision = serviceFacade.getProcessGroup(groupId).getRevision();
updateRequestEntity.setProcessGroupRevision(groupRevision);
updateRequestEntity.setRequest(updateRequestDto);
return generateOkResponse(updateRequestEntity).build();
return submitFlowUpdateRequest(user, groupId, revision, wrapper,true);
});
}
private VersionControlInformationEntity updateFlowVersion(final String groupId, final ComponentLifecycle componentLifecycle, final URI exampleUri,
final Set<AffectedComponentEntity> affectedComponents, final boolean replicateRequest, final Revision revision, final VersionControlInformationEntity requestEntity,
final VersionedFlowSnapshot flowSnapshot, final AsynchronousWebRequest<VersionControlInformationEntity, VersionControlInformationEntity> asyncRequest, final String idGenerationSeed,
final boolean verifyNotModified, final boolean updateDescendantVersionedFlows) throws LifecycleManagementException, ResumeFlowException {
// Steps 6-7: Determine which components must be stopped and stop them.
final Set<String> stoppableReferenceTypes = new HashSet<>();
stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR);
stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT);
stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_REMOTE_OUTPUT_PORT);
stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_INPUT_PORT);
stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_OUTPUT_PORT);
final Set<AffectedComponentEntity> runningComponents = affectedComponents.stream()
.filter(dto -> stoppableReferenceTypes.contains(dto.getComponent().getReferenceType()))
.filter(dto -> "Running".equalsIgnoreCase(dto.getComponent().getState()))
.collect(Collectors.toSet());
logger.info("Stopping {} Processors", runningComponents.size());
final CancellableTimedPause stopComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
asyncRequest.setCancelCallback(stopComponentsPause::cancel);
componentLifecycle.scheduleComponents(exampleUri, groupId, runningComponents, ScheduledState.STOPPED, stopComponentsPause, InvalidComponentAction.SKIP);
if (asyncRequest.isCancelled()) {
return null;
}
asyncRequest.markStepComplete();
// Steps 8-9. Disable enabled controller services that are affected
final Set<AffectedComponentEntity> enabledServices = affectedComponents.stream()
.filter(dto -> AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE.equals(dto.getComponent().getReferenceType()))
.filter(dto -> "Enabled".equalsIgnoreCase(dto.getComponent().getState()))
.collect(Collectors.toSet());
logger.info("Disabling {} Controller Services", enabledServices.size());
final CancellableTimedPause disableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
asyncRequest.setCancelCallback(disableServicesPause::cancel);
componentLifecycle.activateControllerServices(exampleUri, groupId, enabledServices, ControllerServiceState.DISABLED, disableServicesPause, InvalidComponentAction.SKIP);
if (asyncRequest.isCancelled()) {
return null;
}
asyncRequest.markStepComplete();
/**
* Perform actual flow update of the specified flow. This is used for the initial flow update and replication updates.
*/
@Override
protected ProcessGroupEntity performUpdateFlow(final String groupId, final Revision revision,
final VersionControlInformationEntity requestEntity,
final VersionedFlowSnapshot flowSnapshot, final String idGenerationSeed,
final boolean verifyNotModified, final boolean updateDescendantVersionedFlows) {
logger.info("Updating Process Group with ID {} to version {} of the Versioned Flow", groupId, flowSnapshot.getSnapshotMetadata().getVersion());
// If replicating request, steps 10-12 are performed on each node individually, and this is accomplished
// by replicating a PUT to /nifi-api/versions/process-groups/{groupId}
try {
if (replicateRequest) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
// Step 10-11. Update Process Group to the new flow and update variable registry with any Variables that were added or removed
final VersionControlInformationDTO requestVci = requestEntity.getVersionControlInformation();
final URI updateUri;
try {
updateUri = new URI(exampleUri.getScheme(), exampleUri.getUserInfo(), exampleUri.getHost(),
exampleUri.getPort(), "/nifi-api/versions/process-groups/" + groupId, null, exampleUri.getFragment());
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
final Bucket bucket = flowSnapshot.getBucket();
final VersionedFlow flow = flowSnapshot.getFlow();
final Map<String, String> headers = new HashMap<>();
headers.put("content-type", MediaType.APPLICATION_JSON);
final VersionedFlowSnapshotMetadata metadata = flowSnapshot.getSnapshotMetadata();
final VersionControlInformationDTO versionControlInfo = new VersionControlInformationDTO();
versionControlInfo.setBucketId(metadata.getBucketIdentifier());
versionControlInfo.setBucketName(bucket.getName());
versionControlInfo.setFlowDescription(flow.getDescription());
versionControlInfo.setFlowId(flow.getIdentifier());
versionControlInfo.setFlowName(flow.getName());
versionControlInfo.setGroupId(groupId);
versionControlInfo.setRegistryId(requestVci.getRegistryId());
versionControlInfo.setRegistryName(serviceFacade.getFlowRegistryName(requestVci.getRegistryId()));
versionControlInfo.setVersion(metadata.getVersion());
versionControlInfo.setState(flowSnapshot.isLatest() ? VersionedFlowState.UP_TO_DATE.name() : VersionedFlowState.STALE.name());
final VersionedFlowSnapshotEntity snapshotEntity = new VersionedFlowSnapshotEntity();
snapshotEntity.setProcessGroupRevision(dtoFactory.createRevisionDTO(revision));
snapshotEntity.setRegistryId(requestEntity.getVersionControlInformation().getRegistryId());
snapshotEntity.setVersionedFlow(flowSnapshot);
snapshotEntity.setUpdateDescendantVersionedFlows(updateDescendantVersionedFlows);
final NodeResponse clusterResponse;
try {
logger.debug("Replicating PUT request to {} for user {}", updateUri, user);
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
clusterResponse = getRequestReplicator().replicate(user, HttpMethod.PUT, updateUri, snapshotEntity, headers).awaitMergedResponse();
} else {
clusterResponse = getRequestReplicator().forwardToCoordinator(
getClusterCoordinatorNode(), user, HttpMethod.PUT, updateUri, snapshotEntity, headers).awaitMergedResponse();
}
} catch (final InterruptedException ie) {
logger.warn("Interrupted while replicating PUT request to {} for user {}", updateUri, user);
Thread.currentThread().interrupt();
throw new LifecycleManagementException("Interrupted while updating flows across cluster", ie);
}
final int updateFlowStatus = clusterResponse.getStatus();
if (updateFlowStatus != Status.OK.getStatusCode()) {
final String explanation = getResponseEntity(clusterResponse, String.class);
logger.error("Failed to update flow across cluster when replicating PUT request to {} for user {}. Received {} response with explanation: {}",
updateUri, user, updateFlowStatus, explanation);
throw new LifecycleManagementException("Failed to update Flow on all nodes in cluster due to " + explanation);
}
} else {
// Step 10: Ensure that if any connection exists in the flow and does not exist in the proposed snapshot,
// that it has no data in it. Ensure that no Input Port was removed, unless it currently has no incoming connections.
// Ensure that no Output Port was removed, unless it currently has no outgoing connections.
serviceFacade.verifyCanUpdate(groupId, flowSnapshot, true, verifyNotModified);
// Step 11-12. Update Process Group to the new flow and update variable registry with any Variables that were added or removed
final VersionControlInformationDTO requestVci = requestEntity.getVersionControlInformation();
final Bucket bucket = flowSnapshot.getBucket();
final VersionedFlow flow = flowSnapshot.getFlow();
final VersionedFlowSnapshotMetadata metadata = flowSnapshot.getSnapshotMetadata();
final VersionControlInformationDTO vci = new VersionControlInformationDTO();
vci.setBucketId(metadata.getBucketIdentifier());
vci.setBucketName(bucket.getName());
vci.setFlowDescription(flow.getDescription());
vci.setFlowId(flow.getIdentifier());
vci.setFlowName(flow.getName());
vci.setGroupId(groupId);
vci.setRegistryId(requestVci.getRegistryId());
vci.setRegistryName(serviceFacade.getFlowRegistryName(requestVci.getRegistryId()));
vci.setVersion(metadata.getVersion());
vci.setState(flowSnapshot.isLatest() ? VersionedFlowState.UP_TO_DATE.name() : VersionedFlowState.STALE.name());
serviceFacade.updateProcessGroupContents(revision, groupId, vci, flowSnapshot, idGenerationSeed, verifyNotModified, false, updateDescendantVersionedFlows,
this::generateUuid);
}
} finally {
if (!asyncRequest.isCancelled()) {
if (logger.isDebugEnabled()) {
logger.debug("Re-Enabling {} Controller Services: {}", enabledServices.size(), enabledServices);
}
asyncRequest.markStepComplete();
// Step 13. Re-enable all disabled controller services
final CancellableTimedPause enableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
asyncRequest.setCancelCallback(enableServicesPause::cancel);
final Set<AffectedComponentEntity> servicesToEnable = getUpdatedEntities(enabledServices);
logger.info("Successfully updated flow; re-enabling {} Controller Services", servicesToEnable.size());
try {
componentLifecycle.activateControllerServices(exampleUri, groupId, servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause, InvalidComponentAction.SKIP);
} catch (final IllegalStateException ise) {
// Component Lifecycle will re-enable the Controller Services only if they are valid. If IllegalStateException gets thrown, we need to provide
// a more intelligent error message as to exactly what happened, rather than indicate that the flow could not be updated.
throw new ResumeFlowException("Successfully updated flow but could not re-enable all Controller Services because " + ise.getMessage(), ise);
}
}
if (!asyncRequest.isCancelled()) {
if (logger.isDebugEnabled()) {
logger.debug("Restart {} Processors: {}", runningComponents.size(), runningComponents);
}
asyncRequest.markStepComplete();
// Step 14. Restart all components
final Set<AffectedComponentEntity> componentsToStart = getUpdatedEntities(runningComponents);
// If there are any Remote Group Ports that are supposed to be started and have no connections, we want to remove those from our Set.
// This will happen if the Remote Group Port is transmitting when the version change happens but the new flow version does not have
// a connection to the port. In such a case, the Port still is included in the Updated Entities because we do not remove them
// when updating the flow (they are removed in the background).
final Set<AffectedComponentEntity> avoidStarting = new HashSet<>();
for (final AffectedComponentEntity componentEntity : componentsToStart) {
final AffectedComponentDTO componentDto = componentEntity.getComponent();
final String referenceType = componentDto.getReferenceType();
if (!AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT.equals(referenceType)
&& !AffectedComponentDTO.COMPONENT_TYPE_REMOTE_OUTPUT_PORT.equals(referenceType)) {
continue;
}
boolean startComponent;
try {
startComponent = serviceFacade.isRemoteGroupPortConnected(componentDto.getProcessGroupId(), componentDto.getId());
} catch (final ResourceNotFoundException rnfe) {
// Could occur if RPG is refreshed at just the right time.
startComponent = false;
}
// We must add the components to avoid starting to a separate Set and then remove them below,
// rather than removing the component here, because doing so would result in a ConcurrentModificationException.
if (!startComponent) {
avoidStarting.add(componentEntity);
}
}
componentsToStart.removeAll(avoidStarting);
final CancellableTimedPause startComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
asyncRequest.setCancelCallback(startComponentsPause::cancel);
logger.info("Restarting {} Processors", componentsToStart.size());
try {
componentLifecycle.scheduleComponents(exampleUri, groupId, componentsToStart, ScheduledState.RUNNING, startComponentsPause, InvalidComponentAction.SKIP);
} catch (final IllegalStateException ise) {
// Component Lifecycle will restart the Processors only if they are valid. If IllegalStateException gets thrown, we need to provide
// a more intelligent error message as to exactly what happened, rather than indicate that the flow could not be updated.
throw new ResumeFlowException("Successfully updated flow but could not restart all Processors because " + ise.getMessage(), ise);
}
}
}
asyncRequest.setCancelCallback(null);
if (asyncRequest.isCancelled()) {
return null;
}
return serviceFacade.getVersionControlInformation(groupId);
return serviceFacade.updateProcessGroupContents(revision, groupId, versionControlInfo, flowSnapshot, idGenerationSeed,
verifyNotModified, false, updateDescendantVersionedFlows);
}
/**
* Extracts the response entity from the specified node response.
*
* @param nodeResponse node response
* @param clazz class
* @param <T> type of class
* @return the response entity
* Create the entity that is used for update flow replication. Versioned flow update replication creates a new entity type containing
* the actual versioned flow snapshot and a registry identifier.
*/
@SuppressWarnings("unchecked")
private <T> T getResponseEntity(final NodeResponse nodeResponse, final Class<T> clazz) {
T entity = (T) nodeResponse.getUpdatedEntity();
if (entity == null) {
entity = nodeResponse.getClientResponse().readEntity(clazz);
@Override
protected Entity createReplicateUpdateFlowEntity(final Revision revision, final VersionControlInformationEntity requestEntity,
final VersionedFlowSnapshot flowSnapshot) {
final VersionedFlowSnapshotEntity snapshotEntity = new VersionedFlowSnapshotEntity();
snapshotEntity.setProcessGroupRevision(dtoFactory.createRevisionDTO(revision));
snapshotEntity.setRegistryId(requestEntity.getVersionControlInformation().getRegistryId());
snapshotEntity.setVersionedFlow(flowSnapshot);
snapshotEntity.setUpdateDescendantVersionedFlows(true);
return snapshotEntity;
}
/**
* Create the entity that captures the status and result of an update or revert request
*
* @return a new instance of a VersionedFlowUpdateRequestEntity
*/
@Override
protected VersionedFlowUpdateRequestEntity createUpdateRequestEntity() {
return new VersionedFlowUpdateRequestEntity();
}
/**
* Finalize a completed update request for an existing update or revert request. This is used when retrieving and deleting an update request.
*
* @param requestEntity the request entity to finalize
*/
@Override
protected void finalizeCompletedUpdateRequest(final VersionedFlowUpdateRequestEntity requestEntity) {
final VersionedFlowUpdateRequestDTO updateRequestDto = requestEntity.getRequest();
if (updateRequestDto.isComplete()) {
final VersionControlInformationEntity vciEntity =
serviceFacade.getVersionControlInformation(updateRequestDto.getProcessGroupId());
updateRequestDto.setVersionControlInformation(vciEntity == null ? null : vciEntity.getVersionControlInformation());
}
return entity;
}
private Set<AffectedComponentEntity> getUpdatedEntities(final Set<AffectedComponentEntity> originalEntities) {
final Set<AffectedComponentEntity> entities = new LinkedHashSet<>();
for (final AffectedComponentEntity original : originalEntities) {
try {
final AffectedComponentEntity updatedEntity = AffectedComponentUtils.updateEntity(original, serviceFacade, dtoFactory);
if (updatedEntity != null) {
entities.add(updatedEntity);
}
} catch (final ResourceNotFoundException rnfe) {
// Component was removed. Just continue on without adding anything to the entities.
// We do this because the intent is to get updated versions of the entities with current
// Revisions so that we can change the states of the components. If the component was removed,
// then we can just drop the entity, since there is no need to change its state.
}
}
return entities;
}
public void setServiceFacade(NiFiServiceFacade serviceFacade) {
this.serviceFacade = serviceFacade;
}
public void setAuthorizer(Authorizer authorizer) {
this.authorizer = authorizer;
}
public void setClusterComponentLifecycle(ComponentLifecycle componentLifecycle) {
this.clusterComponentLifecycle = componentLifecycle;
}
public void setLocalComponentLifecycle(ComponentLifecycle componentLifecycle) {
this.localComponentLifecycle = componentLifecycle;
}
public void setDtoFactory(final DtoFactory dtoFactory) {
this.dtoFactory = dtoFactory;
}
private static class ActiveRequest {
private static final long MAX_REQUEST_LOCK_NANOS = TimeUnit.MINUTES.toNanos(1L);
@ -1814,50 +1279,4 @@ public class VersionsResource extends ApplicationResource {
return updatePerformed;
}
}
private static class InitiateChangeFlowVersionRequestWrapper extends Entity {
private final VersionControlInformationEntity versionControlInformationEntity;
private final ComponentLifecycle componentLifecycle;
private final URI exampleUri;
private final Set<AffectedComponentEntity> affectedComponents;
private final boolean replicateRequest;
private final VersionedFlowSnapshot flowSnapshot;
public InitiateChangeFlowVersionRequestWrapper(final VersionControlInformationEntity versionControlInformationEntity, final ComponentLifecycle componentLifecycle,
final URI exampleUri, final Set<AffectedComponentEntity> affectedComponents, final boolean replicateRequest,
final VersionedFlowSnapshot flowSnapshot) {
this.versionControlInformationEntity = versionControlInformationEntity;
this.componentLifecycle = componentLifecycle;
this.exampleUri = exampleUri;
this.affectedComponents = affectedComponents;
this.replicateRequest = replicateRequest;
this.flowSnapshot = flowSnapshot;
}
public VersionControlInformationEntity getVersionControlInformationEntity() {
return versionControlInformationEntity;
}
public ComponentLifecycle getComponentLifecycle() {
return componentLifecycle;
}
public URI getExampleUri() {
return exampleUri;
}
public Set<AffectedComponentEntity> getAffectedComponents() {
return affectedComponents;
}
public boolean isReplicateRequest() {
return replicateRequest;
}
public VersionedFlowSnapshot getFlowSnapshot() {
return flowSnapshot;
}
}
}

View File

@ -408,11 +408,14 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
group.updateFlow(proposedSnapshot, componentIdSeed, verifyNotModified, updateSettings, updateDescendantVersionedFlows);
group.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::initialize);
final StandardVersionControlInformation svci = StandardVersionControlInformation.Builder.fromDto(versionControlInformation)
.flowSnapshot(proposedSnapshot.getFlowContents())
.build();
// process group being updated may not be versioned
if (versionControlInformation != null) {
final StandardVersionControlInformation svci = StandardVersionControlInformation.Builder.fromDto(versionControlInformation)
.flowSnapshot(proposedSnapshot.getFlowContents())
.build();
group.setVersionControlInformation(svci, Collections.emptyMap());
}
group.setVersionControlInformation(svci, Collections.emptyMap());
group.onComponentModified();
return group;

View File

@ -318,6 +318,8 @@
<property name="requestReplicator" ref="requestReplicator" />
<property name="authorizer" ref="authorizer"/>
<property name="flowController" ref="flowController" />
<property name="clusterComponentLifecycle" ref="clusterComponentLifecycle" />
<property name="localComponentLifecycle" ref="localComponentLifecycle" />
<property name="dtoFactory" ref="dtoFactory" />
</bean>
<bean id="versionsResource" class="org.apache.nifi.web.api.VersionsResource" scope="singleton">

View File

@ -17,6 +17,7 @@
package org.apache.nifi.web;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.nifi.action.Component;
import org.apache.nifi.action.FlowChangeAction;
import org.apache.nifi.action.Operation;
@ -181,6 +182,8 @@ public class StandardNiFiServiceFacadeTest {
final ControllerFacade controllerFacade = new ControllerFacade();
controllerFacade.setFlowController(flowController);
processGroupDAO = mock(ProcessGroupDAO.class);
serviceFacade = new StandardNiFiServiceFacade();
serviceFacade.setAuditService(auditService);
serviceFacade.setAuthorizableLookup(authorizableLookup);
@ -188,6 +191,8 @@ public class StandardNiFiServiceFacadeTest {
serviceFacade.setEntityFactory(new EntityFactory());
serviceFacade.setDtoFactory(new DtoFactory());
serviceFacade.setControllerFacade(controllerFacade);
serviceFacade.setProcessGroupDAO(processGroupDAO);
}
private FlowChangeAction getAction(final Integer actionId, final String processorId) {
@ -300,8 +305,6 @@ public class StandardNiFiServiceFacadeTest {
final String groupId = UUID.randomUUID().toString();
final ProcessGroup processGroup = mock(ProcessGroup.class);
final ProcessGroupDAO processGroupDAO = mock(ProcessGroupDAO.class);
serviceFacade.setProcessGroupDAO(processGroupDAO);
when(processGroupDAO.getProcessGroup(groupId)).thenReturn(processGroup);
final FlowManager flowManager = mock(FlowManager.class);
@ -347,4 +350,49 @@ public class StandardNiFiServiceFacadeTest {
assertNull(versionedFlowSnapshot.getSnapshotMetadata());
}
@Test
public void testIsAnyProcessGroupUnderVersionControl_None() {
final String groupId = UUID.randomUUID().toString();
final ProcessGroup processGroup = mock(ProcessGroup.class);
final ProcessGroup childProcessGroup = mock(ProcessGroup.class);
when(processGroupDAO.getProcessGroup(groupId)).thenReturn(processGroup);
when(processGroup.getVersionControlInformation()).thenReturn(null);
when(processGroup.getProcessGroups()).thenReturn(Sets.newHashSet(childProcessGroup));
when(childProcessGroup.getVersionControlInformation()).thenReturn(null);
assertFalse(serviceFacade.isAnyProcessGroupUnderVersionControl(groupId));
}
@Test
public void testIsAnyProcessGroupUnderVersionControl_PrimaryGroup() {
final String groupId = UUID.randomUUID().toString();
final ProcessGroup processGroup = mock(ProcessGroup.class);
when(processGroupDAO.getProcessGroup(groupId)).thenReturn(processGroup);
final VersionControlInformation vci = mock(VersionControlInformation.class);
when(processGroup.getVersionControlInformation()).thenReturn(vci);
when(processGroup.getProcessGroups()).thenReturn(Sets.newHashSet());
assertTrue(serviceFacade.isAnyProcessGroupUnderVersionControl(groupId));
}
@Test
public void testIsAnyProcessGroupUnderVersionControl_ChildGroup() {
final String groupId = UUID.randomUUID().toString();
final ProcessGroup processGroup = mock(ProcessGroup.class);
final ProcessGroup childProcessGroup = mock(ProcessGroup.class);
when(processGroupDAO.getProcessGroup(groupId)).thenReturn(processGroup);
final VersionControlInformation vci = mock(VersionControlInformation.class);
when(processGroup.getVersionControlInformation()).thenReturn(null);
when(processGroup.getProcessGroups()).thenReturn(Sets.newHashSet(childProcessGroup));
when(childProcessGroup.getVersionControlInformation()).thenReturn(vci);
assertTrue(serviceFacade.isAnyProcessGroupUnderVersionControl(groupId));
}
}

View File

@ -20,6 +20,10 @@ import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.web.NiFiServiceFacade;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import javax.ws.rs.core.Response;
import java.util.UUID;
@ -28,12 +32,18 @@ import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class TestProcessGroupResource {
@InjectMocks
private ProcessGroupResource processGroupResource = new ProcessGroupResource();
@Mock
private NiFiServiceFacade serviceFacade;
@Test
public void testExportProcessGroup() {
final String groupId = UUID.randomUUID().toString();
final NiFiServiceFacade serviceFacade = mock(NiFiServiceFacade.class);
final VersionedFlowSnapshot versionedFlowSnapshot = mock(VersionedFlowSnapshot.class);
when(serviceFacade.getCurrentFlowSnapshotByGroupId(groupId)).thenReturn(versionedFlowSnapshot);
@ -43,9 +53,7 @@ public class TestProcessGroupResource {
when(versionedFlowSnapshot.getFlowContents()).thenReturn(versionedProcessGroup);
when(versionedProcessGroup.getName()).thenReturn(flowName);
final ProcessGroupResource resource = getProcessGroupResource(serviceFacade);
final Response response = resource.exportProcessGroup(groupId);
final Response response = processGroupResource.exportProcessGroup(groupId);
final VersionedFlowSnapshot resultEntity = (VersionedFlowSnapshot)response.getEntity();
@ -53,10 +61,4 @@ public class TestProcessGroupResource {
assertEquals(versionedFlowSnapshot, resultEntity);
}
private ProcessGroupResource getProcessGroupResource(final NiFiServiceFacade serviceFacade) {
final ProcessGroupResource resource = new ProcessGroupResource();
resource.setServiceFacade(serviceFacade);
return resource;
}
}

View File

@ -22,6 +22,10 @@ import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.web.NiFiServiceFacade;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import javax.ws.rs.core.Response;
import java.util.UUID;
@ -31,12 +35,18 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class TestVersionsResource {
@InjectMocks
private VersionsResource versionsResource = new VersionsResource();
@Mock
private NiFiServiceFacade serviceFacade;
@Test
public void testExportFlowVersion() {
final String groupId = UUID.randomUUID().toString();
final NiFiServiceFacade serviceFacade = mock(NiFiServiceFacade.class);
final VersionedFlowSnapshot versionedFlowSnapshot = mock(VersionedFlowSnapshot.class);
when(serviceFacade.getVersionedFlowSnapshotByGroupId(groupId)).thenReturn(versionedFlowSnapshot);
@ -55,9 +65,7 @@ public class TestVersionsResource {
when(versionedProcessGroup.getProcessGroups()).thenReturn(Sets.newHashSet(innerVersionedProcessGroup));
when(innerVersionedProcessGroup.getProcessGroups()).thenReturn(Sets.newHashSet(innerInnerVersionedProcessGroup));
final VersionsResource resource = getVersionsResource(serviceFacade);
final Response response = resource.exportFlowVersion(groupId);
final Response response = versionsResource.exportFlowVersion(groupId);
final VersionedFlowSnapshot resultEntity = (VersionedFlowSnapshot)response.getEntity();
@ -72,10 +80,4 @@ public class TestVersionsResource {
verify(innerInnerVersionedProcessGroup).setVersionedFlowCoordinates(null);
}
private VersionsResource getVersionsResource(final NiFiServiceFacade serviceFacade) {
final VersionsResource resource = new VersionsResource();
resource.setServiceFacade(serviceFacade);
return resource;
}
}