NIFI-4436, NIFI-4461: When copying and pasting an RPG, ensure that we copy Batch Settings for each Port. Bug fixes. Now works in clustered mode.

Signed-off-by: Matt Gilman <matt.c.gilman@gmail.com>
This commit is contained in:
Mark Payne 2017-11-22 09:55:30 -05:00 committed by Bryan Bende
parent c92022dd60
commit e1606701c7
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
12 changed files with 145 additions and 117 deletions

View File

@ -21,6 +21,7 @@ import static java.util.Objects.requireNonNull;
import java.io.IOException; import java.io.IOException;
import java.net.URL; import java.net.URL;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -93,12 +94,15 @@ import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.VariableDescriptor; import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.registry.client.NiFiRegistryException; import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.BatchSize;
import org.apache.nifi.registry.flow.Bundle; import org.apache.nifi.registry.flow.Bundle;
import org.apache.nifi.registry.flow.ComponentType;
import org.apache.nifi.registry.flow.ConnectableComponent; import org.apache.nifi.registry.flow.ConnectableComponent;
import org.apache.nifi.registry.flow.FlowRegistry; import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.StandardVersionControlInformation; import org.apache.nifi.registry.flow.StandardVersionControlInformation;
import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedComponent;
import org.apache.nifi.registry.flow.VersionedConnection; import org.apache.nifi.registry.flow.VersionedConnection;
import org.apache.nifi.registry.flow.VersionedControllerService; import org.apache.nifi.registry.flow.VersionedControllerService;
import org.apache.nifi.registry.flow.VersionedFlow; import org.apache.nifi.registry.flow.VersionedFlow;
@ -128,7 +132,6 @@ import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.ComponentIdGenerator;
import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils; import org.apache.nifi.util.ReflectionUtils;
import org.apache.nifi.web.Revision; import org.apache.nifi.web.Revision;
@ -145,6 +148,7 @@ public final class StandardProcessGroup implements ProcessGroup {
private final AtomicReference<String> comments; private final AtomicReference<String> comments;
private final AtomicReference<String> versionedComponentId = new AtomicReference<>(); private final AtomicReference<String> versionedComponentId = new AtomicReference<>();
private final AtomicReference<StandardVersionControlInformation> versionControlInfo = new AtomicReference<>(); private final AtomicReference<StandardVersionControlInformation> versionControlInfo = new AtomicReference<>();
private static final SecureRandom randomGenerator = new SecureRandom();
private final StandardProcessScheduler scheduler; private final StandardProcessScheduler scheduler;
private final ControllerServiceProvider controllerServiceProvider; private final ControllerServiceProvider controllerServiceProvider;
@ -3018,10 +3022,20 @@ public final class StandardProcessGroup implements ProcessGroup {
final FlowComparator flowComparator = new StandardFlowComparator(localFlow, remoteFlow, new StaticDifferenceDescriptor()); final FlowComparator flowComparator = new StandardFlowComparator(localFlow, remoteFlow, new StaticDifferenceDescriptor());
final FlowComparison flowComparison = flowComparator.compare(); final FlowComparison flowComparison = flowComparator.compare();
final Set<String> updatedVersionedComponentIds = flowComparison.getDifferences().stream() final Set<String> updatedVersionedComponentIds = new HashSet<>();
.filter(diff -> diff.getDifferenceType() != DifferenceType.POSITION_CHANGED) for (final FlowDifference diff : flowComparison.getDifferences()) {
.map(diff -> diff.getComponentA() == null ? diff.getComponentB().getIdentifier() : diff.getComponentA().getIdentifier()) if (diff.getDifferenceType() == DifferenceType.POSITION_CHANGED) {
.collect(Collectors.toSet()); continue;
}
final VersionedComponent component = diff.getComponentA() == null ? diff.getComponentB() : diff.getComponentA();
updatedVersionedComponentIds.add(component.getIdentifier());
if (component.getComponentType() == ComponentType.REMOTE_INPUT_PORT || component.getComponentType() == ComponentType.REMOTE_OUTPUT_PORT) {
final String remoteGroupId = ((VersionedRemoteGroupPort) component).getRemoteGroupId();
updatedVersionedComponentIds.add(remoteGroupId);
}
}
if (LOG.isInfoEnabled()) { if (LOG.isInfoEnabled()) {
final String differencesByLine = flowComparison.getDifferences().stream() final String differencesByLine = flowComparison.getDifferences().stream()
@ -3106,13 +3120,13 @@ public final class StandardProcessGroup implements ProcessGroup {
.registryId(registryId) .registryId(registryId)
.registryName(registryName) .registryName(registryName)
.bucketId(bucketId) .bucketId(bucketId)
.bucketName(bucketId) // bucket name not yet known .bucketName(bucketId)
.flowId(flowId) .flowId(flowId)
.flowName(flowId) // flow id not yet known .flowName(flowId)
.version(version) .version(version)
.flowSnapshot(proposed) .flowSnapshot(proposed)
.modified(false) .modified(false)
.current(true) .current(remoteCoordinates.getLatest())
.build(); .build();
group.setVersionControlInformation(vci, Collections.emptyMap()); group.setVersionControlInformation(vci, Collections.emptyMap());
@ -3125,7 +3139,6 @@ public final class StandardProcessGroup implements ProcessGroup {
for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) { for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) {
final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier()); final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier());
final VersionedFlowCoordinates childCoordinates = proposedChildGroup.getVersionedFlowCoordinates(); final VersionedFlowCoordinates childCoordinates = proposedChildGroup.getVersionedFlowCoordinates();
if (childGroup == null) { if (childGroup == null) {
@ -3291,7 +3304,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final RemoteProcessGroup added = addRemoteProcessGroup(group, proposedRpg, componentIdSeed); final RemoteProcessGroup added = addRemoteProcessGroup(group, proposedRpg, componentIdSeed);
LOG.info("Added {} to {}", added, this); LOG.info("Added {} to {}", added, this);
} else if (updatedVersionedComponentIds.contains(proposedRpg.getIdentifier())) { } else if (updatedVersionedComponentIds.contains(proposedRpg.getIdentifier())) {
updateRemoteProcessGroup(rpg, proposedRpg); updateRemoteProcessGroup(rpg, proposedRpg, componentIdSeed);
LOG.info("Updated {}", rpg); LOG.info("Updated {}", rpg);
} else { } else {
rpg.setPosition(new Position(proposedRpg.getPosition().getX(), proposedRpg.getPosition().getY())); rpg.setPosition(new Position(proposedRpg.getPosition().getX(), proposedRpg.getPosition().getY()));
@ -3388,27 +3401,28 @@ public final class StandardProcessGroup implements ProcessGroup {
} }
} }
protected String generateUuid(final String componentIdSeed) { private String generateUuid(final String propposedId, final String destinationGroupId, final String seed) {
UUID uuid; // TODO: I think we can get rid of all of those LinkedHashSet's now in the VersionedProcessGroup because
if (componentIdSeed == null) { /// the UUID is properly keyed off of the ID of the component in the VersionedProcessGroup.
uuid = ComponentIdGenerator.generateId(); long msb = UUID.nameUUIDFromBytes((propposedId + destinationGroupId).getBytes(StandardCharsets.UTF_8)).getMostSignificantBits();
} else {
try {
UUID seedId = UUID.fromString(componentIdSeed);
uuid = new UUID(seedId.getMostSignificantBits(), componentIdSeed.hashCode());
} catch (Exception e) {
LOG.warn("Provided 'seed' does not represent UUID. Will not be able to extract most significant bits for ID generation.");
uuid = UUID.nameUUIDFromBytes(componentIdSeed.getBytes(StandardCharsets.UTF_8));
}
}
UUID uuid;
if (StringUtils.isBlank(seed)) {
long lsb = randomGenerator.nextLong();
// since msb is extracted from type-one UUID, the type-one semantics will be preserved
uuid = new UUID(msb, lsb);
} else {
UUID seedId = UUID.nameUUIDFromBytes((propposedId + destinationGroupId + seed).getBytes(StandardCharsets.UTF_8));
uuid = new UUID(msb, seedId.getLeastSignificantBits());
}
LOG.debug("Generating UUID {} from currentId={}, seed={}", uuid, propposedId, seed);
return uuid.toString(); return uuid.toString();
} }
private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final String componentIdSeed, final Set<String> variablesToSkip) private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final String componentIdSeed, final Set<String> variablesToSkip)
throws ProcessorInstantiationException { throws ProcessorInstantiationException {
final ProcessGroup group = flowController.createProcessGroup(generateUuid(componentIdSeed)); final ProcessGroup group = flowController.createProcessGroup(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed));
group.setVersionedComponentId(proposed.getIdentifier()); group.setVersionedComponentId(proposed.getIdentifier());
group.setParent(destination); group.setParent(destination);
updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true, true, true, variablesToSkip); updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true, true, true, variablesToSkip);
@ -3461,7 +3475,8 @@ public final class StandardProcessGroup implements ProcessGroup {
+ " but no component could be found in the Process Group with a corresponding identifier"); + " but no component could be found in the Process Group with a corresponding identifier");
} }
final Connection connection = flowController.createConnection(generateUuid(componentIdSeed), proposed.getName(), source, destination, proposed.getSelectedRelationships()); final Connection connection = flowController.createConnection(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getName(), source, destination,
proposed.getSelectedRelationships());
connection.setVersionedComponentId(proposed.getIdentifier()); connection.setVersionedComponentId(proposed.getIdentifier());
destinationGroup.addConnection(connection); destinationGroup.addConnection(connection);
updateConnection(connection, proposed); updateConnection(connection, proposed);
@ -3523,7 +3538,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final String rpgId = connectableComponent.getGroupId(); final String rpgId = connectableComponent.getGroupId();
final Optional<RemoteProcessGroup> rpgOption = group.getRemoteProcessGroups().stream() final Optional<RemoteProcessGroup> rpgOption = group.getRemoteProcessGroups().stream()
.filter(component -> component.getVersionedComponentId().isPresent()) .filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get())) .filter(component -> rpgId.equals(component.getVersionedComponentId().get()))
.findAny(); .findAny();
if (!rpgOption.isPresent()) { if (!rpgOption.isPresent()) {
@ -3598,7 +3613,7 @@ public final class StandardProcessGroup implements ProcessGroup {
private ControllerServiceNode addControllerService(final ProcessGroup destination, final VersionedControllerService proposed, final String componentIdSeed) { private ControllerServiceNode addControllerService(final ProcessGroup destination, final VersionedControllerService proposed, final String componentIdSeed) {
final String type = proposed.getType(); final String type = proposed.getType();
final String id = generateUuid(componentIdSeed); final String id = generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed);
final Bundle bundle = proposed.getBundle(); final Bundle bundle = proposed.getBundle();
final BundleCoordinate coordinate = toCoordinate(bundle); final BundleCoordinate coordinate = toCoordinate(bundle);
@ -3619,7 +3634,7 @@ public final class StandardProcessGroup implements ProcessGroup {
} }
private Funnel addFunnel(final ProcessGroup destination, final VersionedFunnel proposed, final String componentIdSeed) { private Funnel addFunnel(final ProcessGroup destination, final VersionedFunnel proposed, final String componentIdSeed) {
final Funnel funnel = flowController.createFunnel(generateUuid(componentIdSeed)); final Funnel funnel = flowController.createFunnel(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed));
funnel.setVersionedComponentId(proposed.getIdentifier()); funnel.setVersionedComponentId(proposed.getIdentifier());
destination.addFunnel(funnel); destination.addFunnel(funnel);
updateFunnel(funnel, proposed); updateFunnel(funnel, proposed);
@ -3634,7 +3649,7 @@ public final class StandardProcessGroup implements ProcessGroup {
} }
private Port addInputPort(final ProcessGroup destination, final VersionedPort proposed, final String componentIdSeed) { private Port addInputPort(final ProcessGroup destination, final VersionedPort proposed, final String componentIdSeed) {
final Port port = flowController.createLocalInputPort(generateUuid(componentIdSeed), proposed.getName()); final Port port = flowController.createLocalInputPort(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getName());
port.setVersionedComponentId(proposed.getIdentifier()); port.setVersionedComponentId(proposed.getIdentifier());
destination.addInputPort(port); destination.addInputPort(port);
updatePort(port, proposed); updatePort(port, proposed);
@ -3643,7 +3658,7 @@ public final class StandardProcessGroup implements ProcessGroup {
} }
private Port addOutputPort(final ProcessGroup destination, final VersionedPort proposed, final String componentIdSeed) { private Port addOutputPort(final ProcessGroup destination, final VersionedPort proposed, final String componentIdSeed) {
final Port port = flowController.createLocalOutputPort(generateUuid(componentIdSeed), proposed.getName()); final Port port = flowController.createLocalOutputPort(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getName());
port.setVersionedComponentId(proposed.getIdentifier()); port.setVersionedComponentId(proposed.getIdentifier());
destination.addOutputPort(port); destination.addOutputPort(port);
updatePort(port, proposed); updatePort(port, proposed);
@ -3652,7 +3667,7 @@ public final class StandardProcessGroup implements ProcessGroup {
} }
private Label addLabel(final ProcessGroup destination, final VersionedLabel proposed, final String componentIdSeed) { private Label addLabel(final ProcessGroup destination, final VersionedLabel proposed, final String componentIdSeed) {
final Label label = flowController.createLabel(generateUuid(componentIdSeed), proposed.getLabel()); final Label label = flowController.createLabel(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getLabel());
label.setVersionedComponentId(proposed.getIdentifier()); label.setVersionedComponentId(proposed.getIdentifier());
destination.addLabel(label); destination.addLabel(label);
updateLabel(label, proposed); updateLabel(label, proposed);
@ -3669,7 +3684,7 @@ public final class StandardProcessGroup implements ProcessGroup {
private ProcessorNode addProcessor(final ProcessGroup destination, final VersionedProcessor proposed, final String componentIdSeed) throws ProcessorInstantiationException { private ProcessorNode addProcessor(final ProcessGroup destination, final VersionedProcessor proposed, final String componentIdSeed) throws ProcessorInstantiationException {
final BundleCoordinate coordinate = toCoordinate(proposed.getBundle()); final BundleCoordinate coordinate = toCoordinate(proposed.getBundle());
final ProcessorNode procNode = flowController.createProcessor(proposed.getType(), generateUuid(componentIdSeed), coordinate, true); final ProcessorNode procNode = flowController.createProcessor(proposed.getType(), generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), coordinate, true);
procNode.setVersionedComponentId(proposed.getIdentifier()); procNode.setVersionedComponentId(proposed.getIdentifier());
destination.addProcessor(procNode); destination.addProcessor(procNode);
@ -3717,25 +3732,25 @@ public final class StandardProcessGroup implements ProcessGroup {
} }
private RemoteProcessGroup addRemoteProcessGroup(final ProcessGroup destination, final VersionedRemoteProcessGroup proposed, final String componentIdSeed) { private RemoteProcessGroup addRemoteProcessGroup(final ProcessGroup destination, final VersionedRemoteProcessGroup proposed, final String componentIdSeed) {
final RemoteProcessGroup rpg = flowController.createRemoteProcessGroup(generateUuid(componentIdSeed), proposed.getTargetUris()); final RemoteProcessGroup rpg = flowController.createRemoteProcessGroup(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getTargetUris());
rpg.setVersionedComponentId(proposed.getIdentifier()); rpg.setVersionedComponentId(proposed.getIdentifier());
destination.addRemoteProcessGroup(rpg); destination.addRemoteProcessGroup(rpg);
updateRemoteProcessGroup(rpg, proposed); updateRemoteProcessGroup(rpg, proposed, componentIdSeed);
return rpg; return rpg;
} }
private void updateRemoteProcessGroup(final RemoteProcessGroup rpg, final VersionedRemoteProcessGroup proposed) { private void updateRemoteProcessGroup(final RemoteProcessGroup rpg, final VersionedRemoteProcessGroup proposed, final String componentIdSeed) {
rpg.setComments(proposed.getComments()); rpg.setComments(proposed.getComments());
rpg.setCommunicationsTimeout(proposed.getCommunicationsTimeout()); rpg.setCommunicationsTimeout(proposed.getCommunicationsTimeout());
rpg.setInputPorts(proposed.getInputPorts() == null ? Collections.emptySet() : proposed.getInputPorts().stream() rpg.setInputPorts(proposed.getInputPorts() == null ? Collections.emptySet() : proposed.getInputPorts().stream()
.map(port -> createPortDescriptor(port)) .map(port -> createPortDescriptor(port, componentIdSeed, rpg.getIdentifier()))
.collect(Collectors.toSet())); .collect(Collectors.toSet()));
rpg.setName(proposed.getName()); rpg.setName(proposed.getName());
rpg.setNetworkInterface(proposed.getLocalNetworkInterface()); rpg.setNetworkInterface(proposed.getLocalNetworkInterface());
rpg.setOutputPorts(proposed.getOutputPorts() == null ? Collections.emptySet() : proposed.getOutputPorts().stream() rpg.setOutputPorts(proposed.getOutputPorts() == null ? Collections.emptySet() : proposed.getOutputPorts().stream()
.map(port -> createPortDescriptor(port)) .map(port -> createPortDescriptor(port, componentIdSeed, rpg.getIdentifier()))
.collect(Collectors.toSet())); .collect(Collectors.toSet()));
rpg.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY())); rpg.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
rpg.setProxyHost(proposed.getProxyHost()); rpg.setProxyHost(proposed.getProxyHost());
@ -3745,16 +3760,22 @@ public final class StandardProcessGroup implements ProcessGroup {
rpg.setYieldDuration(proposed.getYieldDuration()); rpg.setYieldDuration(proposed.getYieldDuration());
} }
private RemoteProcessGroupPortDescriptor createPortDescriptor(final VersionedRemoteGroupPort proposed) { private RemoteProcessGroupPortDescriptor createPortDescriptor(final VersionedRemoteGroupPort proposed, final String componentIdSeed, final String rpgId) {
final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor(); final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor();
descriptor.setVersionedComponentId(proposed.getIdentifier()); descriptor.setVersionedComponentId(proposed.getIdentifier());
descriptor.setBatchCount(proposed.getBatchSize().getCount());
descriptor.setBatchDuration(proposed.getBatchSize().getDuration()); final BatchSize batchSize = proposed.getBatchSize();
descriptor.setBatchSize(proposed.getBatchSize().getSize()); if (batchSize != null) {
descriptor.setBatchCount(batchSize.getCount());
descriptor.setBatchDuration(batchSize.getDuration());
descriptor.setBatchSize(batchSize.getSize());
}
descriptor.setComments(proposed.getComments()); descriptor.setComments(proposed.getComments());
descriptor.setConcurrentlySchedulableTaskCount(proposed.getConcurrentlySchedulableTaskCount()); descriptor.setConcurrentlySchedulableTaskCount(proposed.getConcurrentlySchedulableTaskCount());
descriptor.setGroupId(proposed.getGroupId()); descriptor.setGroupId(proposed.getRemoteGroupId());
descriptor.setId(UUID.randomUUID().toString()); // TODO: Need to address this issue of port id's descriptor.setTargetId(proposed.getTargetId());
descriptor.setId(generateUuid(proposed.getIdentifier(), rpgId, componentIdSeed));
descriptor.setName(proposed.getName()); descriptor.setName(proposed.getName());
descriptor.setUseCompression(proposed.isUseCompression()); descriptor.setUseCompression(proposed.isUseCompression());
return descriptor; return descriptor;
@ -3785,29 +3806,8 @@ public final class StandardProcessGroup implements ProcessGroup {
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient(), false); final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient(), false);
final ComparableDataFlow currentFlow = new ComparableDataFlow() { final ComparableDataFlow currentFlow = new StandardComparableDataFlow("Local Flow", versionedGroup);
@Override final ComparableDataFlow snapshotFlow = new StandardComparableDataFlow("Versioned Flow", vci.getFlowSnapshot());
public VersionedProcessGroup getContents() {
return versionedGroup;
}
@Override
public String getName() {
return "Local Flow";
}
};
final ComparableDataFlow snapshotFlow = new ComparableDataFlow() {
@Override
public VersionedProcessGroup getContents() {
return vci.getFlowSnapshot();
}
@Override
public String getName() {
return "Versioned Flow";
}
};
final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, new EvolvingDifferenceDescriptor()); final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, new EvolvingDifferenceDescriptor());
final FlowComparison comparison = flowComparator.compare(); final FlowComparison comparison = flowComparator.compare();

View File

@ -231,6 +231,7 @@ public class RestBasedFlowRegistry implements FlowRegistry {
group.setProcessors(contents.getProcessors()); group.setProcessors(contents.getProcessors());
group.setRemoteProcessGroups(contents.getRemoteProcessGroups()); group.setRemoteProcessGroups(contents.getRemoteProcessGroups());
group.setVariables(contents.getVariables()); group.setVariables(contents.getVariables());
coordinates.setLatest(snapshot.isLatest());
} }
for (final VersionedProcessGroup child : group.getProcessGroups()) { for (final VersionedProcessGroup child : group.getProcessGroups()) {

View File

@ -309,10 +309,11 @@ public class NiFiRegistryDtoMapper {
port.setGroupIdentifier(getGroupId(dto.getGroupId())); port.setGroupIdentifier(getGroupId(dto.getGroupId()));
port.setComments(dto.getComments()); port.setComments(dto.getComments());
port.setConcurrentlySchedulableTaskCount(dto.getConcurrentlySchedulableTaskCount()); port.setConcurrentlySchedulableTaskCount(dto.getConcurrentlySchedulableTaskCount());
port.setGroupId(dto.getGroupId()); port.setRemoteGroupId(dto.getGroupId());
port.setName(dto.getName()); port.setName(dto.getName());
port.setUseCompression(dto.getUseCompression()); port.setUseCompression(dto.getUseCompression());
port.setBatchSettings(mapBatchSettings(dto.getBatchSettings())); port.setBatchSize(mapBatchSettings(dto.getBatchSettings()));
port.setTargetId(dto.getTargetId());
port.setComponentType(componentType); port.setComponentType(componentType);
return port; return port;
} }

View File

@ -310,7 +310,26 @@ public class NiFiRegistryFlowMapper {
} }
component.setComments(connectable.getComments()); component.setComments(connectable.getComments());
component.setGroupId(connectable.getProcessGroupIdentifier()); if (connectable instanceof RemoteGroupPort) {
final RemoteGroupPort port = (RemoteGroupPort) connectable;
final RemoteProcessGroup rpg = port.getRemoteProcessGroup();
final Optional<String> rpgVersionedId = rpg.getVersionedComponentId();
final String groupId;
if (rpgVersionedId.isPresent()) {
groupId = rpgVersionedId.get();
} else {
final String resolved = versionedComponentIds.get(rpg.getIdentifier());
if (resolved == null) {
throw new IllegalArgumentException("Unable to find the Versioned Component ID for Remote Process Group that " + connectable + " belongs to");
}
groupId = resolved;
}
component.setGroupId(groupId);
} else {
component.setGroupId(connectable.getProcessGroupIdentifier());
}
component.setName(connectable.getName()); component.setName(connectable.getName());
component.setType(ConnectableComponentType.valueOf(connectable.getConnectableType().name())); component.setType(ConnectableComponentType.valueOf(connectable.getConnectableType().name()));
return component; return component;
@ -478,10 +497,11 @@ public class NiFiRegistryFlowMapper {
port.setGroupIdentifier(getGroupId(remotePort.getRemoteProcessGroup().getIdentifier())); port.setGroupIdentifier(getGroupId(remotePort.getRemoteProcessGroup().getIdentifier()));
port.setComments(remotePort.getComments()); port.setComments(remotePort.getComments());
port.setConcurrentlySchedulableTaskCount(remotePort.getMaxConcurrentTasks()); port.setConcurrentlySchedulableTaskCount(remotePort.getMaxConcurrentTasks());
port.setGroupId(remotePort.getProcessGroupIdentifier()); port.setRemoteGroupId(getGroupId(remotePort.getRemoteProcessGroup().getIdentifier()));
port.setName(remotePort.getName()); port.setName(remotePort.getName());
port.setUseCompression(remotePort.isUseCompression()); port.setUseCompression(remotePort.isUseCompression());
port.setBatchSettings(mapBatchSettings(remotePort)); port.setBatchSize(mapBatchSettings(remotePort));
port.setTargetId(remotePort.getTargetIdentifier());
port.setComponentType(componentType); port.setComponentType(componentType);
return port; return port;
} }

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.flow.mapping;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
public class StandardComparableDataFlow implements ComparableDataFlow {
private final String name;
private final VersionedProcessGroup contents;
public StandardComparableDataFlow(final String name, final VersionedProcessGroup contents) {
this.name = name;
this.contents = contents;
}
@Override
public String getName() {
return name;
}
@Override
public VersionedProcessGroup getContents() {
return contents;
}
}

View File

@ -780,6 +780,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
if (!StringUtils.isBlank(descriptor.getBatchDuration())) { if (!StringUtils.isBlank(descriptor.getBatchDuration())) {
port.setBatchDuration(descriptor.getBatchDuration()); port.setBatchDuration(descriptor.getBatchDuration());
} }
port.setVersionedComponentId(descriptor.getVersionedComponentId());
inputPorts.put(descriptor.getId(), port); inputPorts.put(descriptor.getId(), port);
return port; return port;

View File

@ -299,6 +299,7 @@ public class FingerprintFactoryTest {
// Assert fingerprints with expected one. // Assert fingerprints with expected one.
final String expected = "portId" + final String expected = "portId" +
"NO_VALUE" +
"NO_VALUE" + "NO_VALUE" +
"3" + "3" +
"true" + "true" +

View File

@ -3756,29 +3756,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient, false); final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient, false);
final VersionedProcessGroup registryGroup = versionedFlowSnapshot.getFlowContents(); final VersionedProcessGroup registryGroup = versionedFlowSnapshot.getFlowContents();
final ComparableDataFlow localFlow = new ComparableDataFlow() { final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localGroup);
@Override final ComparableDataFlow registryFlow = new StandardComparableDataFlow("Versioned Flow", registryGroup);
public VersionedProcessGroup getContents() {
return localGroup;
}
@Override
public String getName() {
return "Local Flow";
}
};
final ComparableDataFlow registryFlow = new ComparableDataFlow() {
@Override
public VersionedProcessGroup getContents() {
return registryGroup;
}
@Override
public String getName() {
return "Versioned Flow";
}
};
final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, new ConciseEvolvingDifferenceDescriptor()); final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, new ConciseEvolvingDifferenceDescriptor());
final FlowComparison flowComparison = flowComparator.compare(); final FlowComparison flowComparison = flowComparator.compare();
@ -4037,7 +4016,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
} }
if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.REMOTE_PROCESS_GROUP.name())) { if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.REMOTE_PROCESS_GROUP.name())) {
return authorizableLookup.getRemoteProcessGroup(versionedComponent.getInstanceGroupId()); return authorizableLookup.getRemoteProcessGroup(componentId);
} }
return null; return null;

