From 014c542f48b61c027152129ae7cd8c8535dd6a64 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 1 Dec 2017 16:31:22 -0500 Subject: [PATCH] NIFI-4436: Bug fix to ensure that RPG's ports are not removed until after connections are established to the ports; ensure that if a registry's name is changed that it is updated immediately in VersionControlInformation objects Signed-off-by: Matt Gilman --- .../nifi/groups/RemoteProcessGroup.java | 2 + .../nifi/controller/FlowController.java | 1 + .../controller/StandardFlowSynchronizer.java | 2 + .../nifi/groups/StandardProcessGroup.java | 12 ++- .../registry/flow/RestBasedFlowRegistry.java | 42 +++++++-- .../remote/StandardRemoteProcessGroup.java | 94 +++++++++++-------- .../apache/nifi/web/NiFiServiceFacade.java | 11 +++ .../nifi/web/StandardNiFiServiceFacade.java | 25 +++++ .../nifi/web/api/ProcessGroupResource.java | 12 +-- .../apache/nifi/web/api/VersionsResource.java | 15 ++- .../apache/nifi/web/api/dto/DtoFactory.java | 2 +- .../impl/StandardRemoteProcessGroupDAO.java | 1 + 12 files changed, 159 insertions(+), 60 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java index 7d92246f40..39be045bd3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java @@ -33,6 +33,8 @@ import java.util.concurrent.TimeUnit; public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable, VersionedComponent { + void initialize(); + @Override String getIdentifier(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 2afa9dcfb7..158aaa27e0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -1779,6 +1779,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R */ public void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto) throws ProcessorInstantiationException { instantiateSnippet(group, dto, true); + group.findAllRemoteProcessGroups().stream().forEach(RemoteProcessGroup::initialize); } private void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto, final boolean topLevel) throws ProcessorInstantiationException { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 28d9b791b8..9cbf3234f0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -360,6 +360,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { rootGroup = updateProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor, encodingVersion); } + rootGroup.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::initialize); + // If there are any Templates that do not exist in the Proposed Flow that do exist in the 'existing flow', we need // to ensure that we also add those to the appropriate Process Groups, so that we don't lose them. final Document existingFlowConfiguration = parseFlowBytes(existingFlow); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 4b186a9513..fb3d3a6a34 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -2963,6 +2963,13 @@ public final class StandardProcessGroup implements ProcessGroup { versionControlInformation.isCurrent(), versionControlInformation.getStatus()) { + @Override + public String getRegistryName() { + final String registryId = versionControlInformation.getRegistryIdentifier(); + final FlowRegistry registry = flowController.getFlowRegistryClient().getFlowRegistry(registryId); + return registry == null ? registryId : registry.getName(); + } + @Override public boolean isModified() { boolean updated = false; @@ -3220,7 +3227,7 @@ public final class StandardProcessGroup implements ProcessGroup { updated = flowStatus.compareAndSet(status, updatedStatus); } } catch (final IOException | NiFiRegistryException e) { - final String message = String.format("Failed to synchronize Process Group with Flow Registry because could not determine the most recent version of the Flow in the Flow Registry"); + final String message = String.format("Failed to synchronize Process Group with Flow Registry : " + e.getMessage()); setSyncFailedState(message); LOG.error("Failed to synchronize {} with Flow Registry because could not determine the most recent version of the Flow in the Flow Registry", this, e); @@ -3451,6 +3458,7 @@ public final class StandardProcessGroup implements ProcessGroup { if (childGroup == null) { final ProcessGroup added = addProcessGroup(group, proposedChildGroup, componentIdSeed, variablesToSkip); + added.findAllRemoteProcessGroups().stream().forEach(RemoteProcessGroup::initialize); LOG.info("Added {} to {}", added, this); } else if (childCoordinates == null || updateDescendantVersionedGroups) { updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true, updateName, updateDescendantVersionedGroups, variablesToSkip); @@ -3759,7 +3767,7 @@ public final class StandardProcessGroup implements ProcessGroup { final Connectable destination = getConnectable(destinationGroup, proposed.getDestination()); if (destination == null) { - throw new IllegalArgumentException("Connection has a destination with identifier " + proposed.getIdentifier() + throw new IllegalArgumentException("Connection has a destination with identifier " + proposed.getDestination().getId() + " but no component could be found in the Process Group with a corresponding identifier"); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java index 1147b9edb6..21e5e0c929 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java @@ -115,40 +115,61 @@ public class RestBasedFlowRegistry implements FlowRegistry { return (user == null || user.isAnonymous()) ? null : user.getIdentity(); } + private BucketClient getBucketClient(final NiFiUser user) { + final String identity = getIdentity(user); + final NiFiRegistryClient registryClient = getRegistryClient(); + final BucketClient bucketClient = identity == null ? registryClient.getBucketClient() : registryClient.getBucketClient(identity); + return bucketClient; + } + + private FlowSnapshotClient getFlowSnapshotClient(final NiFiUser user) { + final String identity = getIdentity(user); + final NiFiRegistryClient registryClient = getRegistryClient(); + final FlowSnapshotClient snapshotClient = identity == null ? registryClient.getFlowSnapshotClient() : registryClient.getFlowSnapshotClient(identity); + return snapshotClient; + } + + private FlowClient getFlowClient(final NiFiUser user) { + final String identity = getIdentity(user); + final NiFiRegistryClient registryClient = getRegistryClient(); + final FlowClient flowClient = identity == null ? registryClient.getFlowClient() : registryClient.getFlowClient(identity); + return flowClient; + } + @Override public Set getBuckets(final NiFiUser user) throws IOException, NiFiRegistryException { - final BucketClient bucketClient = getRegistryClient().getBucketClient(getIdentity(user)); + final BucketClient bucketClient = getBucketClient(user); return new HashSet<>(bucketClient.getAll()); } @Override public Bucket getBucket(final String bucketId, final NiFiUser user) throws IOException, NiFiRegistryException { - final BucketClient bucketClient = getRegistryClient().getBucketClient(getIdentity(user)); + final BucketClient bucketClient = getBucketClient(user); return bucketClient.get(bucketId); } @Override public Set getFlows(final String bucketId, final NiFiUser user) throws IOException, NiFiRegistryException { - final FlowClient flowClient = getRegistryClient().getFlowClient(getIdentity(user)); + final FlowClient flowClient = getFlowClient(user); return new HashSet<>(flowClient.getByBucket(bucketId)); } @Override public Set getFlowVersions(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException { - final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(getIdentity(user)); + final FlowSnapshotClient snapshotClient = getFlowSnapshotClient(user); return new HashSet<>(snapshotClient.getSnapshotMetadata(bucketId, flowId)); } @Override public VersionedFlow registerVersionedFlow(final VersionedFlow flow, final NiFiUser user) throws IOException, NiFiRegistryException { - final FlowClient flowClient = getRegistryClient().getFlowClient(getIdentity(user)); + final FlowClient flowClient = getFlowClient(user); return flowClient.create(flow); } @Override public VersionedFlow deleteVersionedFlow(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException { - final FlowClient flowClient = getRegistryClient().getFlowClient(getIdentity(user)); + final FlowClient flowClient = getFlowClient(user); return flowClient.delete(bucketId, flowId); } @@ -156,7 +177,7 @@ public class RestBasedFlowRegistry implements FlowRegistry { public VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot, final String comments, final int expectedVersion, final NiFiUser user) throws IOException, NiFiRegistryException { - final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(getIdentity(user)); + final FlowSnapshotClient snapshotClient = getFlowSnapshotClient(user); final VersionedFlowSnapshot versionedFlowSnapshot = new VersionedFlowSnapshot(); versionedFlowSnapshot.setFlowContents(snapshot); @@ -174,13 +195,14 @@ public class RestBasedFlowRegistry implements FlowRegistry { @Override public int getLatestVersion(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException { - return (int) getRegistryClient().getFlowClient(getIdentity(user)).get(bucketId, flowId).getVersionCount(); + return (int) getFlowClient(user).get(bucketId, flowId).getVersionCount(); } @Override public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version, final boolean fetchRemoteFlows, final NiFiUser user) throws IOException, NiFiRegistryException { - final FlowSnapshotClient snapshotClient = getRegistryClient().getFlowSnapshotClient(getIdentity(user)); + + final FlowSnapshotClient snapshotClient = getFlowSnapshotClient(user); final VersionedFlowSnapshot flowSnapshot = snapshotClient.get(bucketId, flowId, version); if (fetchRemoteFlows) { @@ -241,7 +263,7 @@ public class RestBasedFlowRegistry implements FlowRegistry { @Override public VersionedFlow getVersionedFlow(final String bucketId, final String flowId, final NiFiUser user) throws IOException, NiFiRegistryException { - final FlowClient flowClient = getRegistryClient().getFlowClient(getIdentity(user)); + final FlowClient flowClient = getFlowClient(user); return flowClient.get(bucketId, flowId); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index ef05a1b15f..5808500f38 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -16,43 +16,8 @@ */ package org.apache.nifi.remote; -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.authorization.Resource; -import org.apache.nifi.authorization.resource.Authorizable; -import org.apache.nifi.authorization.resource.ResourceFactory; -import org.apache.nifi.authorization.resource.ResourceType; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.connectable.ConnectableType; -import org.apache.nifi.connectable.Connection; -import org.apache.nifi.connectable.Port; -import org.apache.nifi.connectable.Position; -import org.apache.nifi.controller.FlowController; -import org.apache.nifi.controller.ProcessScheduler; -import org.apache.nifi.controller.ScheduledState; -import org.apache.nifi.controller.exception.CommunicationsException; -import org.apache.nifi.engine.FlowEngine; -import org.apache.nifi.events.BulletinFactory; -import org.apache.nifi.events.EventReporter; -import org.apache.nifi.groups.ProcessGroup; -import org.apache.nifi.groups.ProcessGroupCounts; -import org.apache.nifi.groups.RemoteProcessGroup; -import org.apache.nifi.groups.RemoteProcessGroupCounts; -import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; -import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; -import org.apache.nifi.remote.protocol.http.HttpProxy; -import org.apache.nifi.remote.util.SiteToSiteRestApiClient; -import org.apache.nifi.reporting.BulletinRepository; -import org.apache.nifi.reporting.ComponentType; -import org.apache.nifi.reporting.Severity; -import org.apache.nifi.util.FormatUtils; -import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.web.api.dto.ControllerDTO; -import org.apache.nifi.web.api.dto.PortDTO; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static java.util.Objects.requireNonNull; -import javax.net.ssl.SSLContext; -import javax.ws.rs.core.Response; import java.io.File; import java.io.IOException; import java.net.InetAddress; @@ -82,7 +47,42 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import java.util.stream.Collectors; -import static java.util.Objects.requireNonNull; +import javax.net.ssl.SSLContext; +import javax.ws.rs.core.Response; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.authorization.Resource; +import org.apache.nifi.authorization.resource.Authorizable; +import org.apache.nifi.authorization.resource.ResourceFactory; +import org.apache.nifi.authorization.resource.ResourceType; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.connectable.ConnectableType; +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.connectable.Port; +import org.apache.nifi.connectable.Position; +import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.ProcessScheduler; +import org.apache.nifi.controller.ScheduledState; +import org.apache.nifi.controller.exception.CommunicationsException; +import org.apache.nifi.engine.FlowEngine; +import org.apache.nifi.events.BulletinFactory; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.groups.RemoteProcessGroupCounts; +import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; +import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; +import org.apache.nifi.remote.protocol.http.HttpProxy; +import org.apache.nifi.remote.util.SiteToSiteRestApiClient; +import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.reporting.ComponentType; +import org.apache.nifi.reporting.Severity; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.api.dto.ControllerDTO; +import org.apache.nifi.web.api.dto.PortDTO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Represents the Root Process Group of a remote NiFi Instance. Holds @@ -104,6 +104,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { private final EventReporter eventReporter; private final NiFiProperties nifiProperties; private final long remoteContentsCacheExpiration; + private volatile boolean initialized = false; private final AtomicReference name = new AtomicReference<>(); private final AtomicReference position = new AtomicReference<>(new Position(0D, 0D)); @@ -179,7 +180,16 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { final Runnable checkAuthorizations = new InitializationTask(); backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + id, true); - backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 5L, 30L, TimeUnit.SECONDS); + backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 30L, 30L, TimeUnit.SECONDS); + } + + @Override + public void initialize() { + if (initialized) { + return; + } + + initialized = true; backgroundThreadExecutor.submit(() -> { try { refreshFlowContents(); @@ -820,6 +830,10 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { @Override public void refreshFlowContents() throws CommunicationsException { + if (!initialized) { + return; + } + try { // perform the request final ControllerDTO dto; @@ -1153,6 +1167,10 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { @Override public void run() { + if (!initialized) { + return; + } + try (final SiteToSiteRestApiClient apiClient = getSiteToSiteRestApiClient()) { try { final ControllerDTO dto = apiClient.getController(targetUris); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index 02df16b400..be77d106dd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -417,6 +417,17 @@ public interface NiFiServiceFacade { */ void verifyComponentTypes(VersionedProcessGroup versionedGroup); + /** + * Verifies that the flow identified by the given Version Control Information can be imported into the Process Group + * with the given id + * + * @param versionControlInfo the information about the versioned flow + * @param groupId the ID of the Process Group where the flow should be instantiated + * + * @throws IllegalStateException if the flow cannot be imported into the specified group + */ + void verifyImportProcessGroup(VersionControlInformationDTO versionControlInfo, String groupId); + /** * Creates a new Template based off the specified snippet. * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 49452963f4..4adb85b0ce 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -1862,6 +1862,31 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { controllerFacade.verifyComponentTypes(versionedGroup); } + @Override + public void verifyImportProcessGroup(final VersionControlInformationDTO versionControlInfo, final String groupId) { + final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); + verifyImportProcessGroup(versionControlInfo, group); + } + + private void verifyImportProcessGroup(final VersionControlInformationDTO vciDto, final ProcessGroup group) { + if (group == null) { + return; + } + + final VersionControlInformation vci = group.getVersionControlInformation(); + if (vci != null) { + if (Objects.equals(vciDto.getRegistryId(), vci.getRegistryIdentifier()) + && Objects.equals(vciDto.getBucketId(), vci.getBucketIdentifier()) + && Objects.equals(vciDto.getFlowId(), vci.getFlowIdentifier())) { + + throw new IllegalStateException("Cannot import the specified Versioned Flow into the Process Group because doing so would cause a recursive dataflow. " + + "If Process Group A contains Process Group B, then Process Group B is not allowed to contain the flow identified by Process Group A."); + } + } + + verifyImportProcessGroup(vciDto, group.getParent()); + } + @Override public TemplateDTO createTemplate(final String name, final String description, final String snippetId, final String groupId, final Optional idGenerationSeed) { // get the specified snippet diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java index 7b753d6dc9..a3bb5b2576 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java @@ -1641,6 +1641,10 @@ public class ProcessGroupResource extends ApplicationResource { // Step 6: Replicate the request or call serviceFacade.updateProcessGroup final VersionControlInformationDTO versionControlInfo = requestProcessGroupEntity.getComponent().getVersionControlInformation(); + if (versionControlInfo != null) { + serviceFacade.verifyImportProcessGroup(versionControlInfo, groupId); + } + if (versionControlInfo != null && requestProcessGroupEntity.getVersionedFlowSnapshot() == null) { // Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail. // Step 2: Retrieve flow from Flow Registry @@ -1685,12 +1689,8 @@ public class ProcessGroupResource extends ApplicationResource { } } }, - () -> { - final VersionedFlowSnapshot versionedFlowSnapshot = requestProcessGroupEntity.getVersionedFlowSnapshot(); - if (versionedFlowSnapshot != null) { - serviceFacade.verifyComponentTypes(versionedFlowSnapshot.getFlowContents()); - } - }, + () -> { + }, processGroupEntity -> { final ProcessGroupDTO processGroup = processGroupEntity.getComponent(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java index 245713e0ad..6dd641b677 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java @@ -424,15 +424,24 @@ public class VersionsResource extends ApplicationResource { if (versionedFlowDto == null) { throw new IllegalArgumentException("Version Control Information must be supplied."); } - if (versionedFlowDto.getBucketId() == null) { + if (StringUtils.isEmpty(versionedFlowDto.getBucketId())) { throw new IllegalArgumentException("The Bucket ID must be supplied."); } - if (versionedFlowDto.getFlowName() == null && versionedFlowDto.getFlowId() == null) { + if (StringUtils.isEmpty(versionedFlowDto.getFlowName()) && StringUtils.isEmpty(versionedFlowDto.getFlowId())) { throw new IllegalArgumentException("The Flow Name or Flow ID must be supplied."); } - if (versionedFlowDto.getRegistryId() == null) { + if (versionedFlowDto.getFlowName().length() > 1000) { + throw new IllegalArgumentException("The Flow Name cannot exceed 1,000 characters"); + } + if (StringUtils.isEmpty(versionedFlowDto.getRegistryId())) { throw new IllegalArgumentException("The Registry ID must be supplied."); } + if (versionedFlowDto.getDescription() != null && versionedFlowDto.getDescription().length() > 65535) { + throw new IllegalArgumentException("Flow Description cannot exceed 65,535 characters"); + } + if (versionedFlowDto.getComments() != null && versionedFlowDto.getComments().length() > 65535) { + throw new IllegalArgumentException("Comments cannot exceed 65,535 characters"); + } // ensure we're not attempting to version the root group final ProcessGroupEntity root = serviceFacade.getProcessGroup(FlowController.ROOT_GROUP_ID_ALIAS); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 8e5974ac54..7a1442d630 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -1792,7 +1792,7 @@ public final class DtoFactory { componentDto.setId(processorDto.getId()); componentDto.setName(processorDto.getName()); componentDto.setProcessGroupId(processorDto.getParentGroupId()); - componentDto.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE); + componentDto.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR); componentDto.setState(processorDto.getState()); componentDto.setValidationErrors(processorDto.getValidationErrors()); component.setComponent(componentDto); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java index c570dfcd4b..941aae07b5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java @@ -85,6 +85,7 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot // create the remote process group RemoteProcessGroup remoteProcessGroup = flowController.createRemoteProcessGroup(remoteProcessGroupDTO.getId(), targetUris); + remoteProcessGroup.initialize(); // set other properties updateRemoteProcessGroup(remoteProcessGroup, remoteProcessGroupDTO);