From e1606701c78984f794f96814d8f84a3f686099cf Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 22 Nov 2017 09:55:30 -0500 Subject: [PATCH] NIFI-4436, NIFI-4461: When copying and pasting an RPG, ensure that we copy Batch Settings for each Port. Bug fixes. Now works in clustered mode. Signed-off-by: Matt Gilman --- .../nifi/groups/StandardProcessGroup.java | 132 +++++++++--------- .../registry/flow/RestBasedFlowRegistry.java | 1 + .../flow/mapping/NiFiRegistryDtoMapper.java | 5 +- .../flow/mapping/NiFiRegistryFlowMapper.java | 26 +++- .../mapping/StandardComparableDataFlow.java | 42 ++++++ .../remote/StandardRemoteProcessGroup.java | 1 + .../fingerprint/FingerprintFactoryTest.java | 1 + .../nifi/web/StandardNiFiServiceFacade.java | 27 +--- .../org/apache/nifi/web/api/FlowResource.java | 18 --- .../nifi/web/api/ProcessGroupResource.java | 2 +- .../apache/nifi/web/api/VersionsResource.java | 6 +- .../apache/nifi/web/api/dto/DtoFactory.java | 1 + 12 files changed, 145 insertions(+), 117 deletions(-) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/StandardComparableDataFlow.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 51839d0390..77be3fedd4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -21,6 +21,7 @@ import static java.util.Objects.requireNonNull; import java.io.IOException; import java.net.URL; import java.nio.charset.StandardCharsets; +import java.security.SecureRandom; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -93,12 +94,15 @@ import org.apache.nifi.processor.StandardProcessContext; import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.registry.VariableDescriptor; import org.apache.nifi.registry.client.NiFiRegistryException; +import org.apache.nifi.registry.flow.BatchSize; import org.apache.nifi.registry.flow.Bundle; +import org.apache.nifi.registry.flow.ComponentType; import org.apache.nifi.registry.flow.ConnectableComponent; import org.apache.nifi.registry.flow.FlowRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.flow.StandardVersionControlInformation; import org.apache.nifi.registry.flow.VersionControlInformation; +import org.apache.nifi.registry.flow.VersionedComponent; import org.apache.nifi.registry.flow.VersionedConnection; import org.apache.nifi.registry.flow.VersionedControllerService; import org.apache.nifi.registry.flow.VersionedFlow; @@ -128,7 +132,6 @@ import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.scheduling.SchedulingStrategy; -import org.apache.nifi.util.ComponentIdGenerator; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.ReflectionUtils; import org.apache.nifi.web.Revision; @@ -145,6 +148,7 @@ public final class StandardProcessGroup implements ProcessGroup { private final AtomicReference comments; private final AtomicReference versionedComponentId = new AtomicReference<>(); private final AtomicReference versionControlInfo = new AtomicReference<>(); + private static final SecureRandom randomGenerator = new SecureRandom(); private final StandardProcessScheduler scheduler; private final ControllerServiceProvider controllerServiceProvider; @@ -3018,10 +3022,20 @@ public final class StandardProcessGroup implements ProcessGroup { final FlowComparator flowComparator = new StandardFlowComparator(localFlow, remoteFlow, new StaticDifferenceDescriptor()); final FlowComparison flowComparison = flowComparator.compare(); - final Set updatedVersionedComponentIds = flowComparison.getDifferences().stream() - .filter(diff -> diff.getDifferenceType() != DifferenceType.POSITION_CHANGED) - .map(diff -> diff.getComponentA() == null ? diff.getComponentB().getIdentifier() : diff.getComponentA().getIdentifier()) - .collect(Collectors.toSet()); + final Set updatedVersionedComponentIds = new HashSet<>(); + for (final FlowDifference diff : flowComparison.getDifferences()) { + if (diff.getDifferenceType() == DifferenceType.POSITION_CHANGED) { + continue; + } + + final VersionedComponent component = diff.getComponentA() == null ? diff.getComponentB() : diff.getComponentA(); + updatedVersionedComponentIds.add(component.getIdentifier()); + + if (component.getComponentType() == ComponentType.REMOTE_INPUT_PORT || component.getComponentType() == ComponentType.REMOTE_OUTPUT_PORT) { + final String remoteGroupId = ((VersionedRemoteGroupPort) component).getRemoteGroupId(); + updatedVersionedComponentIds.add(remoteGroupId); + } + } if (LOG.isInfoEnabled()) { final String differencesByLine = flowComparison.getDifferences().stream() @@ -3106,13 +3120,13 @@ public final class StandardProcessGroup implements ProcessGroup { .registryId(registryId) .registryName(registryName) .bucketId(bucketId) - .bucketName(bucketId) // bucket name not yet known + .bucketName(bucketId) .flowId(flowId) - .flowName(flowId) // flow id not yet known + .flowName(flowId) .version(version) .flowSnapshot(proposed) .modified(false) - .current(true) + .current(remoteCoordinates.getLatest()) .build(); group.setVersionControlInformation(vci, Collections.emptyMap()); @@ -3125,7 +3139,6 @@ public final class StandardProcessGroup implements ProcessGroup { for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) { final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier()); - final VersionedFlowCoordinates childCoordinates = proposedChildGroup.getVersionedFlowCoordinates(); if (childGroup == null) { @@ -3291,7 +3304,7 @@ public final class StandardProcessGroup implements ProcessGroup { final RemoteProcessGroup added = addRemoteProcessGroup(group, proposedRpg, componentIdSeed); LOG.info("Added {} to {}", added, this); } else if (updatedVersionedComponentIds.contains(proposedRpg.getIdentifier())) { - updateRemoteProcessGroup(rpg, proposedRpg); + updateRemoteProcessGroup(rpg, proposedRpg, componentIdSeed); LOG.info("Updated {}", rpg); } else { rpg.setPosition(new Position(proposedRpg.getPosition().getX(), proposedRpg.getPosition().getY())); @@ -3388,27 +3401,28 @@ public final class StandardProcessGroup implements ProcessGroup { } } - protected String generateUuid(final String componentIdSeed) { - UUID uuid; - if (componentIdSeed == null) { - uuid = ComponentIdGenerator.generateId(); - } else { - try { - UUID seedId = UUID.fromString(componentIdSeed); - uuid = new UUID(seedId.getMostSignificantBits(), componentIdSeed.hashCode()); - } catch (Exception e) { - LOG.warn("Provided 'seed' does not represent UUID. Will not be able to extract most significant bits for ID generation."); - uuid = UUID.nameUUIDFromBytes(componentIdSeed.getBytes(StandardCharsets.UTF_8)); - } - } + private String generateUuid(final String propposedId, final String destinationGroupId, final String seed) { + // TODO: I think we can get rid of all of those LinkedHashSet's now in the VersionedProcessGroup because + /// the UUID is properly keyed off of the ID of the component in the VersionedProcessGroup. + long msb = UUID.nameUUIDFromBytes((propposedId + destinationGroupId).getBytes(StandardCharsets.UTF_8)).getMostSignificantBits(); + UUID uuid; + if (StringUtils.isBlank(seed)) { + long lsb = randomGenerator.nextLong(); + // since msb is extracted from type-one UUID, the type-one semantics will be preserved + uuid = new UUID(msb, lsb); + } else { + UUID seedId = UUID.nameUUIDFromBytes((propposedId + destinationGroupId + seed).getBytes(StandardCharsets.UTF_8)); + uuid = new UUID(msb, seedId.getLeastSignificantBits()); + } + LOG.debug("Generating UUID {} from currentId={}, seed={}", uuid, propposedId, seed); return uuid.toString(); } private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final String componentIdSeed, final Set variablesToSkip) throws ProcessorInstantiationException { - final ProcessGroup group = flowController.createProcessGroup(generateUuid(componentIdSeed)); + final ProcessGroup group = flowController.createProcessGroup(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed)); group.setVersionedComponentId(proposed.getIdentifier()); group.setParent(destination); updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true, true, true, variablesToSkip); @@ -3461,7 +3475,8 @@ public final class StandardProcessGroup implements ProcessGroup { + " but no component could be found in the Process Group with a corresponding identifier"); } - final Connection connection = flowController.createConnection(generateUuid(componentIdSeed), proposed.getName(), source, destination, proposed.getSelectedRelationships()); + final Connection connection = flowController.createConnection(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getName(), source, destination, + proposed.getSelectedRelationships()); connection.setVersionedComponentId(proposed.getIdentifier()); destinationGroup.addConnection(connection); updateConnection(connection, proposed); @@ -3523,7 +3538,7 @@ public final class StandardProcessGroup implements ProcessGroup { final String rpgId = connectableComponent.getGroupId(); final Optional rpgOption = group.getRemoteProcessGroups().stream() .filter(component -> component.getVersionedComponentId().isPresent()) - .filter(component -> id.equals(component.getVersionedComponentId().get())) + .filter(component -> rpgId.equals(component.getVersionedComponentId().get())) .findAny(); if (!rpgOption.isPresent()) { @@ -3598,7 +3613,7 @@ public final class StandardProcessGroup implements ProcessGroup { private ControllerServiceNode addControllerService(final ProcessGroup destination, final VersionedControllerService proposed, final String componentIdSeed) { final String type = proposed.getType(); - final String id = generateUuid(componentIdSeed); + final String id = generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed); final Bundle bundle = proposed.getBundle(); final BundleCoordinate coordinate = toCoordinate(bundle); @@ -3619,7 +3634,7 @@ public final class StandardProcessGroup implements ProcessGroup { } private Funnel addFunnel(final ProcessGroup destination, final VersionedFunnel proposed, final String componentIdSeed) { - final Funnel funnel = flowController.createFunnel(generateUuid(componentIdSeed)); + final Funnel funnel = flowController.createFunnel(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed)); funnel.setVersionedComponentId(proposed.getIdentifier()); destination.addFunnel(funnel); updateFunnel(funnel, proposed); @@ -3634,7 +3649,7 @@ public final class StandardProcessGroup implements ProcessGroup { } private Port addInputPort(final ProcessGroup destination, final VersionedPort proposed, final String componentIdSeed) { - final Port port = flowController.createLocalInputPort(generateUuid(componentIdSeed), proposed.getName()); + final Port port = flowController.createLocalInputPort(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getName()); port.setVersionedComponentId(proposed.getIdentifier()); destination.addInputPort(port); updatePort(port, proposed); @@ -3643,7 +3658,7 @@ public final class StandardProcessGroup implements ProcessGroup { } private Port addOutputPort(final ProcessGroup destination, final VersionedPort proposed, final String componentIdSeed) { - final Port port = flowController.createLocalOutputPort(generateUuid(componentIdSeed), proposed.getName()); + final Port port = flowController.createLocalOutputPort(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getName()); port.setVersionedComponentId(proposed.getIdentifier()); destination.addOutputPort(port); updatePort(port, proposed); @@ -3652,7 +3667,7 @@ public final class StandardProcessGroup implements ProcessGroup { } private Label addLabel(final ProcessGroup destination, final VersionedLabel proposed, final String componentIdSeed) { - final Label label = flowController.createLabel(generateUuid(componentIdSeed), proposed.getLabel()); + final Label label = flowController.createLabel(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getLabel()); label.setVersionedComponentId(proposed.getIdentifier()); destination.addLabel(label); updateLabel(label, proposed); @@ -3669,7 +3684,7 @@ public final class StandardProcessGroup implements ProcessGroup { private ProcessorNode addProcessor(final ProcessGroup destination, final VersionedProcessor proposed, final String componentIdSeed) throws ProcessorInstantiationException { final BundleCoordinate coordinate = toCoordinate(proposed.getBundle()); - final ProcessorNode procNode = flowController.createProcessor(proposed.getType(), generateUuid(componentIdSeed), coordinate, true); + final ProcessorNode procNode = flowController.createProcessor(proposed.getType(), generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), coordinate, true); procNode.setVersionedComponentId(proposed.getIdentifier()); destination.addProcessor(procNode); @@ -3717,25 +3732,25 @@ public final class StandardProcessGroup implements ProcessGroup { } private RemoteProcessGroup addRemoteProcessGroup(final ProcessGroup destination, final VersionedRemoteProcessGroup proposed, final String componentIdSeed) { - final RemoteProcessGroup rpg = flowController.createRemoteProcessGroup(generateUuid(componentIdSeed), proposed.getTargetUris()); + final RemoteProcessGroup rpg = flowController.createRemoteProcessGroup(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getTargetUris()); rpg.setVersionedComponentId(proposed.getIdentifier()); destination.addRemoteProcessGroup(rpg); - updateRemoteProcessGroup(rpg, proposed); + updateRemoteProcessGroup(rpg, proposed, componentIdSeed); return rpg; } - private void updateRemoteProcessGroup(final RemoteProcessGroup rpg, final VersionedRemoteProcessGroup proposed) { + private void updateRemoteProcessGroup(final RemoteProcessGroup rpg, final VersionedRemoteProcessGroup proposed, final String componentIdSeed) { rpg.setComments(proposed.getComments()); rpg.setCommunicationsTimeout(proposed.getCommunicationsTimeout()); rpg.setInputPorts(proposed.getInputPorts() == null ? Collections.emptySet() : proposed.getInputPorts().stream() - .map(port -> createPortDescriptor(port)) + .map(port -> createPortDescriptor(port, componentIdSeed, rpg.getIdentifier())) .collect(Collectors.toSet())); rpg.setName(proposed.getName()); rpg.setNetworkInterface(proposed.getLocalNetworkInterface()); rpg.setOutputPorts(proposed.getOutputPorts() == null ? Collections.emptySet() : proposed.getOutputPorts().stream() - .map(port -> createPortDescriptor(port)) + .map(port -> createPortDescriptor(port, componentIdSeed, rpg.getIdentifier())) .collect(Collectors.toSet())); rpg.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY())); rpg.setProxyHost(proposed.getProxyHost()); @@ -3745,16 +3760,22 @@ public final class StandardProcessGroup implements ProcessGroup { rpg.setYieldDuration(proposed.getYieldDuration()); } - private RemoteProcessGroupPortDescriptor createPortDescriptor(final VersionedRemoteGroupPort proposed) { + private RemoteProcessGroupPortDescriptor createPortDescriptor(final VersionedRemoteGroupPort proposed, final String componentIdSeed, final String rpgId) { final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor(); descriptor.setVersionedComponentId(proposed.getIdentifier()); - descriptor.setBatchCount(proposed.getBatchSize().getCount()); - descriptor.setBatchDuration(proposed.getBatchSize().getDuration()); - descriptor.setBatchSize(proposed.getBatchSize().getSize()); + + final BatchSize batchSize = proposed.getBatchSize(); + if (batchSize != null) { + descriptor.setBatchCount(batchSize.getCount()); + descriptor.setBatchDuration(batchSize.getDuration()); + descriptor.setBatchSize(batchSize.getSize()); + } + descriptor.setComments(proposed.getComments()); descriptor.setConcurrentlySchedulableTaskCount(proposed.getConcurrentlySchedulableTaskCount()); - descriptor.setGroupId(proposed.getGroupId()); - descriptor.setId(UUID.randomUUID().toString()); // TODO: Need to address this issue of port id's + descriptor.setGroupId(proposed.getRemoteGroupId()); + descriptor.setTargetId(proposed.getTargetId()); + descriptor.setId(generateUuid(proposed.getIdentifier(), rpgId, componentIdSeed)); descriptor.setName(proposed.getName()); descriptor.setUseCompression(proposed.isUseCompression()); return descriptor; @@ -3785,29 +3806,8 @@ public final class StandardProcessGroup implements ProcessGroup { final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient(), false); - final ComparableDataFlow currentFlow = new ComparableDataFlow() { - @Override - public VersionedProcessGroup getContents() { - return versionedGroup; - } - - @Override - public String getName() { - return "Local Flow"; - } - }; - - final ComparableDataFlow snapshotFlow = new ComparableDataFlow() { - @Override - public VersionedProcessGroup getContents() { - return vci.getFlowSnapshot(); - } - - @Override - public String getName() { - return "Versioned Flow"; - } - }; + final ComparableDataFlow currentFlow = new StandardComparableDataFlow("Local Flow", versionedGroup); + final ComparableDataFlow snapshotFlow = new StandardComparableDataFlow("Versioned Flow", vci.getFlowSnapshot()); final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, new EvolvingDifferenceDescriptor()); final FlowComparison comparison = flowComparator.compare(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java index 1d3eec606d..1147b9edb6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java @@ -231,6 +231,7 @@ public class RestBasedFlowRegistry implements FlowRegistry { group.setProcessors(contents.getProcessors()); group.setRemoteProcessGroups(contents.getRemoteProcessGroups()); group.setVariables(contents.getVariables()); + coordinates.setLatest(snapshot.isLatest()); } for (final VersionedProcessGroup child : group.getProcessGroups()) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryDtoMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryDtoMapper.java index c3c10371a7..193bde8454 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryDtoMapper.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryDtoMapper.java @@ -309,10 +309,11 @@ public class NiFiRegistryDtoMapper { port.setGroupIdentifier(getGroupId(dto.getGroupId())); port.setComments(dto.getComments()); port.setConcurrentlySchedulableTaskCount(dto.getConcurrentlySchedulableTaskCount()); - port.setGroupId(dto.getGroupId()); + port.setRemoteGroupId(dto.getGroupId()); port.setName(dto.getName()); port.setUseCompression(dto.getUseCompression()); - port.setBatchSettings(mapBatchSettings(dto.getBatchSettings())); + port.setBatchSize(mapBatchSettings(dto.getBatchSettings())); + port.setTargetId(dto.getTargetId()); port.setComponentType(componentType); return port; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java index a10a1b8fae..7bab76d815 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java @@ -310,7 +310,26 @@ public class NiFiRegistryFlowMapper { } component.setComments(connectable.getComments()); - component.setGroupId(connectable.getProcessGroupIdentifier()); + if (connectable instanceof RemoteGroupPort) { + final RemoteGroupPort port = (RemoteGroupPort) connectable; + final RemoteProcessGroup rpg = port.getRemoteProcessGroup(); + final Optional rpgVersionedId = rpg.getVersionedComponentId(); + final String groupId; + if (rpgVersionedId.isPresent()) { + groupId = rpgVersionedId.get(); + } else { + final String resolved = versionedComponentIds.get(rpg.getIdentifier()); + if (resolved == null) { + throw new IllegalArgumentException("Unable to find the Versioned Component ID for Remote Process Group that " + connectable + " belongs to"); + } + + groupId = resolved; + } + + component.setGroupId(groupId); + } else { + component.setGroupId(connectable.getProcessGroupIdentifier()); + } component.setName(connectable.getName()); component.setType(ConnectableComponentType.valueOf(connectable.getConnectableType().name())); return component; @@ -478,10 +497,11 @@ public class NiFiRegistryFlowMapper { port.setGroupIdentifier(getGroupId(remotePort.getRemoteProcessGroup().getIdentifier())); port.setComments(remotePort.getComments()); port.setConcurrentlySchedulableTaskCount(remotePort.getMaxConcurrentTasks()); - port.setGroupId(remotePort.getProcessGroupIdentifier()); + port.setRemoteGroupId(getGroupId(remotePort.getRemoteProcessGroup().getIdentifier())); port.setName(remotePort.getName()); port.setUseCompression(remotePort.isUseCompression()); - port.setBatchSettings(mapBatchSettings(remotePort)); + port.setBatchSize(mapBatchSettings(remotePort)); + port.setTargetId(remotePort.getTargetIdentifier()); port.setComponentType(componentType); return port; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/StandardComparableDataFlow.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/StandardComparableDataFlow.java new file mode 100644 index 0000000000..fe92f91892 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/StandardComparableDataFlow.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow.mapping; + +import org.apache.nifi.registry.flow.VersionedProcessGroup; +import org.apache.nifi.registry.flow.diff.ComparableDataFlow; + +public class StandardComparableDataFlow implements ComparableDataFlow { + private final String name; + private final VersionedProcessGroup contents; + + public StandardComparableDataFlow(final String name, final VersionedProcessGroup contents) { + this.name = name; + this.contents = contents; + } + + @Override + public String getName() { + return name; + } + + @Override + public VersionedProcessGroup getContents() { + return contents; + } + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index 726daa07c8..039ac66e94 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -780,6 +780,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { if (!StringUtils.isBlank(descriptor.getBatchDuration())) { port.setBatchDuration(descriptor.getBatchDuration()); } + port.setVersionedComponentId(descriptor.getVersionedComponentId()); inputPorts.put(descriptor.getId(), port); return port; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java index 31f1fbe2cb..30294a4759 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java @@ -299,6 +299,7 @@ public class FingerprintFactoryTest { // Assert fingerprints with expected one. final String expected = "portId" + + "NO_VALUE" + "NO_VALUE" + "3" + "true" + diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index c66aebb401..e0594faf24 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -3756,29 +3756,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient, false); final VersionedProcessGroup registryGroup = versionedFlowSnapshot.getFlowContents(); - final ComparableDataFlow localFlow = new ComparableDataFlow() { - @Override - public VersionedProcessGroup getContents() { - return localGroup; - } - - @Override - public String getName() { - return "Local Flow"; - } - }; - - final ComparableDataFlow registryFlow = new ComparableDataFlow() { - @Override - public VersionedProcessGroup getContents() { - return registryGroup; - } - - @Override - public String getName() { - return "Versioned Flow"; - } - }; + final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localGroup); + final ComparableDataFlow registryFlow = new StandardComparableDataFlow("Versioned Flow", registryGroup); final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, new ConciseEvolvingDifferenceDescriptor()); final FlowComparison flowComparison = flowComparator.compare(); @@ -4037,7 +4016,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.REMOTE_PROCESS_GROUP.name())) { - return authorizableLookup.getRemoteProcessGroup(versionedComponent.getInstanceGroupId()); + return authorizableLookup.getRemoteProcessGroup(componentId); } return null; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java index 6bf4cca9f7..0ae5864734 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java @@ -604,24 +604,6 @@ public class FlowResource extends ApplicationResource { componentIds.add(outputPort.getIdentifier()); }); - // ensure authorized for each remote input port we will attempt to schedule - group.findAllRemoteProcessGroups().stream() - .flatMap(rpg -> rpg.getInputPorts().stream()) - .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS) - .filter(port -> port.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser())) - .forEach(port -> { - componentIds.add(port.getIdentifier()); - }); - - // ensure authorized for each remote output port we will attempt to schedule - group.findAllRemoteProcessGroups().stream() - .flatMap(rpg -> rpg.getOutputPorts().stream()) - .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS) - .filter(port -> port.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser())) - .forEach(port -> { - componentIds.add(port.getIdentifier()); - }); - return componentIds; }); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java index 7262a821a9..3fa4462245 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java @@ -1641,7 +1641,7 @@ public class ProcessGroupResource extends ApplicationResource { // Step 6: Replicate the request or call serviceFacade.updateProcessGroup final VersionControlInformationDTO versionControlInfo = requestProcessGroupEntity.getComponent().getVersionControlInformation(); - if (versionControlInfo != null) { + if (versionControlInfo != null && requestProcessGroupEntity.getVersionedFlowSnapshot() == null) { // Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail. // Step 2: Retrieve flow from Flow Registry final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(versionControlInfo, true); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java index f2a207e970..6e61182b0f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java @@ -709,7 +709,7 @@ public class VersionsResource extends ApplicationResource { final VersionControlInformationDTO versionControlInfoDto = new VersionControlInformationDTO(); versionControlInfoDto.setBucketId(snapshotMetadata.getBucketIdentifier()); versionControlInfoDto.setBucketName(bucket.getName()); - versionControlInfoDto.setCurrent(true); + versionControlInfoDto.setCurrent(snapshotMetadata.getVersion() == flow.getVersionCount()); versionControlInfoDto.setFlowId(snapshotMetadata.getFlowIdentifier()); versionControlInfoDto.setFlowName(flow.getName()); versionControlInfoDto.setFlowDescription(flow.getDescription()); @@ -1152,7 +1152,7 @@ public class VersionsResource extends ApplicationResource { final String idGenerationSeed = getIdGenerationSeed().orElse(null); // Step 0: Get the Versioned Flow Snapshot from the Flow Registry - final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation(), false); + final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation(), true); // The flow in the registry may not contain the same versions of components that we have in our flow. As a result, we need to update // the flow snapshot to contain compatible bundles. @@ -1217,7 +1217,7 @@ public class VersionsResource extends ApplicationResource { final Consumer> updateTask = vcur -> { try { final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, componentLifecycle, exampleUri, - affectedComponents, user, replicateRequest, requestEntity, flowSnapshot, request, idGenerationSeed, false, false); + affectedComponents, user, replicateRequest, requestEntity, flowSnapshot, request, idGenerationSeed, false, true); vcur.markComplete(updatedVersionControlEntity); } catch (final LifecycleManagementException e) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 7d40473a0a..fb606047c1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -3428,6 +3428,7 @@ public final class DtoFactory { batchCopy.setCount(batchOrg.getCount()); batchCopy.setSize(batchOrg.getSize()); batchCopy.setDuration(batchOrg.getDuration()); + copy.setBatchSettings(batchCopy); } return copy; }