NIFI-4436: More intelligently flag a ProcessGroup to indicate whether or not it has any local modifications compared to Versioned Flow - Bug fixes - Updated to include status of a Versioned Process Group to include VersionedFlowState and explanation

Signed-off-by: Matt Gilman <matt.c.gilman@gmail.com>
This commit is contained in:
Mark Payne 2017-11-28 12:33:00 -05:00 committed by Bryan Bende
parent d34fb5e2ef
commit fdef5b5605
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
30 changed files with 749 additions and 590 deletions

View File

@ -34,6 +34,8 @@ public class VersionControlInformationDTO {
private Integer version;
private Boolean modified;
private Boolean current;
private String state;
private String stateExplanation;
@ApiModelProperty("The ID of the Process Group that is under version control")
public String getGroupId() {
@ -135,4 +137,24 @@ public class VersionControlInformationDTO {
public void setCurrent(Boolean current) {
this.current = current;
}
@ApiModelProperty(readOnly = true,
value = "The current state of the Process Group, as it relates to the Versioned Flow",
allowableValues = "LOCALLY_MODIFIED_DESCENDANT, LOCALLY_MODIFIED, STALE, LOCALLY_MODIFIED_AND_STALE, UP_TO_DATE")
public String getState() {
return state;
}
public void setState(final String state) {
this.state = state;
}
@ApiModelProperty(readOnly = true, value = "Explanation of why the group is in the specified state")
public String getStateExplanation() {
return stateExplanation;
}
public void setStateExplanation(String explanation) {
this.stateExplanation = explanation;
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.controller.service;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.VersionedComponent;
import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerService;
@ -27,7 +28,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
public interface ControllerServiceNode extends ConfiguredComponent, VersionedComponent {
public interface ControllerServiceNode extends ConfiguredComponent, ConfigurableComponent, VersionedComponent {
/**
* @return the Process Group that this Controller Service belongs to, or <code>null</code> if the Controller Service

View File

@ -462,11 +462,11 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
/**
* @param id of the Controller Service
* @return the Controller Service with the given ID, if it exists as a child or
* descendant of this ProcessGroup. This performs a recursive search of all
* descendant ProcessGroups
* @param includeDescendantGroups whether or not to include descendant process groups
* @param includeAncestorGroups whether or not to include ancestor process groups
* @return the Controller Service with the given ID
*/
ControllerServiceNode findControllerService(String id);
ControllerServiceNode findControllerService(String id, boolean includeDescendantGroups, boolean includeAncestorGroups);
/**
* @return a List of all Controller Services contained within this ProcessGroup and any child Process Groups
@ -976,4 +976,9 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
* @param flowRegistry the Flow Registry to synchronize with
*/
void synchronizeWithFlowRegistry(FlowRegistryClient flowRegistry);
/**
* Called whenever a component within this group or the group itself is modified
*/
void onComponentModified();
}

View File

@ -61,9 +61,9 @@ public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable,
void setName(String name);
void setInputPorts(Set<RemoteProcessGroupPortDescriptor> ports);
void setInputPorts(Set<RemoteProcessGroupPortDescriptor> ports, boolean pruneUnusedPorts);
void setOutputPorts(Set<RemoteProcessGroupPortDescriptor> ports);
void setOutputPorts(Set<RemoteProcessGroupPortDescriptor> ports, boolean pruneUnusedPorts);
Set<RemoteGroupPort> getInputPorts();
@ -215,11 +215,6 @@ public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable,
*/
void reinitialize(boolean isClustered);
/**
* Removes all non existent ports from this RemoteProcessGroup.
*/
void removeAllNonExistentPorts();
/**
* Removes a port that no longer exists on the remote instance from this
* RemoteProcessGroup

View File

@ -76,6 +76,11 @@ public interface VersionControlInformation {
*/
boolean isCurrent();
/**
* @return the current status of the Process Group as it relates to the associated Versioned Flow.
*/
VersionedFlowStatus getStatus();
/**
* @return the snapshot of the flow that was synchronized with the Flow Registry
*/

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.flow;
public enum VersionedFlowState {
/**
* We are unable to communicate with the Flow Registry in order to determine the appropriate state
*/
SYNC_FAILURE,
/**
* This Process Group (or a child/descendant Process Group that is not itself under Version Control)
* is on the latest version of the Versioned Flow, but is different than the Versioned Flow that is
* stored in the Flow Registry.
*/
LOCALLY_MODIFIED,
/**
* This Process Group has not been modified since it was last synchronized with the Flow Registry, but
* the Flow Registry has a newer version of the flow than what is contained in this Process Group.
*/
STALE,
/**
* This Process Group (or a child/descendant Process Group that is not itself under Version Control)
* has been modified since it was last synchronized with the Flow Registry, and the Flow Registry has
* a newer version of the flow than what is contained in this Process Group.
*/
LOCALLY_MODIFIED_AND_STALE,
/**
* This Process Group and all child/descendant Process Groups are on the latest version of the flow in
* the Flow Registry and have no local modifications.
*/
UP_TO_DATE;
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.flow;
public interface VersionedFlowStatus {
/**
* @return the current state of the versioned process group
*/
VersionedFlowState getState();
/**
* @return an explanation of why the process group is in the state that it is in.
*/
String getStateExplanation();
}

View File

@ -16,6 +16,39 @@
*/
package org.apache.nifi.controller;
import static java.util.Objects.requireNonNull;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.net.ssl.SSLContext;
import org.apache.commons.collections4.Predicate;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.action.Action;
@ -169,7 +202,6 @@ import org.apache.nifi.registry.flow.StandardVersionControlInformation;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedConnection;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.registry.variable.MutableVariableRegistry;
import org.apache.nifi.registry.variable.StandardComponentVariableRegistry;
import org.apache.nifi.remote.HttpRemoteSiteListener;
@ -225,38 +257,6 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider,
QueueProvider, Authorizable, ProvenanceAuthorizableFactory, NodeTypeProvider, IdentifierLookup, ReloadComponent {
@ -1983,14 +1983,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
if (remoteGroupDTO.getContents() != null) {
final RemoteProcessGroupContentsDTO contents = remoteGroupDTO.getContents();
// ensure there input ports
// ensure there are input ports
if (contents.getInputPorts() != null) {
remoteGroup.setInputPorts(convertRemotePort(contents.getInputPorts()));
remoteGroup.setInputPorts(convertRemotePort(contents.getInputPorts()), false);
}
// ensure there are output ports
if (contents.getOutputPorts() != null) {
remoteGroup.setOutputPorts(convertRemotePort(contents.getOutputPorts()));
remoteGroup.setOutputPorts(convertRemotePort(contents.getOutputPorts()), false);
}
}
@ -2035,12 +2035,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
instantiateSnippet(childGroup, childTemplateDTO, false);
if (groupDTO.getVersionControlInformation() != null) {
final NiFiRegistryFlowMapper flowMapper = new NiFiRegistryFlowMapper();
final VersionedProcessGroup versionedGroup = flowMapper.mapProcessGroup(childGroup, getFlowRegistryClient(), false);
final VersionControlInformation vci = StandardVersionControlInformation.Builder
.fromDto(groupDTO.getVersionControlInformation())
.flowSnapshot(versionedGroup)
.build();
childGroup.setVersionControlInformation(vci, Collections.emptyMap());
}

View File

@ -931,6 +931,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
final Label label = controller.createLabel(labelDTO.getId(), labelDTO.getLabel());
label.setStyle(labelDTO.getStyle());
label.setPosition(new Position(labelDTO.getPosition().getX(), labelDTO.getPosition().getY()));
label.setVersionedComponentId(labelDTO.getVersionedComponentId());
if (labelDTO.getWidth() != null && labelDTO.getHeight() != null) {
label.setSize(new Size(labelDTO.getWidth(), labelDTO.getHeight()));
}
@ -1327,13 +1328,13 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
for (final Element portElement : getChildrenByTagName(remoteProcessGroupElement, "inputPort")) {
inputPorts.add(FlowFromDOMFactory.getRemoteProcessGroupPort(portElement));
}
remoteGroup.setInputPorts(inputPorts);
remoteGroup.setInputPorts(inputPorts, false);
final Set<RemoteProcessGroupPortDescriptor> outputPorts = new HashSet<>();
for (final Element portElement : getChildrenByTagName(remoteProcessGroupElement, "outputPort")) {
outputPorts.add(FlowFromDOMFactory.getRemoteProcessGroupPort(portElement));
}
remoteGroup.setOutputPorts(outputPorts);
remoteGroup.setOutputPorts(outputPorts, false);
processGroup.addRemoteProcessGroup(remoteGroup);
for (final RemoteProcessGroupPortDescriptor remoteGroupPortDTO : outputPorts) {

View File

@ -83,11 +83,14 @@ import org.apache.nifi.registry.flow.VersionedControllerService;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowCoordinates;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowState;
import org.apache.nifi.registry.flow.VersionedFlowStatus;
import org.apache.nifi.registry.flow.VersionedFunnel;
import org.apache.nifi.registry.flow.VersionedLabel;
import org.apache.nifi.registry.flow.VersionedPort;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.VersionedProcessor;
import org.apache.nifi.registry.flow.VersionedPropertyDescriptor;
import org.apache.nifi.registry.flow.VersionedRemoteGroupPort;
import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup;
import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
@ -166,6 +169,8 @@ public final class StandardProcessGroup implements ProcessGroup {
private final Map<String, Template> templates = new HashMap<>();
private final StringEncryptor encryptor;
private final MutableVariableRegistry variableRegistry;
private final AtomicReference<StandardVersionedFlowStatus> flowStatus = new AtomicReference<>(
new StandardVersionedFlowStatus(null, "Not yet synchronized with Flow Registry", null));
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
@ -494,6 +499,7 @@ public final class StandardProcessGroup implements ProcessGroup {
port.setProcessGroup(this);
inputPorts.put(requireNonNull(port).getIdentifier(), port);
flowController.onInputPortAdded(port);
onComponentModified();
} finally {
writeLock.unlock();
}
@ -528,6 +534,8 @@ public final class StandardProcessGroup implements ProcessGroup {
throw new IllegalStateException(port.getIdentifier() + " is not an Input Port of this Process Group");
}
onComponentModified();
flowController.onInputPortRemoved(port);
LOG.info("Input Port {} removed from flow", port);
} finally {
@ -575,6 +583,7 @@ public final class StandardProcessGroup implements ProcessGroup {
port.setProcessGroup(this);
outputPorts.put(port.getIdentifier(), port);
flowController.onOutputPortAdded(port);
onComponentModified();
} finally {
writeLock.unlock();
}
@ -600,6 +609,8 @@ public final class StandardProcessGroup implements ProcessGroup {
throw new IllegalStateException(port.getIdentifier() + " is not an Output Port of this Process Group");
}
onComponentModified();
flowController.onOutputPortRemoved(port);
LOG.info("Output Port {} removed from flow", port);
} finally {
@ -640,6 +651,7 @@ public final class StandardProcessGroup implements ProcessGroup {
processGroups.put(Objects.requireNonNull(group).getIdentifier(), group);
flowController.onProcessGroupAdded(group);
onComponentModified();
} finally {
writeLock.unlock();
}
@ -679,6 +691,8 @@ public final class StandardProcessGroup implements ProcessGroup {
removeComponents(group);
processGroups.remove(group.getIdentifier());
onComponentModified();
flowController.onProcessGroupRemoved(group);
LOG.info("{} removed from flow", group);
} finally {
@ -734,6 +748,7 @@ public final class StandardProcessGroup implements ProcessGroup {
remoteGroup.setProcessGroup(this);
remoteGroups.put(Objects.requireNonNull(remoteGroup).getIdentifier(), remoteGroup);
onComponentModified();
} finally {
writeLock.unlock();
}
@ -767,6 +782,8 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
onComponentModified();
for (final RemoteGroupPort port : remoteGroup.getOutputPorts()) {
// must copy to avoid a concurrent modification
final Set<Connection> copy = new HashSet<>(port.getConnections());
@ -802,6 +819,7 @@ public final class StandardProcessGroup implements ProcessGroup {
processor.getVariableRegistry().setParent(getVariableRegistry());
processors.put(processorId, processor);
flowController.onProcessorAdded(processor);
onComponentModified();
} finally {
writeLock.unlock();
}
@ -843,6 +861,8 @@ public final class StandardProcessGroup implements ProcessGroup {
}
processors.remove(id);
onComponentModified();
flowController.onProcessorRemoved(processor);
LogRepositoryFactory.getRepository(processor.getIdentifier()).removeAllObservers();
@ -912,6 +932,7 @@ public final class StandardProcessGroup implements ProcessGroup {
writeLock.lock();
try {
connections.put(connection.getIdentifier(), connection);
onComponentModified();
connection.setProcessGroup(this);
} finally {
writeLock.unlock();
@ -983,6 +1004,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
connections.put(connection.getIdentifier(), connection);
flowController.onConnectionAdded(connection);
onComponentModified();
} finally {
writeLock.unlock();
}
@ -1042,6 +1064,8 @@ public final class StandardProcessGroup implements ProcessGroup {
// remove the connection from our map
connections.remove(connection.getIdentifier());
LOG.info("{} removed from flow", connection);
onComponentModified();
flowController.onConnectionRemoved(connection);
} finally {
writeLock.unlock();
@ -1109,6 +1133,7 @@ public final class StandardProcessGroup implements ProcessGroup {
label.setProcessGroup(this);
labels.put(label.getIdentifier(), label);
onComponentModified();
} finally {
writeLock.unlock();
}
@ -1123,6 +1148,7 @@ public final class StandardProcessGroup implements ProcessGroup {
throw new IllegalStateException(label + " is not a member of this Process Group.");
}
onComponentModified();
LOG.info("Label with ID {} removed from flow", label.getIdentifier());
} finally {
writeLock.unlock();
@ -1828,6 +1854,8 @@ public final class StandardProcessGroup implements ProcessGroup {
if (autoStart) {
startFunnel(funnel);
}
onComponentModified();
} finally {
writeLock.unlock();
}
@ -1859,18 +1887,43 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public ControllerServiceNode findControllerService(final String id) {
return findControllerService(id, this);
public ControllerServiceNode findControllerService(final String id, final boolean includeDescendants, final boolean includeAncestors) {
ControllerServiceNode serviceNode;
if (includeDescendants) {
serviceNode = findDescendantControllerService(id, this);
} else {
serviceNode = getControllerService(id);
}
private ControllerServiceNode findControllerService(final String id, final ProcessGroup start) {
if (serviceNode == null && includeAncestors) {
serviceNode = findAncestorControllerService(id, getParent());
}
return serviceNode;
}
private ControllerServiceNode findAncestorControllerService(final String id, final ProcessGroup start) {
if (start == null) {
return null;
}
final ControllerServiceNode serviceNode = start.getControllerService(id);
if (serviceNode != null) {
return serviceNode;
}
final ProcessGroup parent = start.getParent();
return findAncestorControllerService(id, parent);
}
private ControllerServiceNode findDescendantControllerService(final String id, final ProcessGroup start) {
ControllerServiceNode service = start.getControllerService(id);
if (service != null) {
return service;
}
for (final ProcessGroup group : start.getProcessGroups()) {
service = findControllerService(id, group);
service = findDescendantControllerService(id, group);
if (service != null) {
return service;
}
@ -1916,6 +1969,8 @@ public final class StandardProcessGroup implements ProcessGroup {
}
funnels.remove(funnel.getIdentifier());
onComponentModified();
flowController.onFunnelRemoved(funnel);
LOG.info("{} removed from flow", funnel);
} finally {
@ -1947,6 +2002,7 @@ public final class StandardProcessGroup implements ProcessGroup {
service.getVariableRegistry().setParent(getVariableRegistry());
this.controllerServices.put(service.getIdentifier(), service);
LOG.info("{} added to {}", service, this);
onComponentModified();
} finally {
writeLock.unlock();
}
@ -2010,6 +2066,21 @@ public final class StandardProcessGroup implements ProcessGroup {
}
controllerServices.remove(service.getIdentifier());
onComponentModified();
// For any component that references this Controller Service, find the component's Process Group
// and notify the Process Group that a component has been modified. This way, we know to re-calculate
// whether or not the Process Group has local modifications.
service.getReferences().getReferencingComponents().stream()
.map(ConfiguredComponent::getProcessGroupIdentifier)
.filter(id -> !id.equals(getIdentifier()))
.forEach(groupId -> {
final ProcessGroup descendant = findProcessGroup(groupId);
if (descendant != null) {
descendant.onComponentModified();
}
});
flowController.getStateManagerProvider().onComponentRemoved(service.getIdentifier());
removed = true;
@ -2043,6 +2114,7 @@ public final class StandardProcessGroup implements ProcessGroup {
templates.put(id, template);
template.setProcessGroup(this);
LOG.info("{} added to {}", template, this);
onComponentModified();
} finally {
writeLock.unlock();
}
@ -2112,6 +2184,8 @@ public final class StandardProcessGroup implements ProcessGroup {
}
templates.remove(template.getIdentifier());
onComponentModified();
LOG.info("{} removed from flow", template);
} finally {
writeLock.unlock();
@ -2172,6 +2246,8 @@ public final class StandardProcessGroup implements ProcessGroup {
toRemove.verifyCanDelete(true);
}
onComponentModified();
for (final String id : connectionIdsToRemove) {
removeConnection(connections.get(id));
}
@ -2224,6 +2300,8 @@ public final class StandardProcessGroup implements ProcessGroup {
throw new IllegalStateException("Cannot move Ports into the root group");
}
onComponentModified();
for (final String id : getKeys(snippet.getInputPorts())) {
destination.addInputPort(inputPorts.remove(id));
}
@ -2844,6 +2922,34 @@ public final class StandardProcessGroup implements ProcessGroup {
return versionControlInfo.get();
}
@Override
public void onComponentModified() {
// We no longer know if or how the Process Group has changed, so the next time that we
// get the local modifications, we must re-calculate it. We cannot simply assume that
// the flow was modified now, because if a Processor Property changed from 'A' to 'B',
// then back to 'A', then we have to know that it was not modified. So we set it to null
// to indicate that we must calculate the local modifications.
final StandardVersionControlInformation svci = this.versionControlInfo.get();
if (svci == null) {
// This group is not under version control directly. Notify parent.
final ProcessGroup parentGroup = parent.get();
if (parentGroup != null) {
parentGroup.onComponentModified();
}
}
clearFlowDifferences();
}
private void clearFlowDifferences() {
boolean updated = false;
while (!updated) {
final StandardVersionedFlowStatus status = flowStatus.get();
final StandardVersionedFlowStatus updatedStatus = new StandardVersionedFlowStatus(status.getState(), status.getStateExplanation(), null);
updated = flowStatus.compareAndSet(status, updatedStatus);
}
}
@Override
public void setVersionControlInformation(final VersionControlInformation versionControlInformation, final Map<String, String> versionedComponentIds) {
final StandardVersionControlInformation svci = new StandardVersionControlInformation(
@ -2854,17 +2960,64 @@ public final class StandardProcessGroup implements ProcessGroup {
versionControlInformation.getVersion(),
stripContentsFromRemoteDescendantGroups(versionControlInformation.getFlowSnapshot(), true),
versionControlInformation.isModified(),
versionControlInformation.isCurrent()) {
versionControlInformation.isCurrent(),
versionControlInformation.getStatus()) {
@Override
public boolean isModified() {
final Set<FlowDifference> differences = StandardProcessGroup.this.getModifications();
boolean updated = false;
while (true) {
final StandardVersionedFlowStatus status = flowStatus.get();
Set<FlowDifference> differences = status.getCurrentDifferences();
if (differences == null) {
differences = getModifications();
if (differences == null) {
return false;
}
final StandardVersionedFlowStatus updatedStatus = new StandardVersionedFlowStatus(status.getState(), status.getStateExplanation(), differences);
updated = flowStatus.compareAndSet(status, updatedStatus);
if (updated) {
return !differences.isEmpty();
}
continue;
}
return !differences.isEmpty();
}
}
@Override
public VersionedFlowStatus getStatus() {
// If current state is a sync failure, then
final StandardVersionedFlowStatus status = flowStatus.get();
final VersionedFlowState state = status.getState();
if (state == VersionedFlowState.SYNC_FAILURE) {
return status;
}
final boolean modified = isModified();
if (!modified) {
final VersionControlInformation vci = StandardProcessGroup.this.versionControlInfo.get();
if (vci.getFlowSnapshot() == null) {
return new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, "Process Group has not yet been synchronized with Flow Registry", null);
}
}
final boolean stale = !isCurrent();
if (modified && stale) {
return new StandardVersionedFlowStatus(VersionedFlowState.LOCALLY_MODIFIED_AND_STALE, "Local changes have been made and a newer version of this flow is available", null);
} else if (modified) {
return new StandardVersionedFlowStatus(VersionedFlowState.LOCALLY_MODIFIED, "Local changes have been made", null);
} else if (stale) {
return new StandardVersionedFlowStatus(VersionedFlowState.STALE, "A newer version of this flow is available", null);
} else {
return new StandardVersionedFlowStatus(VersionedFlowState.UP_TO_DATE, "Flow version is current", null);
}
}
};
svci.setBucketName(versionControlInformation.getBucketName());
@ -2875,6 +3028,7 @@ public final class StandardProcessGroup implements ProcessGroup {
try {
updateVersionedComponentIds(this, versionedComponentIds);
this.versionControlInfo.set(svci);
clearFlowDifferences();
} finally {
writeLock.unlock();
}
@ -2901,6 +3055,7 @@ public final class StandardProcessGroup implements ProcessGroup {
copy.setProcessors(processGroup.getProcessors());
copy.setRemoteProcessGroups(processGroup.getRemoteProcessGroups());
copy.setVariables(processGroup.getVariables());
copy.setLabels(processGroup.getLabels());
final Set<VersionedProcessGroup> copyChildren = new HashSet<>();
@ -2944,7 +3099,21 @@ public final class StandardProcessGroup implements ProcessGroup {
}
applyVersionedComponentIds(processGroup, versionedComponentIds::get);
// If we versioned any parent groups' Controller Services, set their versioned component id's too.
final ProcessGroup parent = processGroup.getParent();
if (parent != null) {
for (final ControllerServiceNode service : parent.getControllerServices(true)) {
if (!service.getVersionedComponentId().isPresent()) {
final String versionedId = versionedComponentIds.get(service.getIdentifier());
if (versionedId != null) {
service.setVersionedComponentId(versionedId);
}
}
}
}
}
private void applyVersionedComponentIds(final ProcessGroup processGroup, final Function<String, String> lookup) {
processGroup.setVersionedComponentId(lookup.apply(processGroup.getIdentifier()));
@ -2980,6 +3149,14 @@ public final class StandardProcessGroup implements ProcessGroup {
.forEach(childGroup -> applyVersionedComponentIds(childGroup, lookup));
}
private void setSyncFailedState(final String explanation) {
boolean updated = false;
while (!updated) {
final StandardVersionedFlowStatus status = flowStatus.get();
final StandardVersionedFlowStatus updatedStatus = new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, explanation, status.getCurrentDifferences());
updated = flowStatus.compareAndSet(status, updatedStatus);
}
}
@Override
public void synchronizeWithFlowRegistry(final FlowRegistryClient flowRegistryClient) {
@ -2991,6 +3168,10 @@ public final class StandardProcessGroup implements ProcessGroup {
final String registryId = vci.getRegistryIdentifier();
final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
if (flowRegistry == null) {
final String message = String.format("Unable to synchronize Process Group with Flow Registry because Process Group was placed under Version Control using Flow Registry "
+ "with identifier %s but cannot find any Flow Registry with this identifier", registryId);
setSyncFailedState(message);
LOG.error("Unable to synchronize {} with Flow Registry because Process Group was placed under Version Control using Flow Registry "
+ "with identifier {} but cannot find any Flow Registry with this identifier", this, registryId);
return;
@ -3005,8 +3186,12 @@ public final class StandardProcessGroup implements ProcessGroup {
final VersionedProcessGroup registryFlow = registrySnapshot.getFlowContents();
vci.setFlowSnapshot(registryFlow);
} catch (final IOException | NiFiRegistryException e) {
final String message = String.format("Failed to synchronize Process Group with Flow Registry because could not retrieve version %s of flow with identifier %s in bucket %s",
vci.getVersion(), vci.getFlowIdentifier(), vci.getBucketIdentifier());
setSyncFailedState(message);
LOG.error("Failed to synchronize {} with Flow Registry because could not retrieve version {} of flow with identifier {} in bucket {}",
new Object[] {this, vci.getVersion(), vci.getFlowIdentifier(), vci.getBucketIdentifier()}, e);
this, vci.getVersion(), vci.getFlowIdentifier(), vci.getBucketIdentifier(), e);
return;
}
}
@ -3027,7 +3212,17 @@ public final class StandardProcessGroup implements ProcessGroup {
LOG.info("{} is not the most recent version of the flow that is under Version Control; current version is {}; most recent version is {}",
new Object[] {this, vci.getVersion(), latestVersion});
}
boolean updated = false;
while (!updated) {
final StandardVersionedFlowStatus status = flowStatus.get();
final StandardVersionedFlowStatus updatedStatus = new StandardVersionedFlowStatus(null, null, status.getCurrentDifferences());
updated = flowStatus.compareAndSet(status, updatedStatus);
}
} catch (final IOException | NiFiRegistryException e) {
final String message = String.format("Failed to synchronize Process Group with Flow Registry because could not determine the most recent version of the Flow in the Flow Registry");
setSyncFailedState(message);
LOG.error("Failed to synchronize {} with Flow Registry because could not determine the most recent version of the Flow in the Flow Registry", this, e);
}
}
@ -3041,12 +3236,12 @@ public final class StandardProcessGroup implements ProcessGroup {
verifyCanUpdate(proposedSnapshot, true, verifyNotDirty);
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient(), true);
final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, controllerServiceProvider, flowController.getFlowRegistryClient(), true);
final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", versionedGroup);
final ComparableDataFlow remoteFlow = new StandardComparableDataFlow("Remote Flow", proposedSnapshot.getFlowContents());
final FlowComparator flowComparator = new StandardFlowComparator(localFlow, remoteFlow, new StaticDifferenceDescriptor());
final FlowComparator flowComparator = new StandardFlowComparator(localFlow, remoteFlow, getAncestorGroupServiceIds(), new StaticDifferenceDescriptor());
final FlowComparison flowComparison = flowComparator.compare();
final Set<String> updatedVersionedComponentIds = new HashSet<>();
@ -3055,6 +3250,25 @@ public final class StandardProcessGroup implements ProcessGroup {
continue;
}
// If this update adds a new Controller Service, then we need to check if the service already exists at a higher level
// and if so compare our VersionedControllerService to the existing service.
if (diff.getDifferenceType() == DifferenceType.COMPONENT_ADDED) {
final VersionedComponent component = diff.getComponentA() == null ? diff.getComponentB() : diff.getComponentA();
if (ComponentType.CONTROLLER_SERVICE == component.getComponentType()) {
final ControllerServiceNode serviceNode = getVersionedControllerService(this, component.getIdentifier());
if (serviceNode != null) {
final VersionedControllerService versionedService = mapper.mapControllerService(serviceNode, controllerServiceProvider);
final Set<FlowDifference> differences = flowComparator.compareControllerServices(versionedService, (VersionedControllerService) component);
if (!differences.isEmpty()) {
updatedVersionedComponentIds.add(component.getIdentifier());
}
continue;
}
}
}
final VersionedComponent component = diff.getComponentA() == null ? diff.getComponentB() : diff.getComponentA();
updatedVersionedComponentIds.add(component.getIdentifier());
@ -3081,6 +3295,35 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
private Set<String> getAncestorGroupServiceIds() {
final Set<String> ancestorServiceIds;
ProcessGroup parentGroup = getParent();
if (parentGroup == null) {
ancestorServiceIds = Collections.emptySet();
} else {
ancestorServiceIds = parentGroup.getControllerServices(true).stream()
.map(ControllerServiceNode::getIdentifier)
.collect(Collectors.toSet());
}
return ancestorServiceIds;
}
private ControllerServiceNode getVersionedControllerService(final ProcessGroup group, final String versionedComponentId) {
if (group == null) {
return null;
}
for (final ControllerServiceNode serviceNode : group.getControllerServices(false)) {
if (serviceNode.getVersionedComponentId().isPresent() && serviceNode.getVersionedComponentId().get().equals(versionedComponentId)) {
return serviceNode;
}
}
return getVersionedControllerService(group.getParent(), versionedComponentId);
}
private Set<String> getKnownVariableNames() {
final Set<String> variableNames = new HashSet<>();
populateKnownVariableNames(this, variableNames);
@ -3159,6 +3402,44 @@ public final class StandardProcessGroup implements ProcessGroup {
group.setVersionControlInformation(vci, Collections.emptyMap());
}
// Controller Services
// Controller Services have to be handled a bit differently than other components. This is because Processors and Controller
// Services may reference other Controller Services. Since we may be adding Service A, which depends on Service B, before adding
// Service B, we need to ensure that we create all Controller Services first and then call updateControllerService for each
// Controller Service. This way, we ensure that all services have been created before setting the properties. This allows us to
// properly obtain the correct mapping of Controller Service VersionedComponentID to Controller Service instance id.
final Map<String, ControllerServiceNode> servicesByVersionedId = group.getControllerServices(false).stream()
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
final Set<String> controllerServicesRemoved = new HashSet<>(servicesByVersionedId.keySet());
final Map<ControllerServiceNode, VersionedControllerService> services = new HashMap<>();
// Add any Controller Service that does not yet exist.
for (final VersionedControllerService proposedService : proposed.getControllerServices()) {
ControllerServiceNode service = servicesByVersionedId.get(proposedService.getIdentifier());
if (service == null) {
service = addControllerService(group, proposedService, componentIdSeed);
LOG.info("Added {} to {}", service, this);
}
services.put(service, proposedService);
}
// Update all of the Controller Services to match the VersionedControllerService
for (final Map.Entry<ControllerServiceNode, VersionedControllerService> entry : services.entrySet()) {
final ControllerServiceNode service = entry.getKey();
final VersionedControllerService proposedService = entry.getValue();
if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) {
updateControllerService(service, proposedService);
LOG.info("Updated {}", service);
}
controllerServicesRemoved.remove(proposedService.getIdentifier());
}
// Child groups
final Map<String, ProcessGroup> childGroupsByVersionedId = group.getProcessGroups().stream()
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
@ -3179,26 +3460,6 @@ public final class StandardProcessGroup implements ProcessGroup {
childGroupsRemoved.remove(proposedChildGroup.getIdentifier());
}
// Controller Services
final Map<String, ControllerServiceNode> servicesByVersionedId = group.getControllerServices(false).stream()
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
final Set<String> controllerServicesRemoved = new HashSet<>(servicesByVersionedId.keySet());
for (final VersionedControllerService proposedService : proposed.getControllerServices()) {
final ControllerServiceNode service = servicesByVersionedId.get(proposedService.getIdentifier());
if (service == null) {
final ControllerServiceNode added = addControllerService(group, proposedService, componentIdSeed);
LOG.info("Added {} to {}", added, this);
} else if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) {
updateControllerService(service, proposedService);
LOG.info("Updated {}", service);
}
controllerServicesRemoved.remove(proposedService.getIdentifier());
}
// Funnels
final Map<String, Funnel> funnelsByVersionedId = group.getFunnels().stream()
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
@ -3608,7 +3869,7 @@ public final class StandardProcessGroup implements ProcessGroup {
service.setAnnotationData(proposed.getAnnotationData());
service.setComments(proposed.getComments());
service.setName(proposed.getName());
service.setProperties(populatePropertiesMap(service.getProperties(), proposed.getProperties()));
service.setProperties(populatePropertiesMap(service.getProperties(), proposed.getProperties(), proposed.getPropertyDescriptors(), service.getProcessGroup()));
if (!isEqual(service.getBundleCoordinate(), proposed.getBundle())) {
final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle());
@ -3728,7 +3989,7 @@ public final class StandardProcessGroup implements ProcessGroup {
processor.setExecutionNode(ExecutionNode.valueOf(proposed.getExecutionNode()));
processor.setName(proposed.getName());
processor.setPenalizationPeriod(proposed.getPenaltyDuration());
processor.setProperties(populatePropertiesMap(processor.getProperties(), proposed.getProperties()));
processor.setProperties(populatePropertiesMap(processor.getProperties(), proposed.getProperties(), proposed.getPropertyDescriptors(), processor.getProcessGroup()));
processor.setRunDuration(proposed.getRunDurationMillis(), TimeUnit.MILLISECONDS);
processor.setScheduldingPeriod(proposed.getSchedulingPeriod());
processor.setSchedulingStrategy(SchedulingStrategy.valueOf(proposed.getSchedulingStrategy()));
@ -3745,19 +4006,60 @@ public final class StandardProcessGroup implements ProcessGroup {
}
private Map<String, String> populatePropertiesMap(final Map<PropertyDescriptor, String> currentProperties, final Map<String, String> proposedProperties) {
private Map<String, String> populatePropertiesMap(final Map<PropertyDescriptor, String> currentProperties, final Map<String, String> proposedProperties,
final Map<String, VersionedPropertyDescriptor> proposedDescriptors, final ProcessGroup group) {
final Map<String, String> fullPropertyMap = new HashMap<>();
for (final PropertyDescriptor property : currentProperties.keySet()) {
fullPropertyMap.put(property.getName(), null);
}
if (proposedProperties != null) {
fullPropertyMap.putAll(proposedProperties);
for (final Map.Entry<String, String> entry : proposedProperties.entrySet()) {
final String propertyName = entry.getKey();
final VersionedPropertyDescriptor descriptor = proposedDescriptors.get(propertyName);
String value;
if (descriptor != null && descriptor.getIdentifiesControllerService()) {
// Property identifies a Controller Service. So the value that we want to assign is not the value given.
// The value given is instead the Versioned Component ID of the Controller Service. We want to resolve this
// to the instance ID of the Controller Service.
final String serviceVersionedComponentId = entry.getValue();
final String instanceId = getServiceInstanceId(serviceVersionedComponentId, group);
value = instanceId == null ? serviceVersionedComponentId : instanceId;
} else {
value = entry.getValue();
}
fullPropertyMap.put(propertyName, value);
}
}
return fullPropertyMap;
}
private String getServiceInstanceId(final String serviceVersionedComponentId, final ProcessGroup group) {
for (final ControllerServiceNode serviceNode : group.getControllerServices(false)) {
final Optional<String> optionalVersionedId = serviceNode.getVersionedComponentId();
if (!optionalVersionedId.isPresent()) {
continue;
}
final String versionedId = optionalVersionedId.get();
if (versionedId.equals(serviceVersionedComponentId)) {
return serviceNode.getIdentifier();
}
}
final ProcessGroup parent = group.getParent();
if (parent == null) {
return null;
}
return getServiceInstanceId(serviceVersionedComponentId, parent);
}
private RemoteProcessGroup addRemoteProcessGroup(final ProcessGroup destination, final VersionedRemoteProcessGroup proposed, final String componentIdSeed) {
final RemoteProcessGroup rpg = flowController.createRemoteProcessGroup(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getTargetUris());
rpg.setVersionedComponentId(proposed.getIdentifier());
@ -3773,12 +4075,12 @@ public final class StandardProcessGroup implements ProcessGroup {
rpg.setCommunicationsTimeout(proposed.getCommunicationsTimeout());
rpg.setInputPorts(proposed.getInputPorts() == null ? Collections.emptySet() : proposed.getInputPorts().stream()
.map(port -> createPortDescriptor(port, componentIdSeed, rpg.getIdentifier()))
.collect(Collectors.toSet()));
.collect(Collectors.toSet()), false);
rpg.setName(proposed.getName());
rpg.setNetworkInterface(proposed.getLocalNetworkInterface());
rpg.setOutputPorts(proposed.getOutputPorts() == null ? Collections.emptySet() : proposed.getOutputPorts().stream()
.map(port -> createPortDescriptor(port, componentIdSeed, rpg.getIdentifier()))
.collect(Collectors.toSet()));
.collect(Collectors.toSet()), false);
rpg.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
rpg.setProxyHost(proposed.getProxyHost());
rpg.setProxyPort(proposed.getProxyPort());
@ -3831,12 +4133,12 @@ public final class StandardProcessGroup implements ProcessGroup {
}
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient(), false);
final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, controllerServiceProvider, flowController.getFlowRegistryClient(), false);
final ComparableDataFlow currentFlow = new StandardComparableDataFlow("Local Flow", versionedGroup);
final ComparableDataFlow snapshotFlow = new StandardComparableDataFlow("Versioned Flow", vci.getFlowSnapshot());
final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, new EvolvingDifferenceDescriptor());
final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, getAncestorGroupServiceIds(), new EvolvingDifferenceDescriptor());
final FlowComparison comparison = flowComparator.compare();
final Set<FlowDifference> differences = comparison.getDifferences();
final Set<FlowDifference> functionalDifferences = differences.stream()

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.groups;
import java.util.Set;
import org.apache.nifi.registry.flow.VersionedFlowState;
import org.apache.nifi.registry.flow.VersionedFlowStatus;
import org.apache.nifi.registry.flow.diff.FlowDifference;
class StandardVersionedFlowStatus implements VersionedFlowStatus {
private final VersionedFlowState state;
private final String explanation;
private final Set<FlowDifference> currentDifferences;
StandardVersionedFlowStatus(final VersionedFlowState state, final String explanation, final Set<FlowDifference> differences) {
this.state = state;
this.explanation = explanation;
this.currentDifferences = differences;
}
@Override
public VersionedFlowState getState() {
return state;
}
@Override
public String getStateExplanation() {
return explanation;
}
Set<FlowDifference> getCurrentDifferences() {
return currentDifferences;
}
}

View File

@ -34,6 +34,7 @@ public class StandardVersionControlInformation implements VersionControlInformat
private volatile VersionedProcessGroup flowSnapshot;
private volatile boolean modified;
private volatile boolean current;
private final VersionedFlowStatus status;
public static class Builder {
private String registryIdentifier;
@ -47,6 +48,7 @@ public class StandardVersionControlInformation implements VersionControlInformat
private VersionedProcessGroup flowSnapshot;
private Boolean modified = null;
private Boolean current = null;
private VersionedFlowStatus status;
public Builder registryId(String registryId) {
this.registryIdentifier = registryId;
@ -103,6 +105,11 @@ public class StandardVersionControlInformation implements VersionControlInformat
return this;
}
public Builder status(final VersionedFlowStatus status) {
this.status = status;
return this;
}
public static Builder fromDto(VersionControlInformationDTO dto) {
Builder builder = new Builder();
builder.registryId(dto.getRegistryId())
@ -126,7 +133,7 @@ public class StandardVersionControlInformation implements VersionControlInformat
Objects.requireNonNull(version, "Version must be specified");
final StandardVersionControlInformation svci = new StandardVersionControlInformation(registryIdentifier, registryName,
bucketIdentifier, flowIdentifier, version, flowSnapshot, modified, current);
bucketIdentifier, flowIdentifier, version, flowSnapshot, modified, current, status);
svci.setBucketName(bucketName);
svci.setFlowName(flowName);
@ -138,7 +145,7 @@ public class StandardVersionControlInformation implements VersionControlInformat
public StandardVersionControlInformation(final String registryId, final String registryName, final String bucketId, final String flowId, final int version,
final VersionedProcessGroup snapshot, final boolean modified, final boolean current) {
final VersionedProcessGroup snapshot, final boolean modified, final boolean current, final VersionedFlowStatus status) {
this.registryIdentifier = registryId;
this.registryName = registryName;
this.bucketIdentifier = bucketId;
@ -147,6 +154,7 @@ public class StandardVersionControlInformation implements VersionControlInformat
this.flowSnapshot = snapshot;
this.modified = modified;
this.current = current;
this.status = status;
}
@ -232,4 +240,9 @@ public class StandardVersionControlInformation implements VersionControlInformat
public void setFlowSnapshot(final VersionedProcessGroup flowSnapshot) {
this.flowSnapshot = flowSnapshot;
}
@Override
public VersionedFlowStatus getStatus() {
return status;
}
}

View File

@ -1,328 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.flow.mapping;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.nifi.registry.flow.BatchSize;
import org.apache.nifi.registry.flow.Bundle;
import org.apache.nifi.registry.flow.ComponentType;
import org.apache.nifi.registry.flow.ConnectableComponent;
import org.apache.nifi.registry.flow.ConnectableComponentType;
import org.apache.nifi.registry.flow.ControllerServiceAPI;
import org.apache.nifi.registry.flow.PortType;
import org.apache.nifi.registry.flow.Position;
import org.apache.nifi.registry.flow.VersionedConnection;
import org.apache.nifi.registry.flow.VersionedControllerService;
import org.apache.nifi.registry.flow.VersionedFunnel;
import org.apache.nifi.registry.flow.VersionedLabel;
import org.apache.nifi.registry.flow.VersionedPort;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.VersionedProcessor;
import org.apache.nifi.registry.flow.VersionedRemoteGroupPort;
import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup;
import org.apache.nifi.web.api.dto.BatchSettingsDTO;
import org.apache.nifi.web.api.dto.BundleDTO;
import org.apache.nifi.web.api.dto.ConnectableDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.ControllerServiceApiDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.FunnelDTO;
import org.apache.nifi.web.api.dto.LabelDTO;
import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.api.dto.PositionDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
public class NiFiRegistryDtoMapper {
// We need to keep a mapping of component id to versionedComponentId as we transform these objects. This way, when
// we call #mapConnectable, instead of generating a new UUID for the ConnectableComponent, we can lookup the 'versioned'
// identifier based on the comopnent's actual id. We do connections last, so that all components will already have been
// created before attempting to create the connection, where the ConnectableDTO is converted.
private Map<String, String> versionedComponentIds = new HashMap<>();
public VersionedProcessGroup mapProcessGroup(final ProcessGroupDTO dto) {
versionedComponentIds.clear();
return mapGroup(dto);
}
private VersionedProcessGroup mapGroup(final ProcessGroupDTO dto) {
final VersionedProcessGroup versionedGroup = new VersionedProcessGroup();
versionedGroup.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId()));
versionedGroup.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
versionedGroup.setName(dto.getName());
versionedGroup.setComments(dto.getComments());
versionedGroup.setPosition(mapPosition(dto.getPosition()));
final FlowSnippetDTO contents = dto.getContents();
versionedGroup.setControllerServices(contents.getControllerServices().stream()
.map(this::mapControllerService)
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setFunnels(contents.getFunnels().stream()
.map(this::mapFunnel)
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setInputPorts(contents.getInputPorts().stream()
.map(this::mapPort)
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setOutputPorts(contents.getOutputPorts().stream()
.map(this::mapPort)
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setLabels(contents.getLabels().stream()
.map(this::mapLabel)
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setProcessors(contents.getProcessors().stream()
.map(this::mapProcessor)
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setRemoteProcessGroups(contents.getRemoteProcessGroups().stream()
.map(this::mapRemoteProcessGroup)
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setProcessGroups(contents.getProcessGroups().stream()
.map(this::mapGroup)
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setConnections(contents.getConnections().stream()
.map(this::mapConnection)
.collect(Collectors.toCollection(LinkedHashSet::new)));
return versionedGroup;
}
private String getId(final String currentVersionedId, final String componentId) {
final String versionedId;
if (currentVersionedId == null) {
versionedId = UUID.nameUUIDFromBytes(componentId.getBytes(StandardCharsets.UTF_8)).toString();
} else {
versionedId = currentVersionedId;
}
versionedComponentIds.put(componentId, versionedId);
return versionedId;
}
private String getGroupId(final String groupId) {
return versionedComponentIds.get(groupId);
}
public VersionedConnection mapConnection(final ConnectionDTO dto) {
final VersionedConnection connection = new VersionedConnection();
connection.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId()));
connection.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
connection.setName(dto.getName());
connection.setBackPressureDataSizeThreshold(dto.getBackPressureDataSizeThreshold());
connection.setBackPressureObjectThreshold(dto.getBackPressureObjectThreshold());
connection.setFlowFileExpiration(dto.getFlowFileExpiration());
connection.setLabelIndex(dto.getLabelIndex());
connection.setPosition(mapPosition(dto.getPosition()));
connection.setPrioritizers(dto.getPrioritizers());
connection.setSelectedRelationships(dto.getSelectedRelationships());
connection.setzIndex(dto.getzIndex());
connection.setBends(dto.getBends().stream()
.map(this::mapPosition)
.collect(Collectors.toList()));
connection.setSource(mapConnectable(dto.getSource()));
connection.setDestination(mapConnectable(dto.getDestination()));
return connection;
}
public ConnectableComponent mapConnectable(final ConnectableDTO dto) {
final ConnectableComponent component = new ConnectableComponent();
final String versionedId = dto.getVersionedComponentId();
if (versionedId == null) {
final String resolved = versionedComponentIds.get(dto.getId());
if (resolved == null) {
throw new IllegalArgumentException("Unable to map Connectable Component with identifier " + dto.getId() + " to any version-controlled component");
}
component.setId(resolved);
} else {
component.setId(versionedId);
}
component.setComments(dto.getComments());
component.setGroupId(dto.getGroupId());
component.setName(dto.getName());
component.setType(ConnectableComponentType.valueOf(dto.getType()));
return component;
}
public VersionedControllerService mapControllerService(final ControllerServiceDTO dto) {
final VersionedControllerService service = new VersionedControllerService();
service.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId()));
service.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
service.setName(dto.getName());
service.setAnnotationData(dto.getAnnotationData());
service.setBundle(mapBundle(dto.getBundle()));
service.setComments(dto.getComments());
service.setControllerServiceApis(dto.getControllerServiceApis().stream()
.map(this::mapControllerServiceApi)
.collect(Collectors.toList()));
service.setProperties(dto.getProperties());
service.setType(dto.getType());
return null;
}
private Bundle mapBundle(final BundleDTO dto) {
final Bundle bundle = new Bundle();
bundle.setGroup(dto.getGroup());
bundle.setArtifact(dto.getArtifact());
bundle.setVersion(dto.getVersion());
return bundle;
}
private ControllerServiceAPI mapControllerServiceApi(final ControllerServiceApiDTO dto) {
final ControllerServiceAPI api = new ControllerServiceAPI();
api.setBundle(mapBundle(dto.getBundle()));
api.setType(dto.getType());
return api;
}
public VersionedFunnel mapFunnel(final FunnelDTO dto) {
final VersionedFunnel funnel = new VersionedFunnel();
funnel.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId()));
funnel.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
funnel.setPosition(mapPosition(dto.getPosition()));
return funnel;
}
public VersionedLabel mapLabel(final LabelDTO dto) {
final VersionedLabel label = new VersionedLabel();
label.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId()));
label.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
label.setHeight(dto.getHeight());
label.setWidth(dto.getWidth());
label.setLabel(dto.getLabel());
label.setPosition(mapPosition(dto.getPosition()));
label.setStyle(dto.getStyle());
return label;
}
public VersionedPort mapPort(final PortDTO dto) {
final VersionedPort port = new VersionedPort();
port.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId()));
port.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
port.setComments(dto.getComments());
port.setConcurrentlySchedulableTaskCount(dto.getConcurrentlySchedulableTaskCount());
port.setName(dto.getName());
port.setPosition(mapPosition(dto.getPosition()));
port.setType(PortType.valueOf(dto.getType()));
return port;
}
public Position mapPosition(final PositionDTO dto) {
final Position position = new Position();
position.setX(dto.getX());
position.setY(dto.getY());
return position;
}
public VersionedProcessor mapProcessor(final ProcessorDTO dto) {
final ProcessorConfigDTO config = dto.getConfig();
final VersionedProcessor processor = new VersionedProcessor();
processor.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId()));
processor.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
processor.setType(dto.getType());
processor.setAnnotationData(config.getAnnotationData());
processor.setAutoTerminatedRelationships(config.getAutoTerminatedRelationships());
processor.setBulletinLevel(config.getBulletinLevel());
processor.setBundle(mapBundle(dto.getBundle()));
processor.setComments(config.getComments());
processor.setConcurrentlySchedulableTaskCount(config.getConcurrentlySchedulableTaskCount());
processor.setExecutionNode(config.getExecutionNode());
processor.setName(dto.getName());
processor.setPenaltyDuration(config.getPenaltyDuration());
processor.setPosition(mapPosition(dto.getPosition()));
processor.setProperties(config.getProperties());
processor.setRunDurationMillis(config.getRunDurationMillis());
processor.setSchedulingPeriod(config.getSchedulingPeriod());
processor.setSchedulingStrategy(config.getSchedulingStrategy());
processor.setStyle(dto.getStyle());
processor.setYieldDuration(config.getYieldDuration());
return processor;
}
public VersionedRemoteProcessGroup mapRemoteProcessGroup(final RemoteProcessGroupDTO dto) {
final VersionedRemoteProcessGroup rpg = new VersionedRemoteProcessGroup();
rpg.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId()));
rpg.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
rpg.setComments(dto.getComments());
rpg.setCommunicationsTimeout(dto.getCommunicationsTimeout());
rpg.setLocalNetworkInterface(dto.getLocalNetworkInterface());
rpg.setName(dto.getName());
rpg.setInputPorts(dto.getContents().getInputPorts().stream()
.map(port -> mapRemotePort(port, ComponentType.REMOTE_INPUT_PORT))
.collect(Collectors.toSet()));
rpg.setOutputPorts(dto.getContents().getOutputPorts().stream()
.map(port -> mapRemotePort(port, ComponentType.REMOTE_OUTPUT_PORT))
.collect(Collectors.toSet()));
rpg.setPosition(mapPosition(dto.getPosition()));
rpg.setProxyHost(dto.getProxyHost());
rpg.setProxyPort(dto.getProxyPort());
rpg.setProxyUser(dto.getProxyUser());
rpg.setTargetUri(dto.getTargetUri());
rpg.setTargetUris(dto.getTargetUris());
rpg.setTransportProtocol(dto.getTransportProtocol());
rpg.setYieldDuration(dto.getYieldDuration());
return rpg;
}
public VersionedRemoteGroupPort mapRemotePort(final RemoteProcessGroupPortDTO dto, final ComponentType componentType) {
final VersionedRemoteGroupPort port = new VersionedRemoteGroupPort();
port.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId()));
port.setGroupIdentifier(getGroupId(dto.getGroupId()));
port.setComments(dto.getComments());
port.setConcurrentlySchedulableTaskCount(dto.getConcurrentlySchedulableTaskCount());
port.setRemoteGroupId(dto.getGroupId());
port.setName(dto.getName());
port.setUseCompression(dto.getUseCompression());
port.setBatchSize(mapBatchSettings(dto.getBatchSettings()));
port.setTargetId(dto.getTargetId());
port.setComponentType(componentType);
return port;
}
private BatchSize mapBatchSettings(final BatchSettingsDTO dto) {
final BatchSize batchSize = new BatchSize();
batchSize.setCount(dto.getCount());
batchSize.setDuration(dto.getDuration());
batchSize.setSize(dto.getSize());
return batchSize;
}
}

View File

@ -28,7 +28,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ClassUtils;
@ -44,6 +43,7 @@ import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.nar.ExtensionManager;
@ -59,15 +59,16 @@ import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.PortType;
import org.apache.nifi.registry.flow.Position;
import org.apache.nifi.registry.flow.VersionedFlowCoordinates;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedConnection;
import org.apache.nifi.registry.flow.VersionedControllerService;
import org.apache.nifi.registry.flow.VersionedFlowCoordinates;
import org.apache.nifi.registry.flow.VersionedFunnel;
import org.apache.nifi.registry.flow.VersionedLabel;
import org.apache.nifi.registry.flow.VersionedPort;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.VersionedProcessor;
import org.apache.nifi.registry.flow.VersionedPropertyDescriptor;
import org.apache.nifi.registry.flow.VersionedRemoteGroupPort;
import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup;
import org.apache.nifi.remote.RemoteGroupPort;
@ -80,56 +81,16 @@ public class NiFiRegistryFlowMapper {
// created before attempting to create the connection, where the ConnectableDTO is converted.
private Map<String, String> versionedComponentIds = new HashMap<>();
public InstantiatedVersionedProcessGroup mapProcessGroup(final ProcessGroup group, final FlowRegistryClient registryClient, final boolean mapDescendantVersionedFlows) {
public InstantiatedVersionedProcessGroup mapProcessGroup(final ProcessGroup group, final ControllerServiceProvider serviceProvider, final FlowRegistryClient registryClient,
final boolean mapDescendantVersionedFlows) {
versionedComponentIds.clear();
final InstantiatedVersionedProcessGroup mapped = mapGroup(group, registryClient, true, mapDescendantVersionedFlows);
final InstantiatedVersionedProcessGroup mapped = mapGroup(group, serviceProvider, registryClient, true, mapDescendantVersionedFlows);
populateReferencedAncestorServices(group, mapped);
populateReferencedAncestorVariables(group, mapped);
return mapped;
}
private void populateReferencedAncestorServices(final ProcessGroup group, final VersionedProcessGroup versionedGroup) {
final Set<ControllerServiceNode> ancestorControllerServices = group.getControllerServices(true);
ancestorControllerServices.remove(group.getControllerServices(false));
final Map<String, ControllerServiceNode> ancestorServicesById = ancestorControllerServices.stream()
.collect(Collectors.toMap(ControllerServiceNode::getIdentifier, Function.identity()));
final Set<ControllerServiceNode> referenced = new HashSet<>();
for (final ProcessorNode processor : group.findAllProcessors()) {
findReferencedServices(processor, ancestorServicesById, referenced);
}
for (final ControllerServiceNode service : group.findAllControllerServices()) {
findReferencedServices(service, ancestorServicesById, referenced);
}
final Set<VersionedControllerService> versionedServices = referenced.stream().map(this::mapControllerService)
.collect(Collectors.toCollection(LinkedHashSet::new));
versionedGroup.getControllerServices().addAll(versionedServices);
}
private Set<ControllerServiceNode> findReferencedServices(final ConfiguredComponent component, final Map<String, ControllerServiceNode> ancestorServicesById,
final Set<ControllerServiceNode> referenced) {
for (final Map.Entry<PropertyDescriptor, String> entry : component.getProperties().entrySet()) {
final PropertyDescriptor descriptor = entry.getKey();
if (descriptor.getControllerServiceDefinition() != null) {
final String serviceId = entry.getValue();
final ControllerServiceNode serviceNode = ancestorServicesById.get(serviceId);
if (serviceNode != null) {
referenced.add(serviceNode);
referenced.addAll(findReferencedServices(serviceNode, ancestorServicesById, referenced));
}
}
}
return referenced;
}
private void populateReferencedAncestorVariables(final ProcessGroup group, final VersionedProcessGroup versionedGroup) {
final Set<String> ancestorVariableNames = new HashSet<>();
populateVariableNames(group.getParent(), ancestorVariableNames);
@ -167,7 +128,9 @@ public class NiFiRegistryFlowMapper {
}
private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup group, final FlowRegistryClient registryClient, final boolean topLevel, final boolean mapDescendantVersionedFlows) {
private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup group, final ControllerServiceProvider serviceLookup, final FlowRegistryClient registryClient,
final boolean topLevel, final boolean mapDescendantVersionedFlows) {
final InstantiatedVersionedProcessGroup versionedGroup = new InstantiatedVersionedProcessGroup(group.getIdentifier(), group.getProcessGroupIdentifier());
versionedGroup.setIdentifier(getId(group.getVersionedComponentId(), group.getIdentifier()));
versionedGroup.setGroupIdentifier(getGroupId(group.getProcessGroupIdentifier()));
@ -212,7 +175,7 @@ public class NiFiRegistryFlowMapper {
}
versionedGroup.setControllerServices(group.getControllerServices(false).stream()
.map(this::mapControllerService)
.map(service -> mapControllerService(service, serviceLookup))
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setFunnels(group.getFunnels().stream()
@ -232,7 +195,7 @@ public class NiFiRegistryFlowMapper {
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setProcessors(group.getProcessors().stream()
.map(this::mapProcessor)
.map(processor -> mapProcessor(processor, serviceLookup))
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setRemoteProcessGroups(group.getRemoteProcessGroups().stream()
@ -240,7 +203,7 @@ public class NiFiRegistryFlowMapper {
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setProcessGroups(group.getProcessGroups().stream()
.map(grp -> mapGroup(grp, registryClient, false, mapDescendantVersionedFlows))
.map(grp -> mapGroup(grp, serviceLookup, registryClient, false, mapDescendantVersionedFlows))
.collect(Collectors.toCollection(LinkedHashSet::new)));
versionedGroup.setConnections(group.getConnections().stream()
@ -335,7 +298,7 @@ public class NiFiRegistryFlowMapper {
return component;
}
public VersionedControllerService mapControllerService(final ControllerServiceNode controllerService) {
public VersionedControllerService mapControllerService(final ControllerServiceNode controllerService, final ControllerServiceProvider serviceProvider) {
final VersionedControllerService versionedService = new InstantiatedVersionedControllerService(controllerService.getIdentifier(), controllerService.getProcessGroupIdentifier());
versionedService.setIdentifier(getId(controllerService.getVersionedComponentId(), controllerService.getIdentifier()));
versionedService.setGroupIdentifier(getGroupId(controllerService.getProcessGroupIdentifier()));
@ -345,14 +308,16 @@ public class NiFiRegistryFlowMapper {
versionedService.setComments(controllerService.getComments());
versionedService.setControllerServiceApis(mapControllerServiceApis(controllerService));
versionedService.setProperties(mapProperties(controllerService));
versionedService.setProperties(mapProperties(controllerService, serviceProvider));
versionedService.setPropertyDescriptors(mapPropertyDescriptors(controllerService));
versionedService.setType(controllerService.getCanonicalClassName());
return versionedService;
}
private Map<String, String> mapProperties(final ConfiguredComponent component) {
private Map<String, String> mapProperties(final ConfiguredComponent component, final ControllerServiceProvider serviceProvider) {
final Map<String, String> mapped = new HashMap<>();
component.getProperties().keySet().stream()
.filter(property -> !property.isSensitive())
.forEach(property -> {
@ -360,11 +325,34 @@ public class NiFiRegistryFlowMapper {
if (value == null) {
value = property.getDefaultValue();
}
if (value != null && property.getControllerServiceDefinition() != null) {
// Property references a Controller Service. Instead of storing the existing value, we want
// to store the Versioned Component ID of the service.
final ControllerServiceNode controllerService = serviceProvider.getControllerServiceNode(value);
if (controllerService != null) {
value = getId(controllerService.getVersionedComponentId(), controllerService.getIdentifier());
}
}
mapped.put(property.getName(), value);
});
return mapped;
}
private Map<String, VersionedPropertyDescriptor> mapPropertyDescriptors(final ConfiguredComponent component) {
final Map<String, VersionedPropertyDescriptor> descriptors = new HashMap<>();
for (final PropertyDescriptor descriptor : component.getProperties().keySet()) {
final VersionedPropertyDescriptor versionedDescriptor = new VersionedPropertyDescriptor();
versionedDescriptor.setName(descriptor.getName());
versionedDescriptor.setDisplayName(descriptor.getDisplayName());
versionedDescriptor.setIdentifiesControllerService(descriptor.getControllerServiceDefinition() != null);
descriptors.put(descriptor.getName(), versionedDescriptor);
}
return descriptors;
}
private Bundle mapBundle(final BundleCoordinate coordinate) {
final Bundle versionedBundle = new Bundle();
versionedBundle.setGroup(coordinate.getGroup());
@ -441,7 +429,7 @@ public class NiFiRegistryFlowMapper {
return position;
}
public VersionedProcessor mapProcessor(final ProcessorNode procNode) {
public VersionedProcessor mapProcessor(final ProcessorNode procNode, final ControllerServiceProvider serviceProvider) {
final VersionedProcessor processor = new InstantiatedVersionedProcessor(procNode.getIdentifier(), procNode.getProcessGroupIdentifier());
processor.setIdentifier(getId(procNode.getVersionedComponentId(), procNode.getIdentifier()));
processor.setGroupIdentifier(getGroupId(procNode.getProcessGroupIdentifier()));
@ -456,7 +444,8 @@ public class NiFiRegistryFlowMapper {
processor.setName(procNode.getName());
processor.setPenaltyDuration(procNode.getPenalizationPeriod());
processor.setPosition(mapPosition(procNode.getPosition()));
processor.setProperties(mapProperties(procNode));
processor.setProperties(mapProperties(procNode, serviceProvider));
processor.setPropertyDescriptors(mapPropertyDescriptors(procNode));
processor.setRunDurationMillis(procNode.getRunDuration(TimeUnit.MILLISECONDS));
processor.setSchedulingPeriod(procNode.getSchedulingPeriod());
processor.setSchedulingStrategy(procNode.getSchedulingStrategy().name());

View File

@ -178,7 +178,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
};
final Runnable checkAuthorizations = new InitializationTask();
backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + id);
backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + id, true);
backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 5L, 30L, TimeUnit.SECONDS);
backgroundThreadExecutor.submit(() -> {
try {
@ -435,11 +435,13 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
* and started.
*
* @param ports the new ports
* @param pruneUnusedPorts if true, any ports that are not included in the given set of ports
* and that do not have any incoming connections will be removed.
*
* @throws NullPointerException if the given argument is null
*/
@Override
public void setInputPorts(final Set<RemoteProcessGroupPortDescriptor> ports) {
public void setInputPorts(final Set<RemoteProcessGroupPortDescriptor> ports, final boolean pruneUnusedPorts) {
writeLock.lock();
try {
final List<String> newPortTargetIds = new ArrayList<>();
@ -478,6 +480,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
// See if we have any ports that no longer exist; cannot be removed within the loop because it would cause
// a ConcurrentModificationException.
if (pruneUnusedPorts) {
final Iterator<StandardRemoteGroupPort> itr = inputPorts.values().iterator();
while (itr.hasNext()) {
final StandardRemoteGroupPort port = itr.next();
@ -491,6 +494,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
}
}
}
} finally {
writeLock.unlock();
}
@ -521,11 +525,13 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
* and started.
*
* @param ports the new ports
* @param pruneUnusedPorts if true, will remove any ports that are not in the given list and that have
* no outgoing connections
*
* @throws NullPointerException if the given argument is null
*/
@Override
public void setOutputPorts(final Set<RemoteProcessGroupPortDescriptor> ports) {
public void setOutputPorts(final Set<RemoteProcessGroupPortDescriptor> ports, final boolean pruneUnusedPorts) {
writeLock.lock();
try {
final List<String> newPortTargetIds = new ArrayList<>();
@ -535,7 +541,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
final Map<String, StandardRemoteGroupPort> outputPortByTargetId = outputPorts.values().stream()
.collect(Collectors.toMap(StandardRemoteGroupPort::getTargetIdentifier, Function.identity()));
final Map<String, StandardRemoteGroupPort> outputPortByName = inputPorts.values().stream()
final Map<String, StandardRemoteGroupPort> outputPortByName = outputPorts.values().stream()
.collect(Collectors.toMap(StandardRemoteGroupPort::getName, Function.identity()));
// Check if we have a matching port already and add the port if not. We determine a matching port
@ -564,6 +570,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
// See if we have any ports that no longer exist; cannot be removed within the loop because it would cause
// a ConcurrentModificationException.
if (pruneUnusedPorts) {
final Iterator<StandardRemoteGroupPort> itr = outputPorts.values().iterator();
while (itr.hasNext()) {
final StandardRemoteGroupPort port = itr.next();
@ -577,6 +584,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
}
}
}
} finally {
writeLock.unlock();
}
@ -617,53 +625,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
}
@Override
public void removeAllNonExistentPorts() {
writeLock.lock();
try {
final Set<String> inputPortIds = new HashSet<>();
final Set<String> outputPortIds = new HashSet<>();
for (final Map.Entry<String, StandardRemoteGroupPort> entry : inputPorts.entrySet()) {
final RemoteGroupPort port = entry.getValue();
if (port.getTargetExists()) {
continue;
}
// If there's a connection, we don't remove it.
if (port.hasIncomingConnection()) {
continue;
}
inputPortIds.add(entry.getKey());
}
for (final Map.Entry<String, StandardRemoteGroupPort> entry : outputPorts.entrySet()) {
final RemoteGroupPort port = entry.getValue();
if (port.getTargetExists()) {
continue;
}
// If there's a connection, we don't remove it.
if (!port.getConnections().isEmpty()) {
continue;
}
outputPortIds.add(entry.getKey());
}
for (final String id : inputPortIds) {
inputPorts.remove(id);
}
for (final String id : outputPortIds) {
outputPorts.remove(id);
}
} finally {
writeLock.unlock();
}
}
/**
* Adds an Output Port to this Remote Process Group that is described by
@ -865,35 +826,16 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
try (final SiteToSiteRestApiClient apiClient = getSiteToSiteRestApiClient()) {
dto = apiClient.getController(targetUris);
} catch (IOException e) {
writeLock.lock();
try {
for (final Iterator<StandardRemoteGroupPort> iter = inputPorts.values().iterator(); iter.hasNext();) {
final StandardRemoteGroupPort inputPort = iter.next();
if (!inputPort.hasIncomingConnection()) {
iter.remove();
}
}
for (final Iterator<StandardRemoteGroupPort> iter = outputPorts.values().iterator(); iter.hasNext();) {
final StandardRemoteGroupPort outputPort = iter.next();
if (outputPort.getConnections().isEmpty()) {
iter.remove();
}
}
} finally {
writeLock.unlock();
}
throw new CommunicationsException("Unable to communicate with Remote NiFi at URI " + targetUris + " due to: " + e.getMessage());
}
writeLock.lock();
try {
if (dto.getInputPorts() != null) {
setInputPorts(convertRemotePort(dto.getInputPorts()));
setInputPorts(convertRemotePort(dto.getInputPorts()), true);
}
if (dto.getOutputPorts() != null) {
setOutputPorts(convertRemotePort(dto.getOutputPorts()));
setOutputPorts(convertRemotePort(dto.getOutputPorts()), true);
}
// set the controller details

View File

@ -347,7 +347,7 @@ public class MockProcessGroup implements ProcessGroup {
}
@Override
public ControllerServiceNode findControllerService(final String id) {
public ControllerServiceNode findControllerService(final String id, final boolean includeDescendants, final boolean includeAncestors) {
return serviceMap.get(id);
}
@ -678,4 +678,8 @@ public class MockProcessGroup implements ProcessGroup {
@Override
public void verifyCanShowLocalModifications() {
}
@Override
public void onComponentModified() {
}
}

View File

@ -3731,7 +3731,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
private InstantiatedVersionedProcessGroup createFlowSnapshot(final String processGroupId) {
final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId);
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
final InstantiatedVersionedProcessGroup versionedGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient, false);
final InstantiatedVersionedProcessGroup versionedGroup = mapper.mapProcessGroup(processGroup, controllerFacade.getControllerServiceProvider(), flowRegistryClient, false);
return versionedGroup;
}
@ -3753,21 +3753,39 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion(), false, NiFiUserUtils.getNiFiUser());
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient, false);
final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, controllerFacade.getControllerServiceProvider(), flowRegistryClient, false);
final VersionedProcessGroup registryGroup = versionedFlowSnapshot.getFlowContents();
final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localGroup);
final ComparableDataFlow registryFlow = new StandardComparableDataFlow("Versioned Flow", registryGroup);
final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, new ConciseEvolvingDifferenceDescriptor());
final Set<String> ancestorServiceIds = getAncestorGroupServiceIds(processGroup);
final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, ancestorServiceIds, new ConciseEvolvingDifferenceDescriptor());
final FlowComparison flowComparison = flowComparator.compare();
final Set<ComponentDifferenceDTO> differenceDtos = dtoFactory.createComponentDifferenceDtos(flowComparison);
final Set<ComponentDifferenceDTO> differenceDtos = dtoFactory.createComponentDifferenceDtos(flowComparison,
diff -> diff.getDifferenceType() != DifferenceType.POSITION_CHANGED && diff.getDifferenceType() != DifferenceType.STYLE_CHANGED);
final FlowComparisonEntity entity = new FlowComparisonEntity();
entity.setComponentDifferences(differenceDtos);
return entity;
}
private Set<String> getAncestorGroupServiceIds(final ProcessGroup group) {
final Set<String> ancestorServiceIds;
ProcessGroup parentGroup = group.getParent();
if (parentGroup == null) {
ancestorServiceIds = Collections.emptySet();
} else {
ancestorServiceIds = parentGroup.getControllerServices(true).stream()
.map(ControllerServiceNode::getIdentifier)
.collect(Collectors.toSet());
}
return ancestorServiceIds;
}
@Override
public VersionedFlow registerVersionedFlow(final String registryId, final VersionedFlow flow) throws IOException, NiFiRegistryException {
final FlowRegistry registry = flowRegistryClient.getFlowRegistry(registryId);
@ -3852,12 +3870,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId);
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
final VersionedProcessGroup localContents = mapper.mapProcessGroup(group, flowRegistryClient, true);
final VersionedProcessGroup localContents = mapper.mapProcessGroup(group, controllerFacade.getControllerServiceProvider(), flowRegistryClient, true);
final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localContents);
final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("Versioned Flow", updatedSnapshot.getFlowContents());
final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow, new StaticDifferenceDescriptor());
final Set<String> ancestorGroupServiceIds = getAncestorGroupServiceIds(group);
final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow, ancestorGroupServiceIds, new StaticDifferenceDescriptor());
final FlowComparison comparison = flowComparator.compare();
final Set<AffectedComponentEntity> affectedComponents = comparison.getDifferences().stream()

View File

@ -117,6 +117,8 @@ import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedComponent;
import org.apache.nifi.registry.flow.VersionedFlowState;
import org.apache.nifi.registry.flow.VersionedFlowStatus;
import org.apache.nifi.registry.flow.diff.FlowComparison;
import org.apache.nifi.registry.flow.diff.FlowDifference;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent;
@ -213,6 +215,7 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -2187,9 +2190,14 @@ public final class DtoFactory {
public Set<ComponentDifferenceDTO> createComponentDifferenceDtos(final FlowComparison comparison) {
return createComponentDifferenceDtos(comparison, null);
}
public Set<ComponentDifferenceDTO> createComponentDifferenceDtos(final FlowComparison comparison, final Predicate<FlowDifference> filter) {
final Map<ComponentDifferenceDTO, List<DifferenceDTO>> differencesByComponent = new HashMap<>();
for (final FlowDifference difference : comparison.getDifferences()) {
if (filter == null || filter.test(difference)) {
final ComponentDifferenceDTO componentDiff = createComponentDifference(difference);
final List<DifferenceDTO> differences = differencesByComponent.computeIfAbsent(componentDiff, key -> new ArrayList<>());
@ -2199,6 +2207,7 @@ public final class DtoFactory {
differences.add(dto);
}
}
for (final Map.Entry<ComponentDifferenceDTO, List<DifferenceDTO>> entry : differencesByComponent.entrySet()) {
entry.getKey().setDifferences(entry.getValue());
@ -2252,6 +2261,12 @@ public final class DtoFactory {
dto.setVersion(versionControlInfo.getVersion());
dto.setCurrent(versionControlInfo.isCurrent());
dto.setModified(versionControlInfo.isModified());
final VersionedFlowStatus status = versionControlInfo.getStatus();
final VersionedFlowState state = status.getState();
dto.setState(state == null ? null : state.name());
dto.setStateExplanation(status.getStateExplanation());
return dto;
}
@ -3488,6 +3503,8 @@ public final class DtoFactory {
copy.setVersion(original.getVersion());
copy.setCurrent(original.getCurrent());
copy.setModified(original.getModified());
copy.setState(original.getState());
copy.setStateExplanation(original.getStateExplanation());
return copy;
}

View File

@ -52,6 +52,7 @@ import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.ContentNotFoundException;
import org.apache.nifi.controller.repository.claim.ContentDirection;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
@ -172,6 +173,10 @@ public class ControllerFacade implements Authorizable {
}
}
public ControllerServiceProvider getControllerServiceProvider() {
return flowController;
}
/**
* Sets the name of this controller.
*

View File

@ -16,6 +16,8 @@
*/
package org.apache.nifi.web.dao.impl;
import static org.apache.nifi.controller.FlowController.ROOT_GROUP_ID_ALIAS;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.state.Scope;
@ -45,8 +47,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.nifi.controller.FlowController.ROOT_GROUP_ID_ALIAS;
public class StandardControllerServiceDAO extends ComponentDAO implements ControllerServiceDAO {
private ControllerServiceProvider serviceProvider;
@ -172,6 +172,22 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
}
}
controllerService.getProcessGroup().onComponentModified();
// For any component that references this Controller Service, find the component's Process Group
// and notify the Process Group that a component has been modified. This way, we know to re-calculate
// whether or not the Process Group has local modifications.
final ProcessGroup group = controllerService.getProcessGroup();
controllerService.getReferences().getReferencingComponents().stream()
.map(ConfiguredComponent::getProcessGroupIdentifier)
.filter(id -> !id.equals(group.getIdentifier()))
.forEach(groupId -> {
final ProcessGroup descendant = group.findProcessGroup(groupId);
if (descendant != null) {
descendant.onComponentModified();
}
});
return controllerService;
}

View File

@ -91,6 +91,8 @@ public class StandardFunnelDAO extends ComponentDAO implements FunnelDAO {
}
}
funnel.getProcessGroup().onComponentModified();
return funnel;
}

View File

@ -237,6 +237,7 @@ public class StandardInputPortDAO extends ComponentDAO implements PortDAO {
inputPort.setMaxConcurrentTasks(concurrentTasks);
}
inputPort.getProcessGroup().onComponentModified();
return inputPort;
}

View File

@ -102,6 +102,7 @@ public class StandardLabelDAO extends ComponentDAO implements LabelDAO {
label.setSize(new Size(labelDTO.getWidth(), labelDTO.getHeight()));
}
label.getProcessGroup().onComponentModified();
return label;
}

View File

@ -233,6 +233,7 @@ public class StandardOutputPortDAO extends ComponentDAO implements PortDAO {
outputPort.setMaxConcurrentTasks(concurrentTasks);
}
outputPort.getProcessGroup().onComponentModified();
return outputPort;
}

View File

@ -204,7 +204,11 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
for (final String serviceId : serviceIds) {
final ControllerServiceNode serviceNode = group.findControllerService(serviceId);
final ControllerServiceNode serviceNode = group.findControllerService(serviceId, true, true);
if (serviceNode == null) {
throw new ResourceNotFoundException("Could not find Controller Service with identifier " + serviceId);
}
if (ControllerServiceState.ENABLED.equals(state)) {
final CompletableFuture<Void> serviceFuture = flowController.enableControllerService(serviceNode);
future = CompletableFuture.allOf(future, serviceFuture);
@ -234,6 +238,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
group.setComments(comments);
}
group.onComponentModified();
return group;
}
@ -247,7 +252,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
final String registryName = flowRegistry == null ? registryId : flowRegistry.getName();
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
final VersionedProcessGroup flowSnapshot = mapper.mapProcessGroup(group, flowController.getFlowRegistryClient(), false);
final VersionedProcessGroup flowSnapshot = mapper.mapProcessGroup(group, flowController, flowController.getFlowRegistryClient(), false);
final StandardVersionControlInformation vci = StandardVersionControlInformation.Builder.fromDto(versionControlInformation)
.registryName(registryName)
@ -257,6 +262,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
.build();
group.setVersionControlInformation(vci, versionedComponentMapping);
group.onComponentModified();
return group;
}
@ -279,6 +285,8 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
.build();
group.setVersionControlInformation(svci, Collections.emptyMap());
group.onComponentModified();
return group;
}
@ -295,6 +303,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
.forEach(var -> variableMap.put(var.getName(), var.getValue()));
group.setVariables(variableMap);
group.onComponentModified();
return group;
}

View File

@ -411,6 +411,7 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
// configure the processor
configureProcessor(processor, processorDTO);
parentGroup.onComponentModified();
// attempt to change the underlying processor if an updated bundle is specified
// updating the bundle must happen after configuring so that any additional classpath resources are set first

View File

@ -314,6 +314,7 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot
// perform the update
updatePort(port, remoteProcessGroupPortDto, remoteProcessGroup);
remoteProcessGroup.getProcessGroup().onComponentModified();
return port;
}
@ -332,6 +333,7 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot
// perform the update
updatePort(port, remoteProcessGroupPortDto, remoteProcessGroup);
remoteProcessGroup.getProcessGroup().onComponentModified();
return port;
}
@ -373,8 +375,6 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot
public RemoteProcessGroup updateRemoteProcessGroup(RemoteProcessGroupDTO remoteProcessGroupDTO) {
RemoteProcessGroup remoteProcessGroup = locateRemoteProcessGroup(remoteProcessGroupDTO.getId());
return updateRemoteProcessGroup(remoteProcessGroup, remoteProcessGroupDTO);
}
private RemoteProcessGroup updateRemoteProcessGroup(RemoteProcessGroup remoteProcessGroup, RemoteProcessGroupDTO remoteProcessGroupDTO) {
@ -447,6 +447,7 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot
}
}
remoteProcessGroup.getProcessGroup().onComponentModified();
return remoteProcessGroup;
}

View File

@ -26,6 +26,7 @@ import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
import org.apache.nifi.web.api.entity.AffectedComponentEntity;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
@ -38,6 +39,9 @@ public class AffectedComponentUtils {
case AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR:
final ProcessorEntity procEntity = serviceFacade.getProcessor(componentEntity.getId(), user);
return dtoFactory.createAffectedComponentEntity(procEntity);
case AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE:
final ControllerServiceEntity serviceEntity = serviceFacade.getControllerService(componentEntity.getId(), user);
return dtoFactory.createAffectedComponentEntity(serviceEntity);
case AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT: {
final RemoteProcessGroupEntity remoteGroupEntity = serviceFacade.getRemoteProcessGroup(componentEntity.getComponent().getProcessGroupId(), user);
final RemoteProcessGroupContentsDTO remoteGroupContents = remoteGroupEntity.getComponent().getContents();

View File

@ -107,7 +107,8 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
final int scheduleComponentStatus = clusterResponse.getStatus();
if (scheduleComponentStatus != Status.OK.getStatusCode()) {
throw new LifecycleManagementException("Failed to transition components to a state of " + desiredState);
final String explanation = getResponseEntity(clusterResponse, String.class);
throw new LifecycleManagementException("Failed to transition components to a state of " + desiredState + " due to " + explanation);
}
final boolean processorsTransitioned = waitForProcessorStatus(user, exampleUri, groupId, componentMap, desiredState, pause);
@ -312,7 +313,8 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
final int disableServicesStatus = clusterResponse.getStatus();
if (disableServicesStatus != Status.OK.getStatusCode()) {
throw new LifecycleManagementException("Failed to update Controller Services to a state of " + desiredState);
final String explanation = getResponseEntity(clusterResponse, String.class);
throw new LifecycleManagementException("Failed to update Controller Services to a state of " + desiredState + " due to " + explanation);
}
final boolean serviceTransitioned = waitForControllerServiceStatus(user, originalUri, groupId, affectedServiceIds, desiredState, pause);