View File

@ -604,24 +604,6 @@ public class FlowResource extends ApplicationResource {
componentIds.add(outputPort.getIdentifier()); componentIds.add(outputPort.getIdentifier());
}); });
// ensure authorized for each remote input port we will attempt to schedule
group.findAllRemoteProcessGroups().stream()
.flatMap(rpg -> rpg.getInputPorts().stream())
.filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS)
.filter(port -> port.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()))
.forEach(port -> {
componentIds.add(port.getIdentifier());
});
// ensure authorized for each remote output port we will attempt to schedule
group.findAllRemoteProcessGroups().stream()
.flatMap(rpg -> rpg.getOutputPorts().stream())
.filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS)
.filter(port -> port.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()))
.forEach(port -> {
componentIds.add(port.getIdentifier());
});
return componentIds; return componentIds;
}); });

View File

@ -1641,7 +1641,7 @@ public class ProcessGroupResource extends ApplicationResource {
// Step 6: Replicate the request or call serviceFacade.updateProcessGroup // Step 6: Replicate the request or call serviceFacade.updateProcessGroup
final VersionControlInformationDTO versionControlInfo = requestProcessGroupEntity.getComponent().getVersionControlInformation(); final VersionControlInformationDTO versionControlInfo = requestProcessGroupEntity.getComponent().getVersionControlInformation();
if (versionControlInfo != null) { if (versionControlInfo != null && requestProcessGroupEntity.getVersionedFlowSnapshot() == null) {
// Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail. // Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail.
// Step 2: Retrieve flow from Flow Registry // Step 2: Retrieve flow from Flow Registry
final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(versionControlInfo, true); final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(versionControlInfo, true);

View File

@ -709,7 +709,7 @@ public class VersionsResource extends ApplicationResource {
final VersionControlInformationDTO versionControlInfoDto = new VersionControlInformationDTO(); final VersionControlInformationDTO versionControlInfoDto = new VersionControlInformationDTO();
versionControlInfoDto.setBucketId(snapshotMetadata.getBucketIdentifier()); versionControlInfoDto.setBucketId(snapshotMetadata.getBucketIdentifier());
versionControlInfoDto.setBucketName(bucket.getName()); versionControlInfoDto.setBucketName(bucket.getName());
versionControlInfoDto.setCurrent(true); versionControlInfoDto.setCurrent(snapshotMetadata.getVersion() == flow.getVersionCount());
versionControlInfoDto.setFlowId(snapshotMetadata.getFlowIdentifier()); versionControlInfoDto.setFlowId(snapshotMetadata.getFlowIdentifier());
versionControlInfoDto.setFlowName(flow.getName()); versionControlInfoDto.setFlowName(flow.getName());
versionControlInfoDto.setFlowDescription(flow.getDescription()); versionControlInfoDto.setFlowDescription(flow.getDescription());
@ -1152,7 +1152,7 @@ public class VersionsResource extends ApplicationResource {
final String idGenerationSeed = getIdGenerationSeed().orElse(null); final String idGenerationSeed = getIdGenerationSeed().orElse(null);
// Step 0: Get the Versioned Flow Snapshot from the Flow Registry // Step 0: Get the Versioned Flow Snapshot from the Flow Registry
final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation(), false); final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation(), true);
// The flow in the registry may not contain the same versions of components that we have in our flow. As a result, we need to update // The flow in the registry may not contain the same versions of components that we have in our flow. As a result, we need to update
// the flow snapshot to contain compatible bundles. // the flow snapshot to contain compatible bundles.
@ -1217,7 +1217,7 @@ public class VersionsResource extends ApplicationResource {
final Consumer<AsynchronousWebRequest<VersionControlInformationEntity>> updateTask = vcur -> { final Consumer<AsynchronousWebRequest<VersionControlInformationEntity>> updateTask = vcur -> {
try { try {
final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, componentLifecycle, exampleUri, final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, componentLifecycle, exampleUri,
affectedComponents, user, replicateRequest, requestEntity, flowSnapshot, request, idGenerationSeed, false, false); affectedComponents, user, replicateRequest, requestEntity, flowSnapshot, request, idGenerationSeed, false, true);
vcur.markComplete(updatedVersionControlEntity); vcur.markComplete(updatedVersionControlEntity);
} catch (final LifecycleManagementException e) { } catch (final LifecycleManagementException e) {

View File

@ -3428,6 +3428,7 @@ public final class DtoFactory {
batchCopy.setCount(batchOrg.getCount()); batchCopy.setCount(batchOrg.getCount());
batchCopy.setSize(batchOrg.getSize()); batchCopy.setSize(batchOrg.getSize());
batchCopy.setDuration(batchOrg.getDuration()); batchCopy.setDuration(batchOrg.getDuration());
copy.setBatchSettings(batchCopy);
} }
return copy; return copy;
} }