From 8959226b50cb2f3fc46722f32810ec06037985e4 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 7 Feb 2022 12:40:31 -0500 Subject: [PATCH] NIFI-9754: Introduced VersionedExternalFlow - Updated stateless and StandardProcessGroup, etc. to make use of VersionedExternalFlow - Updated StatelessDataflowDefinition to use ExternalVersionedFlow instead of generic type - Updated Stateless Bootstrap to avoid loading stateless engine libs from root class path but instead use a NarClassLoader to load the statelss nar Signed-off-by: Joe Gresock This closes #5832. --- .../ExternalControllerServiceReference.java | 2 +- .../nifi/flow/VersionedExternalFlow.java | 59 +++++++++ .../flow/VersionedExternalFlowMetadata.java | 84 ++++++++++++ .../apache/nifi}/flow/VersionedParameter.java | 2 +- .../nifi}/flow/VersionedParameterContext.java | 4 +- .../connect/StatelessKafkaConnectorUtil.java | 2 +- .../nifi/groups/ProcessGroupSynchronizer.java | 6 +- .../nifi/groups/StandardProcessGroup.java | 9 +- .../StandardProcessGroupSynchronizer.java | 13 +- .../registry/flow/RestBasedFlowRegistry.java | 2 + .../InstantiatedVersionedProcessGroup.java | 2 +- .../flow/mapping/NiFiRegistryFlowMapper.java | 6 +- .../mapping/StandardComparableDataFlow.java | 2 +- .../controller/flow/VersionedDataflow.java | 2 +- .../org/apache/nifi/groups/ProcessGroup.java | 8 +- .../nifi/registry/flow/FlowRegistry.java | 2 + .../VersionedDataflowMapper.java | 2 +- .../VersionedFlowSynchronizer.java | 16 +-- .../service/mock/MockProcessGroup.java | 8 +- .../MockSingleFlowRegistryClient.java | 4 +- .../integration/versioned/ImportFlowIT.java | 122 ++++++++---------- .../mapping/NiFiRegistryFlowMapperTest.java | 6 +- .../AuthorizeParameterReference.java | 4 +- .../apache/nifi/web/NiFiServiceFacade.java | 4 +- .../nifi/web/StandardNiFiServiceFacade.java | 15 ++- .../nifi/web/api/FlowUpdateResource.java | 2 +- .../nifi/web/api/ProcessGroupResource.java | 2 +- .../apache/nifi/web/dao/ProcessGroupDAO.java | 4 +- .../web/dao/impl/StandardProcessGroupDAO.java | 6 +- .../web/StandardNiFiServiceFacadeTest.java | 4 +- .../stateless/ExecuteStateless.java | 13 +- .../nifi/registry/VersionedFlowConverter.java | 54 ++++++++ .../registry/flow/VersionedFlowSnapshot.java | 4 +- .../flow/diff/ComparableDataFlow.java | 2 +- .../flow/diff/StandardComparableDataFlow.java | 2 +- .../flow/diff/StandardFlowComparator.java | 4 +- .../TestFlowContentSerializer.java | 2 +- .../api/UnsecuredNiFiRegistryClientIT.java | 6 +- .../stateless/flow/DataflowDefinition.java | 5 +- .../flow/DataflowDefinitionParser.java | 4 +- .../flow/StatelessDataflowFactory.java | 4 +- .../stateless/bootstrap/RunStatelessFlow.java | 2 +- .../bootstrap/StatelessBootstrap.java | 58 +++------ .../reporting/StatelessReportingContext.java | 4 +- .../reporting/StatelessReportingTaskNode.java | 4 +- .../scheduling/StatelessProcessScheduler.java | 2 +- .../registry/flow/InMemoryFlowRegistry.java | 55 ++++++-- .../PropertiesFileFlowDefinitionParser.java | 14 +- .../stateless/engine/ComponentBuilder.java | 5 +- .../engine/StandardStatelessEngine.java | 22 ++-- .../stateless/engine/StatelessEngine.java | 4 +- .../engine/StatelessFlowManager.java | 5 +- .../engine/StatelessReloadComponent.java | 8 +- .../flow/StandardDataflowDefinition.java | 34 ++--- .../StandardStatelessDataflowFactory.java | 13 +- .../stateless/flow/StandardStatelessFlow.java | 4 +- ...estPropertiesFileFlowDefinitionParser.java | 2 +- .../nifi/stateless/StatelessSystemIT.java | 21 ++- .../nifi/stateless/VersionedFlowBuilder.java | 2 +- .../parameters/ParameterContextIT.java | 4 +- 60 files changed, 479 insertions(+), 288 deletions(-) rename {nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry => nifi-api/src/main/java/org/apache/nifi}/flow/ExternalControllerServiceReference.java (97%) create mode 100644 nifi-api/src/main/java/org/apache/nifi/flow/VersionedExternalFlow.java create mode 100644 nifi-api/src/main/java/org/apache/nifi/flow/VersionedExternalFlowMetadata.java rename {nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry => nifi-api/src/main/java/org/apache/nifi}/flow/VersionedParameter.java (98%) rename {nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry => nifi-api/src/main/java/org/apache/nifi}/flow/VersionedParameterContext.java (94%) create mode 100644 nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/VersionedFlowConverter.java diff --git a/nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/ExternalControllerServiceReference.java b/nifi-api/src/main/java/org/apache/nifi/flow/ExternalControllerServiceReference.java similarity index 97% rename from nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/ExternalControllerServiceReference.java rename to nifi-api/src/main/java/org/apache/nifi/flow/ExternalControllerServiceReference.java index 3bc269e7a9..2e7ba92fbc 100644 --- a/nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/ExternalControllerServiceReference.java +++ b/nifi-api/src/main/java/org/apache/nifi/flow/ExternalControllerServiceReference.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.registry.flow; +package org.apache.nifi.flow; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; diff --git a/nifi-api/src/main/java/org/apache/nifi/flow/VersionedExternalFlow.java b/nifi-api/src/main/java/org/apache/nifi/flow/VersionedExternalFlow.java new file mode 100644 index 0000000000..469ae7eeac --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/flow/VersionedExternalFlow.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.flow; + +import java.util.Map; + +public class VersionedExternalFlow { + private VersionedProcessGroup flowContents; + private Map externalControllerServices; + private Map parameterContexts; + private VersionedExternalFlowMetadata metadata; + + public VersionedProcessGroup getFlowContents() { + return flowContents; + } + + public void setFlowContents(final VersionedProcessGroup flowContents) { + this.flowContents = flowContents; + } + + public Map getExternalControllerServices() { + return externalControllerServices; + } + + public void setExternalControllerServices(final Map externalControllerServices) { + this.externalControllerServices = externalControllerServices; + } + + public Map getParameterContexts() { + return parameterContexts; + } + + public void setParameterContexts(final Map parameterContexts) { + this.parameterContexts = parameterContexts; + } + + public VersionedExternalFlowMetadata getMetadata() { + return metadata; + } + + public void setMetadata(final VersionedExternalFlowMetadata metadata) { + this.metadata = metadata; + } +} diff --git a/nifi-api/src/main/java/org/apache/nifi/flow/VersionedExternalFlowMetadata.java b/nifi-api/src/main/java/org/apache/nifi/flow/VersionedExternalFlowMetadata.java new file mode 100644 index 0000000000..4486b5ec40 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/flow/VersionedExternalFlowMetadata.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.flow; + +public class VersionedExternalFlowMetadata { + private String bucketId; + private String flowId; + private int version; + private String flowName; + private String author; + private String comments; + private long timestamp; + + public String getBucketIdentifier() { + return bucketId; + } + + public void setBucketIdentifier(final String bucketIdentifier) { + this.bucketId = bucketIdentifier; + } + + public String getFlowIdentifier() { + return flowId; + } + + public void setFlowIdentifier(final String flowId) { + this.flowId = flowId; + } + + public int getVersion() { + return version; + } + + public void setVersion(final int version) { + this.version = version; + } + + public String getFlowName() { + return flowName; + } + + public void setFlowName(final String flowName) { + this.flowName = flowName; + } + + public String getAuthor() { + return author; + } + + public void setAuthor(final String author) { + this.author = author; + } + + public String getComments() { + return comments; + } + + public void setComments(final String comments) { + this.comments = comments; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(final long timestamp) { + this.timestamp = timestamp; + } +} diff --git a/nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedParameter.java b/nifi-api/src/main/java/org/apache/nifi/flow/VersionedParameter.java similarity index 98% rename from nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedParameter.java rename to nifi-api/src/main/java/org/apache/nifi/flow/VersionedParameter.java index 857dd167da..ea5ee7bced 100644 --- a/nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedParameter.java +++ b/nifi-api/src/main/java/org/apache/nifi/flow/VersionedParameter.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.registry.flow; +package org.apache.nifi.flow; import io.swagger.annotations.ApiModelProperty; diff --git a/nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedParameterContext.java b/nifi-api/src/main/java/org/apache/nifi/flow/VersionedParameterContext.java similarity index 94% rename from nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedParameterContext.java rename to nifi-api/src/main/java/org/apache/nifi/flow/VersionedParameterContext.java index 5cfefcdaf8..a3b1d71ce7 100644 --- a/nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedParameterContext.java +++ b/nifi-api/src/main/java/org/apache/nifi/flow/VersionedParameterContext.java @@ -14,11 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.registry.flow; +package org.apache.nifi.flow; import io.swagger.annotations.ApiModelProperty; -import org.apache.nifi.flow.ComponentType; -import org.apache.nifi.flow.VersionedComponent; import java.util.List; import java.util.Set; diff --git a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java index 91e3e5263c..4e33a485b9 100644 --- a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java +++ b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java @@ -155,7 +155,7 @@ public class StatelessKafkaConnectorUtil { final List parameterOverrides = parseParameterOverrides(properties); final String dataflowName = properties.get(DATAFLOW_NAME); - final DataflowDefinition dataflowDefinition; + final DataflowDefinition dataflowDefinition; final StatelessBootstrap bootstrap; try { final Map dataflowDefinitionProperties = new HashMap<>(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizer.java index 688ea1c31f..25cea8802b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/ProcessGroupSynchronizer.java @@ -18,18 +18,18 @@ package org.apache.nifi.groups; import org.apache.nifi.controller.exception.ProcessorInstantiationException; +import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.flow.VersionedProcessGroup; -import org.apache.nifi.registry.flow.VersionedFlowSnapshot; public interface ProcessGroupSynchronizer { /** * Synchronize the given Process Group to match the proposed snaphsot * @param group the Process Group to update - * @param proposedSnapshot the proposed/desired state for the process group + * @param proposedFlow the proposed/desired state for the process group * @param synchronizationOptions options for how to synchronize the group */ - void synchronize(ProcessGroup group, VersionedFlowSnapshot proposedSnapshot, GroupSynchronizationOptions synchronizationOptions) throws ProcessorInstantiationException; + void synchronize(ProcessGroup group, VersionedExternalFlow proposedFlow, GroupSynchronizationOptions synchronizationOptions) throws ProcessorInstantiationException; void verifyCanSynchronize(ProcessGroup group, VersionedProcessGroup proposed, boolean verifyConnectionRemoval); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 3e679c727e..041d2cc0cf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -64,6 +64,7 @@ import org.apache.nifi.controller.service.ControllerServiceReference; import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.controller.service.StandardConfigurationContext; import org.apache.nifi.encrypt.PropertyEncryptor; +import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.logging.LogRepository; import org.apache.nifi.logging.LogRepositoryFactory; @@ -3778,7 +3779,7 @@ public final class StandardProcessGroup implements ProcessGroup { } @Override - public void updateFlow(final VersionedFlowSnapshot proposedSnapshot, final String componentIdSeed, final boolean verifyNotDirty, final boolean updateSettings, + public void updateFlow(final VersionedExternalFlow proposedSnapshot, final String componentIdSeed, final boolean verifyNotDirty, final boolean updateSettings, final boolean updateDescendantVersionedFlows) { final ComponentIdGenerator idGenerator = (proposedId, instanceId, destinationGroupId) -> generateUuid(proposedId, destinationGroupId, componentIdSeed); @@ -3815,7 +3816,7 @@ public final class StandardProcessGroup implements ProcessGroup { } @Override - public void synchronizeFlow(final VersionedFlowSnapshot proposedSnapshot, final GroupSynchronizationOptions synchronizationOptions, final FlowMappingOptions flowMappingOptions) { + public void synchronizeFlow(final VersionedExternalFlow proposedSnapshot, final GroupSynchronizationOptions synchronizationOptions, final FlowMappingOptions flowMappingOptions) { writeLock.lock(); try { verifyCanUpdate(proposedSnapshot, true, !synchronizationOptions.isIgnoreLocalModifications()); @@ -3924,13 +3925,13 @@ public final class StandardProcessGroup implements ProcessGroup { } @Override - public void verifyCanUpdate(final VersionedFlowSnapshot updatedFlow, final boolean verifyConnectionRemoval, final boolean verifyNotDirty) { + public void verifyCanUpdate(final VersionedExternalFlow updatedFlow, final boolean verifyConnectionRemoval, final boolean verifyNotDirty) { readLock.lock(); try { // flow id match and not dirty check concepts are only applicable to versioned flows final VersionControlInformation versionControlInfo = getVersionControlInformation(); if (versionControlInfo != null) { - if (!versionControlInfo.getFlowIdentifier().equals(updatedFlow.getSnapshotMetadata().getFlowIdentifier())) { + if (!versionControlInfo.getFlowIdentifier().equals(updatedFlow.getMetadata().getFlowIdentifier())) { throw new IllegalStateException(this + " is under version control but the given flow does not match the flow that this Process Group is synchronized with"); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java index baad3c48e1..3761f65f73 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java @@ -48,6 +48,7 @@ import org.apache.nifi.flow.ConnectableComponent; import org.apache.nifi.flow.VersionedComponent; import org.apache.nifi.flow.VersionedConnection; import org.apache.nifi.flow.VersionedControllerService; +import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.flow.VersionedFlowCoordinates; import org.apache.nifi.flow.VersionedFunnel; import org.apache.nifi.flow.VersionedLabel; @@ -71,8 +72,8 @@ import org.apache.nifi.registry.flow.StandardVersionControlInformation; import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.registry.flow.VersionedFlowState; -import org.apache.nifi.registry.flow.VersionedParameter; -import org.apache.nifi.registry.flow.VersionedParameterContext; +import org.apache.nifi.flow.VersionedParameter; +import org.apache.nifi.flow.VersionedParameterContext; import org.apache.nifi.registry.flow.diff.ComparableDataFlow; import org.apache.nifi.registry.flow.diff.DifferenceType; import org.apache.nifi.registry.flow.diff.FlowComparator; @@ -142,13 +143,13 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize } @Override - public void synchronize(final ProcessGroup group, final VersionedFlowSnapshot proposedSnapshot, final GroupSynchronizationOptions options) { + public void synchronize(final ProcessGroup group, final VersionedExternalFlow versionedExternalFlow, final GroupSynchronizationOptions options) { final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(context.getExtensionManager(), context.getFlowMappingOptions()); final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(group, context.getControllerServiceProvider(), context.getFlowRegistryClient(), true); final ComparableDataFlow localFlow = new StandardComparableDataFlow("Currently Loaded Flow", versionedGroup); - final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("Proposed Flow", proposedSnapshot.getFlowContents()); + final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("Proposed Flow", versionedExternalFlow.getFlowContents()); final PropertyDecryptor decryptor = options.getPropertyDecryptor(); final FlowComparator flowComparator = new StandardFlowComparator(proposedFlow, localFlow, group.getAncestorServiceIds(), new StaticDifferenceDescriptor(), decryptor::decrypt); @@ -199,7 +200,7 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize .map(FlowDifference::toString) .collect(Collectors.joining("\n")); - LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", group, proposedSnapshot, + LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", group, versionedExternalFlow, flowComparison.getDifferences().size(), differencesByLine); } @@ -217,7 +218,7 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize context.getFlowManager().withParameterContextResolution(() -> { try { - synchronize(group, proposedSnapshot.getFlowContents(), proposedSnapshot.getParameterContexts()); + synchronize(group, versionedExternalFlow.getFlowContents(), versionedExternalFlow.getParameterContexts()); } catch (final ProcessorInstantiationException pie) { throw new RuntimeException(pie); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java index 8ca2c37dac..97d78f657a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java @@ -18,7 +18,9 @@ package org.apache.nifi.registry.flow; import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.flow.ExternalControllerServiceReference; import org.apache.nifi.flow.VersionedFlowCoordinates; +import org.apache.nifi.flow.VersionedParameterContext; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.registry.bucket.Bucket; import org.apache.nifi.registry.client.BucketClient; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedProcessGroup.java index b1d1a13f1b..bcc35caf0e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedProcessGroup.java @@ -17,7 +17,7 @@ package org.apache.nifi.registry.flow.mapping; -import org.apache.nifi.registry.flow.ExternalControllerServiceReference; +import org.apache.nifi.flow.ExternalControllerServiceReference; import org.apache.nifi.flow.VersionedProcessGroup; import javax.xml.bind.annotation.XmlTransient; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java index 2f0d8dbd83..626544d202 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java @@ -67,12 +67,12 @@ import org.apache.nifi.parameter.ParameterContext; import org.apache.nifi.parameter.ParameterDescriptor; import org.apache.nifi.processor.Relationship; import org.apache.nifi.registry.VariableDescriptor; -import org.apache.nifi.registry.flow.ExternalControllerServiceReference; +import org.apache.nifi.flow.ExternalControllerServiceReference; import org.apache.nifi.registry.flow.FlowRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.flow.VersionControlInformation; -import org.apache.nifi.registry.flow.VersionedParameter; -import org.apache.nifi.registry.flow.VersionedParameterContext; +import org.apache.nifi.flow.VersionedParameter; +import org.apache.nifi.flow.VersionedParameterContext; import org.apache.nifi.remote.PublicPort; import org.apache.nifi.remote.RemoteGroupPort; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/StandardComparableDataFlow.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/StandardComparableDataFlow.java index 2f210a0ed9..c1514edbae 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/StandardComparableDataFlow.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/StandardComparableDataFlow.java @@ -20,7 +20,7 @@ package org.apache.nifi.registry.flow.mapping; import org.apache.nifi.flow.VersionedControllerService; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.flow.VersionedReportingTask; -import org.apache.nifi.registry.flow.VersionedParameterContext; +import org.apache.nifi.flow.VersionedParameterContext; import org.apache.nifi.registry.flow.diff.ComparableDataFlow; import java.util.Collections; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/VersionedDataflow.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/VersionedDataflow.java index b7e4a031cd..b5ab3fe2fa 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/VersionedDataflow.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/VersionedDataflow.java @@ -20,7 +20,7 @@ package org.apache.nifi.controller.flow; import org.apache.nifi.flow.VersionedControllerService; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.flow.VersionedReportingTask; -import org.apache.nifi.registry.flow.VersionedParameterContext; +import org.apache.nifi.flow.VersionedParameterContext; import java.util.List; import java.util.Set; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java index f642d36ef0..5d253c5f55 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java @@ -33,6 +33,7 @@ import org.apache.nifi.controller.Triggerable; import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.queue.DropFlowFileStatus; import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.parameter.ParameterContext; import org.apache.nifi.parameter.ParameterUpdate; @@ -40,7 +41,6 @@ import org.apache.nifi.processor.Processor; import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.flow.VersionControlInformation; -import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.registry.flow.mapping.FlowMappingOptions; import org.apache.nifi.remote.RemoteGroupPort; @@ -880,7 +880,7 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi * @param updateDescendantVersionedFlows if a child/descendant Process Group is under Version Control, specifies whether or not to * update the contents of that Process Group */ - void updateFlow(VersionedFlowSnapshot proposedSnapshot, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings, boolean updateDescendantVersionedFlows); + void updateFlow(VersionedExternalFlow proposedSnapshot, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings, boolean updateDescendantVersionedFlows); /** * Updates the Process Group to match the proposed flow @@ -889,7 +889,7 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi * @param synchronizationOptions options for how the synchronization should occur * @param flowMappingOptions options for how to map the existing dataflow into Versioned components so that it can be compared to the proposed snapshot */ - void synchronizeFlow(VersionedFlowSnapshot proposedSnapshot, GroupSynchronizationOptions synchronizationOptions, FlowMappingOptions flowMappingOptions); + void synchronizeFlow(VersionedExternalFlow proposedSnapshot, GroupSynchronizationOptions synchronizationOptions, FlowMappingOptions flowMappingOptions); /** * Verifies a template with the specified name can be created. @@ -976,7 +976,7 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi * * @throws IllegalStateException if the Process Group is not in a state that will allow the update */ - void verifyCanUpdate(VersionedFlowSnapshot updatedFlow, boolean verifyConnectionRemoval, boolean verifyNotDirty); + void verifyCanUpdate(VersionedExternalFlow updatedFlow, boolean verifyConnectionRemoval, boolean verifyNotDirty); /** * Ensures that the Process Group can have any local changes reverted diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java index 69b891d1d2..5a7d10e147 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java @@ -18,6 +18,8 @@ package org.apache.nifi.registry.flow; import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.flow.ExternalControllerServiceReference; +import org.apache.nifi.flow.VersionedParameterContext; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.registry.bucket.Bucket; import org.apache.nifi.registry.client.NiFiRegistryException; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedDataflowMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedDataflowMapper.java index 2c7e7847b3..d6951ebd8e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedDataflowMapper.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedDataflowMapper.java @@ -36,7 +36,7 @@ import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.parameter.ParameterContext; import org.apache.nifi.registry.flow.FlowRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; -import org.apache.nifi.registry.flow.VersionedParameterContext; +import org.apache.nifi.flow.VersionedParameterContext; import org.apache.nifi.registry.flow.mapping.ComponentIdLookup; import org.apache.nifi.registry.flow.mapping.FlowMappingOptions; import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java index 226210344d..14f2799009 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java @@ -52,6 +52,9 @@ import org.apache.nifi.encrypt.PropertyEncryptor; import org.apache.nifi.flow.Bundle; import org.apache.nifi.flow.ScheduledState; import org.apache.nifi.flow.VersionedControllerService; +import org.apache.nifi.flow.VersionedExternalFlow; +import org.apache.nifi.flow.VersionedParameter; +import org.apache.nifi.flow.VersionedParameterContext; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.flow.VersionedProcessor; import org.apache.nifi.flow.VersionedReportingTask; @@ -69,9 +72,6 @@ import org.apache.nifi.parameter.ParameterDescriptor; import org.apache.nifi.persistence.FlowConfigurationArchiveManager; import org.apache.nifi.registry.flow.FlowRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; -import org.apache.nifi.registry.flow.VersionedFlowSnapshot; -import org.apache.nifi.registry.flow.VersionedParameter; -import org.apache.nifi.registry.flow.VersionedParameterContext; import org.apache.nifi.registry.flow.diff.ComparableDataFlow; import org.apache.nifi.registry.flow.diff.DifferenceDescriptor; import org.apache.nifi.registry.flow.diff.FlowComparator; @@ -298,9 +298,9 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer { final Map versionedParameterContextMap = new HashMap<>(); versionedFlow.getParameterContexts().forEach(context -> versionedParameterContextMap.put(context.getName(), context)); - final VersionedFlowSnapshot versionedFlowSnapshot = new VersionedFlowSnapshot(); - versionedFlowSnapshot.setParameterContexts(versionedParameterContextMap); - versionedFlowSnapshot.setFlowContents(versionedFlow.getRootGroup()); + final VersionedExternalFlow versionedExternalFlow = new VersionedExternalFlow(); + versionedExternalFlow.setParameterContexts(versionedParameterContextMap); + versionedExternalFlow.setFlowContents(versionedFlow.getRootGroup()); // Inherit controller-level components. inheritControllerServices(controller, versionedFlow, affectedComponentSet); @@ -313,7 +313,7 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer { final ComponentScheduler componentScheduler = new FlowControllerComponentScheduler(controller); if (rootGroup.isEmpty()) { - final VersionedProcessGroup versionedRoot = versionedFlowSnapshot.getFlowContents(); + final VersionedProcessGroup versionedRoot = versionedExternalFlow.getFlowContents(); rootGroup = controller.getFlowManager().createProcessGroup(versionedRoot.getInstanceIdentifier()); rootGroup.setComments(versionedRoot.getComments()); rootGroup.setPosition(new Position(versionedRoot.getPosition().getX(), versionedRoot.getPosition().getY())); @@ -350,7 +350,7 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer { .mapControllerServiceReferencesToVersionedId(false) .build(); - rootGroup.synchronizeFlow(versionedFlowSnapshot, syncOptions, flowMappingOptions); + rootGroup.synchronizeFlow(versionedExternalFlow, syncOptions, flowMappingOptions); // Inherit templates, now that all necessary Process Groups have been created inheritTemplates(controller, versionedFlow); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java index 0c9e711562..2a2e641bcf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java @@ -33,6 +33,7 @@ import org.apache.nifi.controller.flow.FlowManager; import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.queue.DropFlowFileStatus; import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.groups.BatchCounts; import org.apache.nifi.groups.DataValve; import org.apache.nifi.groups.FlowFileConcurrency; @@ -48,7 +49,6 @@ import org.apache.nifi.parameter.ParameterUpdate; import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.flow.VersionControlInformation; -import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.registry.flow.mapping.FlowMappingOptions; import org.apache.nifi.registry.variable.MutableVariableRegistry; import org.apache.nifi.remote.RemoteGroupPort; @@ -703,7 +703,7 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public void verifyCanUpdate(VersionedFlowSnapshot updatedFlow, boolean verifyConnectionRemoval, boolean verifyNotDirty) { + public void verifyCanUpdate(VersionedExternalFlow updatedFlow, boolean verifyConnectionRemoval, boolean verifyNotDirty) { } @Override @@ -715,11 +715,11 @@ public class MockProcessGroup implements ProcessGroup { } @Override - public void updateFlow(VersionedFlowSnapshot proposedFlow, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings, boolean updateDescendantVerisonedFlows) { + public void updateFlow(VersionedExternalFlow proposedFlow, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings, boolean updateDescendantVerisonedFlows) { } @Override - public void synchronizeFlow(final VersionedFlowSnapshot proposedSnapshot, final GroupSynchronizationOptions synchronizationOptions, final FlowMappingOptions flowMappingOptions) { + public void synchronizeFlow(final VersionedExternalFlow proposedSnapshot, final GroupSynchronizationOptions synchronizationOptions, final FlowMappingOptions flowMappingOptions) { } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/MockSingleFlowRegistryClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/MockSingleFlowRegistryClient.java index 7ee71b57ee..e226e0902b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/MockSingleFlowRegistryClient.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/MockSingleFlowRegistryClient.java @@ -19,13 +19,13 @@ package org.apache.nifi.integration; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.registry.bucket.Bucket; import org.apache.nifi.registry.client.NiFiRegistryException; -import org.apache.nifi.registry.flow.ExternalControllerServiceReference; +import org.apache.nifi.flow.ExternalControllerServiceReference; import org.apache.nifi.registry.flow.FlowRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.flow.VersionedFlow; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; -import org.apache.nifi.registry.flow.VersionedParameterContext; +import org.apache.nifi.flow.VersionedParameterContext; import org.apache.nifi.flow.VersionedProcessGroup; import java.io.IOException; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java index 97a51b1eb5..92f711e87a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java @@ -26,9 +26,17 @@ import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.StandardSnippet; import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.flow.Bundle; import org.apache.nifi.flow.VersionedConnection; +import org.apache.nifi.flow.VersionedControllerService; +import org.apache.nifi.flow.VersionedExternalFlow; +import org.apache.nifi.flow.VersionedExternalFlowMetadata; import org.apache.nifi.flow.VersionedFunnel; +import org.apache.nifi.flow.VersionedParameter; +import org.apache.nifi.flow.VersionedParameterContext; import org.apache.nifi.flow.VersionedPort; +import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.flow.VersionedProcessor; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.integration.DirectInjectionExtensionManager; import org.apache.nifi.integration.FrameworkIntegrationTest; @@ -44,15 +52,7 @@ import org.apache.nifi.parameter.StandardParameterContext; import org.apache.nifi.parameter.StandardParameterReferenceManager; import org.apache.nifi.processor.Processor; import org.apache.nifi.registry.bucket.Bucket; -import org.apache.nifi.flow.Bundle; -import org.apache.nifi.flow.VersionedControllerService; import org.apache.nifi.registry.flow.VersionedFlow; -import org.apache.nifi.registry.flow.VersionedFlowSnapshot; -import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; -import org.apache.nifi.registry.flow.VersionedParameter; -import org.apache.nifi.registry.flow.VersionedParameterContext; -import org.apache.nifi.flow.VersionedProcessGroup; -import org.apache.nifi.flow.VersionedProcessor; import org.apache.nifi.registry.flow.diff.ComparableDataFlow; import org.apache.nifi.registry.flow.diff.ConciseEvolvingDifferenceDescriptor; import org.apache.nifi.registry.flow.diff.DifferenceType; @@ -103,7 +103,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest { processor.setAutoTerminatedRelationships(Collections.singleton(REL_SUCCESS)); processor.setProperties(Collections.singletonMap(NopServiceReferencingProcessor.SERVICE.getName(), controllerService.getIdentifier())); - final VersionedFlowSnapshot proposedFlow = createFlowSnapshot(Collections.singletonList(controllerService), Collections.singletonList(processor), null); + final VersionedExternalFlow proposedFlow = createFlowSnapshot(Collections.singletonList(controllerService), Collections.singletonList(processor), null); // Create an Inner Process Group and update it to match the Versioned Flow. final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id"); @@ -146,8 +146,8 @@ public class ImportFlowIT extends FrameworkIntegrationTest { final ProcessorNode processor = createProcessorNode(UsernamePasswordProcessor.class); processor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "password")); - // Create a VersionedFlowSnapshot that contains the processor - final VersionedFlowSnapshot versionedFlowWithExplicitValue = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), null); + // Create a VersionedExternalFlow that contains the processor + final VersionedExternalFlow versionedFlowWithExplicitValue = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), null); // Create child group final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id"); @@ -173,7 +173,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest { assertEquals(DifferenceType.PROPERTY_PARAMETERIZED, differences.iterator().next().getDifferenceType()); // Create a Versioned Flow that contains the Parameter Reference. - final VersionedFlowSnapshot versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), null); + final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), null); // Ensure no difference between the current configuration and the versioned flow differences = getLocalModifications(innerGroup, versionedFlowWithParameterReference); @@ -191,9 +191,9 @@ public class ImportFlowIT extends FrameworkIntegrationTest { final ProcessorNode processor = createProcessorNode(UsernamePasswordProcessor.class); processor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "#{secret-param}")); - // Create a VersionedFlowSnapshot that contains the processor + // Create a VersionedExternalFlow that contains the processor final Parameter parameter = new Parameter(new ParameterDescriptor.Builder().name("secret-param").sensitive(true).build(), null); - final VersionedFlowSnapshot versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), Collections.singleton(parameter)); + final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), Collections.singleton(parameter)); // Create child group final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id"); @@ -223,9 +223,9 @@ public class ImportFlowIT extends FrameworkIntegrationTest { final ProcessorNode processor = createProcessorNode(UsernamePasswordProcessor.class); processor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "#{secret-param}")); - // Create a VersionedFlowSnapshot that contains the processor + // Create a VersionedExternalFlow that contains the processor final Parameter parameter = new Parameter(new ParameterDescriptor.Builder().name("secret-param").sensitive(true).build(), null); - final VersionedFlowSnapshot versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), Collections.singleton(parameter)); + final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), Collections.singleton(parameter)); // Create child group final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id"); @@ -253,15 +253,15 @@ public class ImportFlowIT extends FrameworkIntegrationTest { final ProcessorNode initialProcessor = createProcessorNode(UsernamePasswordProcessor.class); initialProcessor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "#{secret-param}")); - // Create a VersionedFlowSnapshot that contains the processor + // Create a VersionedExternalFlow that contains the processor final Parameter parameter = new Parameter(new ParameterDescriptor.Builder().name("secret-param").sensitive(true).build(), null); - final VersionedFlowSnapshot versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), + final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), Collections.singleton(parameter)); // Update processor to have an explicit value for the second version of the flow. initialProcessor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "secret-value")); - final VersionedFlowSnapshot versionedFlowExplicitValue = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null); + final VersionedExternalFlow versionedFlowExplicitValue = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null); // Create child group and update to the first version of the flow, with parameter ref final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id"); @@ -290,7 +290,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest { initialProperties.put(UsernamePasswordProcessor.PASSWORD.getName(), "pass"); initialProcessor.setProperties(initialProperties); - final VersionedFlowSnapshot initialVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null); + final VersionedExternalFlow initialVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null); // Update processor to have a different explicit value for both sensitive and non-sensitive properties and create a versioned flow for it. final Map updatedProperties = new HashMap<>(); @@ -298,7 +298,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest { updatedProperties.put(UsernamePasswordProcessor.PASSWORD.getName(), "pass"); initialProcessor.setProperties(updatedProperties); - final VersionedFlowSnapshot updatedVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null); + final VersionedExternalFlow updatedVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null); // Create child group and update to the first version of the flow, with parameter ref final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id"); @@ -335,7 +335,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest { initialProperties.put(UsernamePasswordProcessor.PASSWORD.getName(), "#{secret-param}"); initialProcessor.setProperties(initialProperties); - final VersionedFlowSnapshot initialVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null); + final VersionedExternalFlow initialVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null); // Update processor to have a different explicit value for both sensitive and non-sensitive properties and create a versioned flow for it. final Map updatedProperties = new HashMap<>(); @@ -343,7 +343,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest { updatedProperties.put(UsernamePasswordProcessor.PASSWORD.getName(), "#{other-param}"); initialProcessor.setProperties(updatedProperties); - final VersionedFlowSnapshot updatedVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null); + final VersionedExternalFlow updatedVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null); // Create child group and update to the first version of the flow, with parameter ref final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id"); @@ -369,12 +369,12 @@ public class ImportFlowIT extends FrameworkIntegrationTest { processorWithExplicitValue.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "secret-value")); - // Create a VersionedFlowSnapshot that contains the processor + // Create a VersionedExternalFlow that contains the processor final Parameter parameter = new Parameter(new ParameterDescriptor.Builder().name("secret-param").sensitive(true).build(), null); - final VersionedFlowSnapshot versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), + final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processorWithParamRef), Collections.singleton(parameter)); - final VersionedFlowSnapshot versionedFlowExplicitValue = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processorWithExplicitValue), null); + final VersionedExternalFlow versionedFlowExplicitValue = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processorWithExplicitValue), null); // Create child group and update to the first version of the flow, with parameter ref final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id"); @@ -406,7 +406,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest { groupA.addInputPort(port); //Create a snapshot - final VersionedFlowSnapshot version1 = createFlowSnapshot(groupA); + final VersionedExternalFlow version1 = createFlowSnapshot(groupA); //Create Process Group B under Process Group A final ProcessGroup groupB = createProcessGroup("group-b-id", "Group B", groupA); @@ -421,7 +421,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest { final Connection connection = connect(groupA, processor, port, processor.getRelationships()); //Create another snapshot - final VersionedFlowSnapshot version2 = createFlowSnapshot(groupA); + final VersionedExternalFlow version2 = createFlowSnapshot(groupA); //Change Process Group A version to Version 1 groupA.updateFlow(version1, null, false, true, true); @@ -463,7 +463,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest { final Connection connection = connect(group, processor, port, processor.getRelationships()); //Create a snapshot - final VersionedFlowSnapshot version1 = createFlowSnapshot(group); + final VersionedExternalFlow version1 = createFlowSnapshot(group); //Create Funnel under Process Group Funnel funnel = getFlowController().getFlowManager().createFunnel("funnel-id"); @@ -476,7 +476,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest { group.removeOutputPort(port); //Create another snapshot - final VersionedFlowSnapshot version2 = createFlowSnapshot(group); + final VersionedExternalFlow version2 = createFlowSnapshot(group); //Change Process Group version to Version 1 group.updateFlow(version1, null, false, true, true); @@ -534,7 +534,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest { final Connection connection2 = connect(groupA, processor1, inputPort, processor1.getRelationships()); //Create a snapshot - final VersionedFlowSnapshot version1 = createFlowSnapshot(groupA); + final VersionedExternalFlow version1 = createFlowSnapshot(groupA); //Modify Connection 1 to point to Processor 2 connection1.setDestination(processor2); @@ -543,7 +543,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest { moveOutputPort(outputPort, groupB); //Create another snapshot - final VersionedFlowSnapshot version2 = createFlowSnapshot(groupA); + final VersionedExternalFlow version2 = createFlowSnapshot(groupA); //Delete connection 2 groupA.removeConnection(connection2); @@ -552,7 +552,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest { groupB.removeInputPort(inputPort); //Create another snapshot - final VersionedFlowSnapshot version3 = createFlowSnapshot(groupA); + final VersionedExternalFlow version3 = createFlowSnapshot(groupA); //Change Process Group version to Version 1 groupA.updateFlow(version1, null, false, true, true); @@ -612,7 +612,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest { final Connection connection2 = connect(groupA, processor1, inputPort, processor1.getRelationships()); //Create a snapshot - final VersionedFlowSnapshot version1 = createFlowSnapshot(groupA); + final VersionedExternalFlow version1 = createFlowSnapshot(groupA); //Modify Connection 1 to point to Processor 2 connection1.setDestination(processor2); @@ -621,7 +621,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest { groupA.removeOutputPort(outputPort); //Create another snapshot - final VersionedFlowSnapshot version2 = createFlowSnapshot(groupA); + final VersionedExternalFlow version2 = createFlowSnapshot(groupA); //Delete connection 2 groupA.removeConnection(connection2); @@ -630,7 +630,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest { moveInputPort(inputPort, groupA); //Create another snapshot - final VersionedFlowSnapshot version3 = createFlowSnapshot(groupA); + final VersionedExternalFlow version3 = createFlowSnapshot(groupA); //Change Process Group version to Version 1 groupA.updateFlow(version1, null, false, true, true); @@ -686,10 +686,10 @@ public class ImportFlowIT extends FrameworkIntegrationTest { } - private Set getLocalModifications(final ProcessGroup processGroup, final VersionedFlowSnapshot versionedFlowSnapshot) { + private Set getLocalModifications(final ProcessGroup processGroup, final VersionedExternalFlow VersionedExternalFlow) { final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(getFlowController().getExtensionManager()); final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, getFlowController().getControllerServiceProvider(), getFlowController().getFlowRegistryClient(), true); - final VersionedProcessGroup registryGroup = versionedFlowSnapshot.getFlowContents(); + final VersionedProcessGroup registryGroup = VersionedExternalFlow.getFlowContents(); final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localGroup); final ComparableDataFlow registryFlow = new StandardComparableDataFlow("Versioned Flow", registryGroup); @@ -707,14 +707,8 @@ public class ImportFlowIT extends FrameworkIntegrationTest { return differences; } - private VersionedFlowSnapshot createFlowSnapshot(final ProcessGroup group, final List controllerServices, + private VersionedExternalFlow createFlowSnapshot(final ProcessGroup group, final List controllerServices, final List processors, final Set parameters) { - final VersionedFlowSnapshotMetadata snapshotMetadata = createSnapshotMetadata(); - - final Bucket bucket = createBucket(); - - final VersionedFlow flow = createVersionedFlow(); - createBundle(); final NiFiRegistryFlowMapper flowMapper = new NiFiRegistryFlowMapper(getExtensionManager()); @@ -804,7 +798,14 @@ public class ImportFlowIT extends FrameworkIntegrationTest { flowContents.setFunnels(versionedFunnels); flowContents.setConnections(versionedConnections); - final VersionedFlowSnapshot versionedFlowSnapshot = createVersionedFlowSnapshot(snapshotMetadata, bucket, flow, flowContents); + final VersionedExternalFlow externalFlow = new VersionedExternalFlow(); + + final VersionedExternalFlowMetadata metadata = new VersionedExternalFlowMetadata(); + externalFlow.setMetadata(metadata); + metadata.setBucketIdentifier("unit-test-bucket"); + metadata.setFlowIdentifier("unit-test-flow"); + metadata.setVersion(1); + metadata.setFlowName("unit-test-flow"); if (parameters != null) { final Set versionedParameters = new HashSet<>(); @@ -820,31 +821,22 @@ public class ImportFlowIT extends FrameworkIntegrationTest { final VersionedParameterContext versionedParameterContext = new VersionedParameterContext(); versionedParameterContext.setName("Unit Test Context"); versionedParameterContext.setParameters(versionedParameters); - versionedFlowSnapshot.setParameterContexts(Collections.singletonMap(versionedParameterContext.getName(), versionedParameterContext)); + externalFlow.setParameterContexts(Collections.singletonMap(versionedParameterContext.getName(), versionedParameterContext)); flowContents.setParameterContextName("Unit Test Context"); } - return versionedFlowSnapshot; + return externalFlow; } - private VersionedFlowSnapshot createFlowSnapshot(final List controllerServices, final List processors, final Set parameters) { + private VersionedExternalFlow createFlowSnapshot(final List controllerServices, final List processors, final Set parameters) { return createFlowSnapshot(null, controllerServices, processors, parameters); } - private VersionedFlowSnapshot createFlowSnapshot(final ProcessGroup group) { - return createFlowSnapshot(group, Collections.EMPTY_LIST, Collections.EMPTY_LIST, null); + private VersionedExternalFlow createFlowSnapshot(final ProcessGroup group) { + return createFlowSnapshot(group, Collections.emptyList(), Collections.emptyList(), null); } - @NotNull - private VersionedFlowSnapshot createVersionedFlowSnapshot(VersionedFlowSnapshotMetadata snapshotMetadata, Bucket bucket, VersionedFlow flow, VersionedProcessGroup flowContents) { - final VersionedFlowSnapshot versionedFlowSnapshot = new VersionedFlowSnapshot(); - versionedFlowSnapshot.setSnapshotMetadata(snapshotMetadata); - versionedFlowSnapshot.setBucket(bucket); - versionedFlowSnapshot.setFlow(flow); - versionedFlowSnapshot.setFlowContents(flowContents); - return versionedFlowSnapshot; - } @NotNull private VersionedProcessGroup createFlowContents() { @@ -882,14 +874,4 @@ public class ImportFlowIT extends FrameworkIntegrationTest { return bucket; } - @NotNull - private VersionedFlowSnapshotMetadata createSnapshotMetadata() { - final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata(); - snapshotMetadata.setAuthor("unit-test"); - snapshotMetadata.setBucketIdentifier("unit-test-bucket"); - snapshotMetadata.setFlowIdentifier("unit-test-flow"); - snapshotMetadata.setTimestamp(System.currentTimeMillis()); - snapshotMetadata.setVersion(1); - return snapshotMetadata; - } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapperTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapperTest.java index a02246acd1..b5e52ae19e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapperTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapperTest.java @@ -51,7 +51,7 @@ import org.apache.nifi.parameter.ParameterDescriptor; import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.registry.VariableDescriptor; import org.apache.nifi.flow.ComponentType; -import org.apache.nifi.registry.flow.ExternalControllerServiceReference; +import org.apache.nifi.flow.ExternalControllerServiceReference; import org.apache.nifi.registry.flow.FlowRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.flow.PortType; @@ -61,8 +61,8 @@ import org.apache.nifi.flow.VersionedControllerService; import org.apache.nifi.flow.VersionedFlowCoordinates; import org.apache.nifi.flow.VersionedFunnel; import org.apache.nifi.flow.VersionedLabel; -import org.apache.nifi.registry.flow.VersionedParameter; -import org.apache.nifi.registry.flow.VersionedParameterContext; +import org.apache.nifi.flow.VersionedParameter; +import org.apache.nifi.flow.VersionedParameterContext; import org.apache.nifi.flow.VersionedPort; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.flow.VersionedProcessor; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/AuthorizeParameterReference.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/AuthorizeParameterReference.java index 95d454054d..5bd8eed98f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/AuthorizeParameterReference.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/authorization/AuthorizeParameterReference.java @@ -25,8 +25,8 @@ import org.apache.nifi.parameter.ParameterContext; import org.apache.nifi.parameter.ParameterDescriptor; import org.apache.nifi.parameter.ParameterParser; import org.apache.nifi.parameter.ParameterTokenList; -import org.apache.nifi.registry.flow.VersionedParameter; -import org.apache.nifi.registry.flow.VersionedParameterContext; +import org.apache.nifi.flow.VersionedParameter; +import org.apache.nifi.flow.VersionedParameterContext; import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.FlowSnippetDTO; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index feb0fbe73b..1bbc4d9326 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -29,10 +29,10 @@ import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.parameter.ParameterContext; -import org.apache.nifi.registry.flow.ExternalControllerServiceReference; +import org.apache.nifi.flow.ExternalControllerServiceReference; import org.apache.nifi.registry.flow.VersionedFlow; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; -import org.apache.nifi.registry.flow.VersionedParameterContext; +import org.apache.nifi.flow.VersionedParameterContext; import org.apache.nifi.web.api.dto.AccessPolicyDTO; import org.apache.nifi.web.api.dto.AffectedComponentDTO; import org.apache.nifi.web.api.dto.BulletinBoardDTO; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 9c50abaf55..066b1310f2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -98,11 +98,14 @@ import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor; import org.apache.nifi.diagnostics.SystemDiagnostics; import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flow.ExternalControllerServiceReference; import org.apache.nifi.flow.VersionedComponent; import org.apache.nifi.flow.VersionedConfigurableComponent; import org.apache.nifi.flow.VersionedConnection; import org.apache.nifi.flow.VersionedControllerService; +import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.flow.VersionedFlowCoordinates; +import org.apache.nifi.flow.VersionedParameterContext; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.flow.VersionedProcessor; import org.apache.nifi.flow.VersionedPropertyDescriptor; @@ -129,10 +132,10 @@ import org.apache.nifi.prometheus.util.JvmMetricsRegistry; import org.apache.nifi.prometheus.util.NiFiMetricsRegistry; import org.apache.nifi.prometheus.util.PrometheusMetricsUtil; import org.apache.nifi.registry.ComponentVariableRegistry; +import org.apache.nifi.registry.VersionedFlowConverter; import org.apache.nifi.registry.authorization.Permissions; import org.apache.nifi.registry.bucket.Bucket; import org.apache.nifi.registry.client.NiFiRegistryException; -import org.apache.nifi.registry.flow.ExternalControllerServiceReference; import org.apache.nifi.registry.flow.FlowRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.flow.RestBasedFlowRegistry; @@ -141,7 +144,6 @@ import org.apache.nifi.registry.flow.VersionedFlow; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; import org.apache.nifi.registry.flow.VersionedFlowState; -import org.apache.nifi.registry.flow.VersionedParameterContext; import org.apache.nifi.registry.flow.diff.ComparableDataFlow; import org.apache.nifi.registry.flow.diff.ConciseEvolvingDifferenceDescriptor; import org.apache.nifi.registry.flow.diff.DifferenceType; @@ -4919,7 +4921,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public void verifyCanUpdate(final String groupId, final VersionedFlowSnapshot proposedFlow, final boolean verifyConnectionRemoval, final boolean verifyNotDirty) { final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); - group.verifyCanUpdate(proposedFlow, verifyConnectionRemoval, verifyNotDirty); + final VersionedExternalFlow externalFlow = VersionedFlowConverter.createVersionedExternalFlow(proposedFlow); + group.verifyCanUpdate(externalFlow, verifyConnectionRemoval, verifyNotDirty); } @Override @@ -4936,7 +4939,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // verify that the process group can be updated to the given snapshot. We do not verify that connections can // be removed, because the flow may still be running, and it only matters that the connections can be removed once the components // have been stopped. - group.verifyCanUpdate(versionedFlowSnapshot, false, false); + final VersionedExternalFlow externalFlow = VersionedFlowConverter.createVersionedExternalFlow(versionedFlowSnapshot); + group.verifyCanUpdate(externalFlow, false, false); } @Override @@ -5347,7 +5351,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public RevisionUpdate update() { // update the Process Group - final ProcessGroup updatedProcessGroup = processGroupDAO.updateProcessGroupFlow(groupId, proposedFlowSnapshot, versionControlInfo, componentIdSeed, verifyNotModified, updateSettings, + final VersionedExternalFlow externalFlow = VersionedFlowConverter.createVersionedExternalFlow(proposedFlowSnapshot); + processGroupDAO.updateProcessGroupFlow(groupId, externalFlow, versionControlInfo, componentIdSeed, verifyNotModified, updateSettings, updateDescendantVersionedFlows); // update the revisions diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java index 97a23f8492..bcb0924a78 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java @@ -31,7 +31,7 @@ import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.registry.flow.FlowRegistryUtils; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; -import org.apache.nifi.registry.flow.VersionedParameterContext; +import org.apache.nifi.flow.VersionedParameterContext; import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.ResourceNotFoundException; import org.apache.nifi.web.ResumeFlowException; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java index 4061a8c112..c04204b9d8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java @@ -108,7 +108,7 @@ import org.apache.nifi.registry.flow.FlowRegistryUtils; import org.apache.nifi.registry.flow.VersionedFlow; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.registry.flow.VersionedFlowState; -import org.apache.nifi.registry.flow.VersionedParameterContext; +import org.apache.nifi.flow.VersionedParameterContext; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest; import org.apache.nifi.registry.variable.VariableRegistryUpdateStep; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java index 7d5739aec4..47abf91b66 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java @@ -19,8 +19,8 @@ package org.apache.nifi.web.dao; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.queue.DropFlowFileStatus; import org.apache.nifi.controller.service.ControllerServiceState; +import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.groups.ProcessGroup; -import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.web.api.dto.ProcessGroupDTO; import org.apache.nifi.web.api.dto.VariableRegistryDTO; import org.apache.nifi.web.api.dto.VersionControlInformationDTO; @@ -136,7 +136,7 @@ public interface ProcessGroupDAO { * update the contents of that Process Group * @return the process group */ - ProcessGroup updateProcessGroupFlow(String groupId, VersionedFlowSnapshot proposedSnapshot, VersionControlInformationDTO versionControlInformation, String componentIdSeed, + ProcessGroup updateProcessGroupFlow(String groupId, VersionedExternalFlow proposedSnapshot, VersionControlInformationDTO versionControlInformation, String componentIdSeed, boolean verifyNotModified, boolean updateSettings, boolean updateDescendantVersionedFlows); /** diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java index 278262466e..45ee5c252d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java @@ -28,6 +28,8 @@ import org.apache.nifi.controller.flow.FlowManager; import org.apache.nifi.controller.queue.DropFlowFileStatus; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceState; +import org.apache.nifi.flow.VersionedExternalFlow; +import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.groups.FlowFileConcurrency; import org.apache.nifi.groups.FlowFileOutboundPolicy; import org.apache.nifi.groups.ProcessGroup; @@ -36,8 +38,6 @@ import org.apache.nifi.parameter.ParameterContext; import org.apache.nifi.registry.flow.FlowRegistry; import org.apache.nifi.registry.flow.StandardVersionControlInformation; import org.apache.nifi.registry.flow.VersionControlInformation; -import org.apache.nifi.registry.flow.VersionedFlowSnapshot; -import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.web.ResourceNotFoundException; @@ -424,7 +424,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou } @Override - public ProcessGroup updateProcessGroupFlow(final String groupId, final VersionedFlowSnapshot proposedSnapshot, final VersionControlInformationDTO versionControlInformation, + public ProcessGroup updateProcessGroupFlow(final String groupId, final VersionedExternalFlow proposedSnapshot, final VersionControlInformationDTO versionControlInformation, final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings, final boolean updateDescendantVersionedFlows) { final ProcessGroup group = locateProcessGroup(flowController, groupId); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java index 616e0620c1..87d80c1a24 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java @@ -40,11 +40,11 @@ import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.history.History; import org.apache.nifi.history.HistoryQuery; import org.apache.nifi.nar.ExtensionManager; -import org.apache.nifi.registry.flow.ExternalControllerServiceReference; +import org.apache.nifi.flow.ExternalControllerServiceReference; import org.apache.nifi.registry.flow.RestBasedFlowRegistry; import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; -import org.apache.nifi.registry.flow.VersionedParameterContext; +import org.apache.nifi.flow.VersionedParameterContext; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup; import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper; import org.apache.nifi.web.api.dto.DtoFactory; diff --git a/nifi-nar-bundles/nifi-stateless-processor-bundle/nifi-stateless-processor/src/main/java/org/apache/nifi/processors/stateless/ExecuteStateless.java b/nifi-nar-bundles/nifi-stateless-processor-bundle/nifi-stateless-processor/src/main/java/org/apache/nifi/processors/stateless/ExecuteStateless.java index 3cd7a77a5a..f23af814bf 100644 --- a/nifi-nar-bundles/nifi-stateless-processor-bundle/nifi-stateless-processor/src/main/java/org/apache/nifi/processors/stateless/ExecuteStateless.java +++ b/nifi-nar-bundles/nifi-stateless-processor-bundle/nifi-stateless-processor/src/main/java/org/apache/nifi/processors/stateless/ExecuteStateless.java @@ -41,6 +41,7 @@ import org.apache.nifi.components.resource.ResourceType; import org.apache.nifi.flow.Bundle; import org.apache.nifi.flow.VersionedConnection; import org.apache.nifi.flow.VersionedControllerService; +import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.flow.VersionedLabel; import org.apache.nifi.flow.VersionedPort; import org.apache.nifi.flow.VersionedProcessGroup; @@ -60,6 +61,7 @@ import org.apache.nifi.processors.stateless.retrieval.CachingDataflowProvider; import org.apache.nifi.processors.stateless.retrieval.DataflowProvider; import org.apache.nifi.processors.stateless.retrieval.FileSystemDataflowProvider; import org.apache.nifi.processors.stateless.retrieval.RegistryDataflowProvider; +import org.apache.nifi.registry.VersionedFlowConverter; import org.apache.nifi.registry.bucket.Bucket; import org.apache.nifi.registry.flow.VersionedFlow; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; @@ -462,7 +464,7 @@ public class ExecuteStateless extends AbstractProcessor implements Searchable { final StatelessEngineConfiguration engineConfiguration = createEngineConfiguration(context, dataflowIndex); final StatelessBootstrap bootstrap = StatelessBootstrap.bootstrap(engineConfiguration, Thread.currentThread().getContextClassLoader()); - final DataflowDefinition dataflowDefinition = createDataflowDefinition(context, flowSnapshot); + final DataflowDefinition dataflowDefinition = createDataflowDefinition(context, flowSnapshot); final StatelessDataflow dataflow = bootstrap.createDataflow(dataflowDefinition); dataflow.initialize(); @@ -746,7 +748,8 @@ public class ExecuteStateless extends AbstractProcessor implements Searchable { } - private DataflowDefinition createDataflowDefinition(final ProcessContext context, final VersionedFlowSnapshot flowSnapshot) { + private DataflowDefinition createDataflowDefinition(final ProcessContext context, final VersionedFlowSnapshot flowSnapshot) { + final VersionedExternalFlow externalFlow = VersionedFlowConverter.createVersionedExternalFlow(flowSnapshot); final ParameterValueProviderDefinition parameterValueProviderDefinition = new ParameterValueProviderDefinition(); parameterValueProviderDefinition.setType("org.apache.nifi.stateless.parameter.OverrideParameterValueProvider"); parameterValueProviderDefinition.setName("Parameter Override"); @@ -783,10 +786,10 @@ public class ExecuteStateless extends AbstractProcessor implements Searchable { } }; - return new DataflowDefinition() { + return new DataflowDefinition() { @Override - public VersionedFlowSnapshot getFlowSnapshot() { - return flowSnapshot; + public VersionedExternalFlow getVersionedExternalFlow() { + return externalFlow; } @Override diff --git a/nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/VersionedFlowConverter.java b/nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/VersionedFlowConverter.java new file mode 100644 index 0000000000..37718bce5b --- /dev/null +++ b/nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/VersionedFlowConverter.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry; + +import org.apache.nifi.flow.VersionedExternalFlow; +import org.apache.nifi.flow.VersionedExternalFlowMetadata; +import org.apache.nifi.registry.flow.VersionedFlow; +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; +import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; + +public class VersionedFlowConverter { + public static VersionedExternalFlow createVersionedExternalFlow(final VersionedFlowSnapshot flowSnapshot) { + final VersionedExternalFlowMetadata externalFlowMetadata = new VersionedExternalFlowMetadata(); + final VersionedFlowSnapshotMetadata snapshotMetadata = flowSnapshot.getSnapshotMetadata(); + if (snapshotMetadata != null) { + externalFlowMetadata.setAuthor(snapshotMetadata.getAuthor()); + externalFlowMetadata.setBucketIdentifier(snapshotMetadata.getBucketIdentifier()); + externalFlowMetadata.setComments(snapshotMetadata.getComments()); + externalFlowMetadata.setFlowIdentifier(snapshotMetadata.getFlowIdentifier()); + externalFlowMetadata.setTimestamp(snapshotMetadata.getTimestamp()); + externalFlowMetadata.setVersion(snapshotMetadata.getVersion()); + } + + final VersionedFlow versionedFlow = flowSnapshot.getFlow(); + if (versionedFlow == null) { + externalFlowMetadata.setFlowName(flowSnapshot.getFlowContents().getName()); + } else { + externalFlowMetadata.setFlowName(versionedFlow.getName()); + } + + final VersionedExternalFlow externalFlow = new VersionedExternalFlow(); + externalFlow.setFlowContents(flowSnapshot.getFlowContents()); + externalFlow.setExternalControllerServices(flowSnapshot.getExternalControllerServices()); + externalFlow.setParameterContexts(flowSnapshot.getParameterContexts()); + externalFlow.setMetadata(externalFlowMetadata); + + return externalFlow; + } +} diff --git a/nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshot.java b/nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshot.java index e702f5599f..a9a48e506c 100644 --- a/nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshot.java +++ b/nifi-registry/nifi-registry-core/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshot.java @@ -19,6 +19,8 @@ package org.apache.nifi.registry.flow; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; +import org.apache.nifi.flow.ExternalControllerServiceReference; +import org.apache.nifi.flow.VersionedParameterContext; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.registry.bucket.Bucket; @@ -117,7 +119,7 @@ public class VersionedFlowSnapshot { @ApiModelProperty(value = "The parameter contexts referenced by process groups in the flow contents. " + "The mapping is from the name of the context to the context instance, and it is expected that any " + "context in this map is referenced by at least one process group in this flow.") - public Map getParameterContexts() { + public Map getParameterContexts() { return parameterContexts; } diff --git a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/ComparableDataFlow.java b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/ComparableDataFlow.java index 5e7470cfa1..b47def5771 100644 --- a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/ComparableDataFlow.java +++ b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/ComparableDataFlow.java @@ -20,7 +20,7 @@ package org.apache.nifi.registry.flow.diff; import org.apache.nifi.flow.VersionedControllerService; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.flow.VersionedReportingTask; -import org.apache.nifi.registry.flow.VersionedParameterContext; +import org.apache.nifi.flow.VersionedParameterContext; import java.util.Set; diff --git a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardComparableDataFlow.java b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardComparableDataFlow.java index 68b112a10e..44ada2c000 100644 --- a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardComparableDataFlow.java +++ b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardComparableDataFlow.java @@ -20,7 +20,7 @@ package org.apache.nifi.registry.flow.diff; import org.apache.nifi.flow.VersionedControllerService; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.flow.VersionedReportingTask; -import org.apache.nifi.registry.flow.VersionedParameterContext; +import org.apache.nifi.flow.VersionedParameterContext; import java.util.Collections; import java.util.HashSet; diff --git a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java index 266442511c..02751c6c41 100644 --- a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java +++ b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java @@ -30,8 +30,8 @@ import org.apache.nifi.flow.VersionedPropertyDescriptor; import org.apache.nifi.flow.VersionedRemoteGroupPort; import org.apache.nifi.flow.VersionedRemoteProcessGroup; import org.apache.nifi.flow.VersionedReportingTask; -import org.apache.nifi.registry.flow.VersionedParameter; -import org.apache.nifi.registry.flow.VersionedParameterContext; +import org.apache.nifi.flow.VersionedParameter; +import org.apache.nifi.flow.VersionedParameterContext; import java.util.Collection; import java.util.Collections; diff --git a/nifi-registry/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestFlowContentSerializer.java b/nifi-registry/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestFlowContentSerializer.java index f598370461..8bfcaabd94 100644 --- a/nifi-registry/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestFlowContentSerializer.java +++ b/nifi-registry/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestFlowContentSerializer.java @@ -16,7 +16,7 @@ */ package org.apache.nifi.registry.serialization; -import org.apache.nifi.registry.flow.ExternalControllerServiceReference; +import org.apache.nifi.flow.ExternalControllerServiceReference; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.flow.VersionedProcessor; diff --git a/nifi-registry/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/UnsecuredNiFiRegistryClientIT.java b/nifi-registry/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/UnsecuredNiFiRegistryClientIT.java index 18a5756a08..bc43b8f87e 100644 --- a/nifi-registry/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/UnsecuredNiFiRegistryClientIT.java +++ b/nifi-registry/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/UnsecuredNiFiRegistryClientIT.java @@ -60,12 +60,12 @@ import org.apache.nifi.registry.extension.repo.ExtensionRepoGroup; import org.apache.nifi.registry.extension.repo.ExtensionRepoVersion; import org.apache.nifi.registry.extension.repo.ExtensionRepoVersionSummary; import org.apache.nifi.registry.field.Fields; -import org.apache.nifi.registry.flow.ExternalControllerServiceReference; +import org.apache.nifi.flow.ExternalControllerServiceReference; import org.apache.nifi.registry.flow.VersionedFlow; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; -import org.apache.nifi.registry.flow.VersionedParameter; -import org.apache.nifi.registry.flow.VersionedParameterContext; +import org.apache.nifi.flow.VersionedParameter; +import org.apache.nifi.flow.VersionedParameterContext; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.flow.VersionedProcessor; import org.apache.nifi.flow.VersionedPropertyDescriptor; diff --git a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/DataflowDefinition.java b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/DataflowDefinition.java index 24326847ba..b8a66cd5c2 100644 --- a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/DataflowDefinition.java +++ b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/DataflowDefinition.java @@ -17,6 +17,7 @@ package org.apache.nifi.stateless.flow; +import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.stateless.config.ParameterContextDefinition; import org.apache.nifi.stateless.config.ParameterValueProviderDefinition; import org.apache.nifi.stateless.config.ReportingTaskDefinition; @@ -24,8 +25,8 @@ import org.apache.nifi.stateless.config.ReportingTaskDefinition; import java.util.List; import java.util.Set; -public interface DataflowDefinition { - T getFlowSnapshot(); +public interface DataflowDefinition { + VersionedExternalFlow getVersionedExternalFlow(); String getFlowName(); diff --git a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/DataflowDefinitionParser.java b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/DataflowDefinitionParser.java index 821171f5b4..7f0b29b203 100644 --- a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/DataflowDefinitionParser.java +++ b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/DataflowDefinitionParser.java @@ -27,9 +27,9 @@ import java.util.List; import java.util.Map; public interface DataflowDefinitionParser { - DataflowDefinition parseFlowDefinition(File configurationFile, StatelessEngineConfiguration engineConfiguration, List parameterOverrides) + DataflowDefinition parseFlowDefinition(File configurationFile, StatelessEngineConfiguration engineConfiguration, List parameterOverrides) throws StatelessConfigurationException, IOException; - DataflowDefinition parseFlowDefinition(Map configurationProperties, StatelessEngineConfiguration engineConfiguration, List parameterOverrides) + DataflowDefinition parseFlowDefinition(Map configurationProperties, StatelessEngineConfiguration engineConfiguration, List parameterOverrides) throws StatelessConfigurationException, IOException; } diff --git a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflowFactory.java b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflowFactory.java index df61ababe3..7dd538ca54 100644 --- a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflowFactory.java +++ b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflowFactory.java @@ -22,7 +22,7 @@ import org.apache.nifi.stateless.engine.StatelessEngineConfiguration; import java.io.IOException; -public interface StatelessDataflowFactory { - StatelessDataflow createDataflow(StatelessEngineConfiguration statelessEngineConfiguration, DataflowDefinition dataflowDefinition, ClassLoader extensionClassLoader) +public interface StatelessDataflowFactory { + StatelessDataflow createDataflow(StatelessEngineConfiguration statelessEngineConfiguration, DataflowDefinition dataflowDefinition, ClassLoader extensionClassLoader) throws IOException, StatelessConfigurationException; } diff --git a/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/RunStatelessFlow.java b/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/RunStatelessFlow.java index a4ec889c17..3a9c1819b9 100644 --- a/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/RunStatelessFlow.java +++ b/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/RunStatelessFlow.java @@ -89,7 +89,7 @@ public class RunStatelessFlow { final long initializeStart = System.currentTimeMillis(); final StatelessBootstrap bootstrap = StatelessBootstrap.bootstrap(engineConfiguration); - final DataflowDefinition dataflowDefinition = bootstrap.parseDataflowDefinition(flowDefinitionFile, parameterOverrides); + final DataflowDefinition dataflowDefinition = bootstrap.parseDataflowDefinition(flowDefinitionFile, parameterOverrides); final StatelessDataflow dataflow = bootstrap.createDataflow(dataflowDefinition); dataflow.initialize(); diff --git a/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/StatelessBootstrap.java b/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/StatelessBootstrap.java index 80754ad2e8..58b1d4eaa7 100644 --- a/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/StatelessBootstrap.java +++ b/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/StatelessBootstrap.java @@ -19,6 +19,7 @@ package org.apache.nifi.stateless.bootstrap; import org.apache.nifi.bundle.Bundle; import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.nar.NarClassLoader; import org.apache.nifi.nar.NarClassLoaders; import org.apache.nifi.nar.NarUnpacker; import org.apache.nifi.nar.SystemBundle; @@ -67,24 +68,24 @@ public class StatelessBootstrap { this.engineConfiguration = engineConfiguration; } - public StatelessDataflow createDataflow(final DataflowDefinition dataflowDefinition) + public StatelessDataflow createDataflow(final DataflowDefinition dataflowDefinition) throws IOException, StatelessConfigurationException { - final StatelessDataflowFactory dataflowFactory = getSingleInstance(engineClassLoader, StatelessDataflowFactory.class); + final StatelessDataflowFactory dataflowFactory = getSingleInstance(engineClassLoader, StatelessDataflowFactory.class); final StatelessDataflow dataflow = dataflowFactory.createDataflow(engineConfiguration, dataflowDefinition, extensionClassLoader); return dataflow; } - public DataflowDefinition parseDataflowDefinition(final File flowDefinitionFile, final List parameterOverrides) + public DataflowDefinition parseDataflowDefinition(final File flowDefinitionFile, final List parameterOverrides) throws StatelessConfigurationException, IOException { final DataflowDefinitionParser dataflowDefinitionParser = getSingleInstance(engineClassLoader, DataflowDefinitionParser.class); - final DataflowDefinition dataflowDefinition = dataflowDefinitionParser.parseFlowDefinition(flowDefinitionFile, engineConfiguration, parameterOverrides); + final DataflowDefinition dataflowDefinition = dataflowDefinitionParser.parseFlowDefinition(flowDefinitionFile, engineConfiguration, parameterOverrides); return dataflowDefinition; } - public DataflowDefinition parseDataflowDefinition(final Map flowDefinitionProperties, final List parameterOverrides) + public DataflowDefinition parseDataflowDefinition(final Map flowDefinitionProperties, final List parameterOverrides) throws StatelessConfigurationException, IOException { final DataflowDefinitionParser dataflowDefinitionParser = getSingleInstance(engineClassLoader, DataflowDefinitionParser.class); - final DataflowDefinition dataflowDefinition = dataflowDefinitionParser.parseFlowDefinition(flowDefinitionProperties, engineConfiguration, parameterOverrides); + final DataflowDefinition dataflowDefinition = dataflowDefinitionParser.parseFlowDefinition(flowDefinitionProperties, engineConfiguration, parameterOverrides); return dataflowDefinition; } @@ -119,47 +120,18 @@ public class StatelessBootstrap { final long unpackMillis = System.currentTimeMillis() - unpackStart; logger.info("Unpacked NAR files in {} millis", unpackMillis); + final BlockListClassLoader statelessClassLoader = createExtensionRootClassLoader(narDirectory, rootClassLoader); + final File statelessNarWorkingDir = locateStatelessNarWorkingDirectory(extensionsWorkingDir); - final File statelessNarInf = new File(statelessNarWorkingDir, "NAR-INF"); - final File statelessNarDependencies = new File(statelessNarInf, "bundled-dependencies"); - final File[] statelessNarContents = statelessNarDependencies.listFiles(); - if (statelessNarContents == null || statelessNarContents.length == 0) { - throw new IOException("Could not access contents of Stateless NAR dependencies at " + statelessNarDependencies); + final NarClassLoader engineClassLoader; + try { + engineClassLoader = new NarClassLoader(statelessNarWorkingDir, statelessClassLoader); + } catch (final ClassNotFoundException e) { + throw new IOException("Could not create NarClassLoader for Stateless NAR located at " + statelessNarWorkingDir.getAbsolutePath(), e); } - final List urls = new ArrayList<>(); - final List filenames = new ArrayList<>(); - for (final File dependency : statelessNarContents) { - final URL url = dependency.toURI().toURL(); - urls.add(url); - filenames.add(dependency.getName()); - } - - logger.info("Creating Stateless Bootstrap with the following files in the classpath: {}", filenames); - - final URL[] urlArray = urls.toArray(new URL[0]); - final BlockListClassLoader extensionClassLoader = createExtensionRootClassLoader(narDirectory, rootClassLoader); - - final Set classesBlockedExtensions = extensionClassLoader.getClassesBlocked(); - - // For the engine ClassLoader, we also want to block everything that we block for extensions except for the classes - // that the engine needs specifically (i.e., the classes in the stateless engine nar). We do this because there are some - // classes that may need to be shared between the bootstrap and the caller. For example, VersionedFlowSnapshot. - final URLClassLoader engineUrlClassLoader = new URLClassLoader(urlArray, rootClassLoader); - final Set engineSpecificClassNames = new HashSet<>(); - final Set engineSpecificFiles = new HashSet<>(); - findClassNamesInJars(urls, engineSpecificClassNames, engineSpecificFiles); - - final Set classesBlockedEngine = new HashSet<>(classesBlockedExtensions); - classesBlockedEngine.removeAll(engineSpecificClassNames); - - logger.debug("Blocking the following classes from being loaded from parent {} for Engine by Stateless ClassLoaders: {}", engineUrlClassLoader, classesBlockedEngine); - logger.debug("Blocking the following files from being loaded from parent {} for Engine by Stateless ClassLoaders: {}", engineUrlClassLoader, engineSpecificFiles); - - final BlockListClassLoader engineClassLoader = new BlockListClassLoader(engineUrlClassLoader, classesBlockedEngine); - Thread.currentThread().setContextClassLoader(engineClassLoader); - return new StatelessBootstrap(engineClassLoader, extensionClassLoader, engineConfiguration); + return new StatelessBootstrap(engineClassLoader, statelessClassLoader, engineConfiguration); } /** diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingContext.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingContext.java index 6ae6760bcd..f0b1d2912e 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingContext.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingContext.java @@ -30,10 +30,10 @@ import org.apache.nifi.stateless.engine.StatelessEngine; import java.util.Map; public class StatelessReportingContext extends AbstractReportingContext implements ReportingContext { - private final StatelessEngine statelessEngine; + private final StatelessEngine statelessEngine; private final FlowManager flowManager; - public StatelessReportingContext(final StatelessEngine statelessEngine, final FlowManager flowManager, + public StatelessReportingContext(final StatelessEngine statelessEngine, final FlowManager flowManager, final Map properties, final ReportingTask reportingTask, final VariableRegistry variableRegistry, final ParameterLookup parameterLookup) { super(reportingTask, statelessEngine.getBulletinRepository(), properties, statelessEngine.getControllerServiceProvider(), parameterLookup, variableRegistry); diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingTaskNode.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingTaskNode.java index 86e4c21186..5a24f371f7 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingTaskNode.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/reporting/StatelessReportingTaskNode.java @@ -35,9 +35,9 @@ import org.apache.nifi.stateless.engine.StatelessEngine; public class StatelessReportingTaskNode extends AbstractReportingTaskNode implements ReportingTaskNode { private final FlowManager flowManager; - private final StatelessEngine statelessEngine; + private final StatelessEngine statelessEngine; - public StatelessReportingTaskNode(final LoggableComponent reportingTask, final String id, final StatelessEngine statelessEngine, + public StatelessReportingTaskNode(final LoggableComponent reportingTask, final String id, final StatelessEngine statelessEngine, final FlowManager flowManager, final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory, final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent, final ExtensionManager extensionManager, final ValidationTrigger validationTrigger) { diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java index c1292491f7..5b32d2882d 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java @@ -104,7 +104,7 @@ public class StatelessProcessScheduler implements ProcessScheduler { } } - public void initialize(final ProcessContextFactory processContextFactory, final DataflowDefinition dataflowDefinition) { + public void initialize(final ProcessContextFactory processContextFactory, final DataflowDefinition dataflowDefinition) { this.processContextFactory = processContextFactory; final String threadNameSuffix = dataflowDefinition.getFlowName() == null ? "" : " for dataflow " + dataflowDefinition.getFlowName(); diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/registry/flow/InMemoryFlowRegistry.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/registry/flow/InMemoryFlowRegistry.java index a058f59fe7..f72baf3337 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/registry/flow/InMemoryFlowRegistry.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/registry/flow/InMemoryFlowRegistry.java @@ -18,6 +18,10 @@ package org.apache.nifi.registry.flow; import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.flow.ExternalControllerServiceReference; +import org.apache.nifi.flow.VersionedExternalFlow; +import org.apache.nifi.flow.VersionedExternalFlowMetadata; +import org.apache.nifi.flow.VersionedParameterContext; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.registry.bucket.Bucket; import org.apache.nifi.registry.client.NiFiRegistryException; @@ -40,7 +44,7 @@ public class InMemoryFlowRegistry implements FlowRegistry { private volatile String name; private volatile String url; - private final Map> flowSnapshots = new ConcurrentHashMap<>(); + private final Map> flowSnapshots = new ConcurrentHashMap<>(); @Override public String getIdentifier() { @@ -109,9 +113,9 @@ public class InMemoryFlowRegistry implements FlowRegistry { @Override public VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot, - final Map externalControllerServices, - final Map parameterContexts, final String comments, - final int expectedVersion, final NiFiUser user) { + final Map externalControllerServices, + final Map parameterContexts, final String comments, + final int expectedVersion, final NiFiUser user) { throw new UnsupportedOperationException(USER_SPECIFIC_ACTIONS_NOT_SUPPORTED); } @@ -133,21 +137,44 @@ public class InMemoryFlowRegistry implements FlowRegistry { @Override public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version, final boolean fetchRemoteFlows) throws NiFiRegistryException { final FlowCoordinates flowCoordinates = new FlowCoordinates(bucketId, flowId); - final List snapshots = flowSnapshots.get(flowCoordinates); + final List snapshots = flowSnapshots.get(flowCoordinates); - final VersionedFlowSnapshot versionedFlowSnapshot = snapshots.stream() - .filter(snapshot -> snapshot.getSnapshotMetadata().getVersion() == version) + final VersionedExternalFlow versionedExternalFlow = snapshots.stream() + .filter(snapshot -> snapshot.getMetadata().getVersion() == version) .findAny() .orElseThrow(() -> new NiFiRegistryException("Could not find flow: bucketId=" + bucketId + ", flowId=" + flowId + ", version=" + version)); + final VersionedFlowSnapshot versionedFlowSnapshot = convertToVersionedFlowSnapshot(versionedExternalFlow); return versionedFlowSnapshot; } + private VersionedFlowSnapshot convertToVersionedFlowSnapshot(final VersionedExternalFlow externalFlow) { + final VersionedExternalFlowMetadata externalFlowMetadata = externalFlow.getMetadata(); + + final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata(); + snapshotMetadata.setBucketIdentifier(externalFlowMetadata.getBucketIdentifier()); + snapshotMetadata.setVersion(externalFlowMetadata.getVersion()); + snapshotMetadata.setFlowIdentifier(externalFlowMetadata.getFlowIdentifier()); + + final VersionedFlow versionedFlow = new VersionedFlow(); + versionedFlow.setName(externalFlowMetadata.getFlowName()); + versionedFlow.setIdentifier(externalFlowMetadata.getFlowIdentifier()); + versionedFlow.setBucketIdentifier(externalFlowMetadata.getBucketIdentifier()); + + final VersionedFlowSnapshot flowSnapshot = new VersionedFlowSnapshot(); + flowSnapshot.setExternalControllerServices(externalFlow.getExternalControllerServices()); + flowSnapshot.setFlowContents(externalFlow.getFlowContents()); + flowSnapshot.setParameterContexts(externalFlow.getParameterContexts()); + flowSnapshot.setSnapshotMetadata(snapshotMetadata); + flowSnapshot.setFlow(versionedFlow); + + return flowSnapshot; + } @Override public VersionedFlow getVersionedFlow(final String bucketId, final String flowId) { final FlowCoordinates flowCoordinates = new FlowCoordinates(bucketId, flowId); - final List snapshots = flowSnapshots.get(flowCoordinates); + final List snapshots = flowSnapshots.get(flowCoordinates); final VersionedFlow versionedFlow = new VersionedFlow(); versionedFlow.setBucketIdentifier(bucketId); @@ -160,8 +187,8 @@ public class InMemoryFlowRegistry implements FlowRegistry { } - public synchronized void addFlowSnapshot(final VersionedFlowSnapshot flowSnapshot) { - final VersionedFlowSnapshotMetadata metadata = flowSnapshot.getSnapshotMetadata(); + public synchronized void addFlowSnapshot(final VersionedExternalFlow versionedExternalFlow) { + final VersionedExternalFlowMetadata metadata = versionedExternalFlow.getMetadata(); final String bucketId; final String flowId; final int version; @@ -177,16 +204,16 @@ public class InMemoryFlowRegistry implements FlowRegistry { final FlowCoordinates coordinates = new FlowCoordinates(bucketId, flowId); - final List snapshots = flowSnapshots.computeIfAbsent(coordinates, key -> Collections.synchronizedList(new ArrayList<>())); - final Optional optionalSnapshot = snapshots.stream() - .filter(snapshot -> snapshot.getSnapshotMetadata().getVersion() == version) + final List snapshots = flowSnapshots.computeIfAbsent(coordinates, key -> Collections.synchronizedList(new ArrayList<>())); + final Optional optionalSnapshot = snapshots.stream() + .filter(snapshot -> snapshot.getMetadata().getVersion() == version) .findAny(); if (optionalSnapshot.isPresent()) { throw new IllegalStateException("Versioned Flow Snapshot already exists for bucketId=" + bucketId + ", flowId=" + flowId + ", version=" + version); } - snapshots.add(flowSnapshot); + snapshots.add(versionedExternalFlow); } private static class FlowCoordinates { diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/config/PropertiesFileFlowDefinitionParser.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/config/PropertiesFileFlowDefinitionParser.java index 92f23fa203..9221bdbf45 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/config/PropertiesFileFlowDefinitionParser.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/config/PropertiesFileFlowDefinitionParser.java @@ -24,7 +24,9 @@ import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; import okhttp3.ResponseBody; +import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.registry.VersionedFlowConverter; import org.apache.nifi.registry.client.NiFiRegistryException; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; @@ -98,13 +100,13 @@ public class PropertiesFileFlowDefinitionParser implements DataflowDefinitionPar private static final String TRANSACTION_THRESHOLD_TIME = "nifi.stateless.transaction.thresholds.time"; - public DataflowDefinition parseFlowDefinition(final File propertiesFile, final StatelessEngineConfiguration engineConfig, final List parameterOverrides) + public DataflowDefinition parseFlowDefinition(final File propertiesFile, final StatelessEngineConfiguration engineConfig, final List parameterOverrides) throws IOException, StatelessConfigurationException { final Map properties = readPropertyValues(propertiesFile); return parseFlowDefinition(properties, engineConfig, parameterOverrides); } - public DataflowDefinition parseFlowDefinition(final Map properties, final StatelessEngineConfiguration engineConfig, + public DataflowDefinition parseFlowDefinition(final Map properties, final StatelessEngineConfiguration engineConfig, final List parameterOverrides) throws IOException, StatelessConfigurationException { // A common problem is users accidentally including whitespace at the beginning or end of property values. @@ -115,17 +117,16 @@ public class PropertiesFileFlowDefinitionParser implements DataflowDefinitionPar final Set failurePortNames = getFailurePortNames(properties); final VersionedFlowSnapshot flowSnapshot = fetchVersionedFlowSnapshot(properties, engineConfig.getSslContext()); + final VersionedExternalFlow externalFlow = VersionedFlowConverter.createVersionedExternalFlow(flowSnapshot); final List parameterContextDefinitions = getParameterContexts(properties); final List reportingTaskDefinitions = getReportingTasks(properties); final List parameterValueProviderDefinitions = getParameterValueProviders(properties, parameterOverrides); final TransactionThresholds transactionThresholds = getTransactionThresholds(properties); - final String rootGroupName = flowSnapshot.getFlowContents().getName(); - final String flowName = properties.getOrDefault(FLOW_NAME, rootGroupName); + final String flowName = properties.getOrDefault(FLOW_NAME, externalFlow.getMetadata().getFlowName()); return new StandardDataflowDefinition.Builder() - .flowSnapshot(flowSnapshot) - .flowName(flowName) + .versionedExternalFlow(externalFlow) .failurePortNames(failurePortNames) .parameterContexts(parameterContextDefinitions) .reportingTasks(reportingTaskDefinitions) @@ -480,6 +481,7 @@ public class PropertiesFileFlowDefinitionParser implements DataflowDefinitionPar return envValue == null ? "" : envValue; } + private VersionedFlowSnapshot fetchVersionedFlowSnapshot(final Map properties, final SslContextDefinition sslContextDefinition) throws IOException, StatelessConfigurationException { diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/ComponentBuilder.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/ComponentBuilder.java index f61816cf38..0bbf66bdb3 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/ComponentBuilder.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/ComponentBuilder.java @@ -56,7 +56,6 @@ import org.apache.nifi.processor.StandardProcessorInitializationContext; import org.apache.nifi.processor.StandardValidationContextFactory; import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.registry.VariableRegistry; -import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.registry.variable.StandardComponentVariableRegistry; import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.reporting.ReportingTask; @@ -74,14 +73,14 @@ import java.util.Set; public class ComponentBuilder { private static final Logger logger = LoggerFactory.getLogger(ComponentBuilder.class); - private StatelessEngine statelessEngine; + private StatelessEngine statelessEngine; private FlowManager flowManager; private String identifier; private String type; private BundleCoordinate bundleCoordinate; private Set additionalClassPathUrls; - public ComponentBuilder statelessEngine(final StatelessEngine statelessEngine) { + public ComponentBuilder statelessEngine(final StatelessEngine statelessEngine) { this.statelessEngine = statelessEngine; return this; } diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java index d683b633dd..d66d269c61 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java @@ -43,6 +43,7 @@ import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.encrypt.PropertyEncryptor; import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.extensions.ExtensionRepository; +import org.apache.nifi.flow.VersionedExternalFlowMetadata; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.logging.LogRepositoryFactory; @@ -57,8 +58,6 @@ import org.apache.nifi.processor.StandardValidationContext; import org.apache.nifi.provenance.ProvenanceRepository; import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; -import org.apache.nifi.registry.flow.VersionedFlow; -import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.scheduling.SchedulingStrategy; @@ -98,7 +97,7 @@ import java.util.stream.Collectors; import static java.util.Objects.requireNonNull; -public class StandardStatelessEngine implements StatelessEngine { +public class StandardStatelessEngine implements StatelessEngine { private static final Logger logger = LoggerFactory.getLogger(StandardStatelessEngine.class); private static final int CONCURRENT_EXTENSION_DOWNLOADS = 8; public static final Duration DEFAULT_STATUS_TASK_PERIOD = Duration.of(1, ChronoUnit.MINUTES); @@ -158,13 +157,14 @@ public class StandardStatelessEngine implements StatelessEngine dataflowDefinition) { + public StatelessDataflow createFlow(final DataflowDefinition dataflowDefinition) { if (!this.initialized) { throw new IllegalStateException("Cannot create Flow without first initializing Stateless Engine"); } - final VersionedFlow versionedFlow = dataflowDefinition.getFlowSnapshot().getFlow(); - logger.info("Building Dataflow {}", versionedFlow == null ? "" : versionedFlow.getName()); + final VersionedExternalFlowMetadata metadata = dataflowDefinition.getVersionedExternalFlow().getMetadata(); + final String flowName = metadata == null ? "" : metadata.getFlowName(); + logger.info("Building Dataflow {}", flowName); loadNecessaryExtensions(dataflowDefinition); @@ -178,7 +178,7 @@ public class StandardStatelessEngine implements StatelessEngine dataflowDefinition) { + private ParameterValueProvider createParameterValueProvider(final DataflowDefinition dataflowDefinition) { // Create a Provider for each definition final List providers = new ArrayList<>(); for (final ParameterValueProviderDefinition definition : dataflowDefinition.getParameterValueProviderDefinitions()) { @@ -300,8 +300,8 @@ public class StandardStatelessEngine implements StatelessEngine dataflowDefinition) { - final VersionedProcessGroup group = dataflowDefinition.getFlowSnapshot().getFlowContents(); + private void loadNecessaryExtensions(final DataflowDefinition dataflowDefinition) { + final VersionedProcessGroup group = dataflowDefinition.getVersionedExternalFlow().getFlowContents(); final Set requiredBundles = gatherRequiredBundles(group); for (final ReportingTaskDefinition reportingTaskDefinition : dataflowDefinition.getReportingTaskDefinitions()) { @@ -380,7 +380,7 @@ public class StandardStatelessEngine implements StatelessEngine createReportingTasks(final DataflowDefinition dataflowDefinition) { + private List createReportingTasks(final DataflowDefinition dataflowDefinition) { final List reportingTaskNodes = new ArrayList<>(); for (final ReportingTaskDefinition taskDefinition : dataflowDefinition.getReportingTaskDefinitions()) { final ReportingTaskNode taskNode = createReportingTask(taskDefinition); diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java index d6533d732c..941aca28ad 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessEngine.java @@ -37,11 +37,11 @@ import org.apache.nifi.stateless.flow.StatelessDataflow; import java.time.Duration; -public interface StatelessEngine { +public interface StatelessEngine { void initialize(StatelessEngineInitializationContext initializationContext); - StatelessDataflow createFlow(DataflowDefinition dataflowDefinition); + StatelessDataflow createFlow(DataflowDefinition dataflowDefinition); ExtensionManager getExtensionManager(); diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java index 6cd43b9910..83727b7e2f 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java @@ -65,7 +65,6 @@ import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.parameter.ParameterContextManager; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.StandardProcessContext; -import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.registry.variable.MutableVariableRegistry; import org.apache.nifi.remote.StandardRemoteProcessGroup; import org.apache.nifi.reporting.BulletinRepository; @@ -91,12 +90,12 @@ import static java.util.Objects.requireNonNull; public class StatelessFlowManager extends AbstractFlowManager implements FlowManager { private static final Logger logger = LoggerFactory.getLogger(StatelessFlowManager.class); - private final StatelessEngine statelessEngine; + private final StatelessEngine statelessEngine; private final SSLContext sslContext; private final BulletinRepository bulletinRepository; public StatelessFlowManager(final FlowFileEventRepository flowFileEventRepository, final ParameterContextManager parameterContextManager, - final StatelessEngine statelessEngine, final BooleanSupplier flowInitializedCheck, + final StatelessEngine statelessEngine, final BooleanSupplier flowInitializedCheck, final SSLContext sslContext, final BulletinRepository bulletinRepository) { super(flowFileEventRepository, parameterContextManager, statelessEngine.getFlowRegistryClient(), flowInitializedCheck); diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessReloadComponent.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessReloadComponent.java index 48a0dfdad2..831bf71560 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessReloadComponent.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessReloadComponent.java @@ -29,7 +29,6 @@ import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.TerminationAwareLogger; import org.apache.nifi.controller.exception.ControllerServiceInstantiationException; import org.apache.nifi.controller.exception.ProcessorInstantiationException; -import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; import org.apache.nifi.controller.service.ControllerServiceInvocationHandler; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.StandardConfigurationContext; @@ -40,7 +39,6 @@ import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.processor.StandardProcessContext; -import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.util.ReflectionUtils; import org.slf4j.Logger; @@ -52,9 +50,9 @@ import java.util.Set; public class StatelessReloadComponent implements ReloadComponent { private static final Logger logger = LoggerFactory.getLogger(StatelessReloadComponent.class); - private final StatelessEngine statelessEngine; + private final StatelessEngine statelessEngine; - public StatelessReloadComponent(final StatelessEngine statelessEngine) { + public StatelessReloadComponent(final StatelessEngine statelessEngine) { this.statelessEngine = statelessEngine; } @@ -169,7 +167,7 @@ public class StatelessReloadComponent implements ReloadComponent { } @Override - public void reload(final ReportingTaskNode existingNode, final String newType, final BundleCoordinate bundleCoordinate, final Set additionalUrls) throws ReportingTaskInstantiationException { + public void reload(final ReportingTaskNode existingNode, final String newType, final BundleCoordinate bundleCoordinate, final Set additionalUrls) { if (existingNode == null) { throw new IllegalStateException("Existing ReportingTaskNode cannot be null"); } diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardDataflowDefinition.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardDataflowDefinition.java index e1bb1e622f..25e1b0a21a 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardDataflowDefinition.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardDataflowDefinition.java @@ -19,10 +19,10 @@ package org.apache.nifi.stateless.flow; import org.apache.nifi.flow.Bundle; import org.apache.nifi.flow.VersionedControllerService; +import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.flow.VersionedPort; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.flow.VersionedProcessor; -import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.stateless.config.ParameterContextDefinition; import org.apache.nifi.stateless.config.ParameterValueProviderDefinition; import org.apache.nifi.stateless.config.ReportingTaskDefinition; @@ -35,33 +35,31 @@ import java.util.stream.Collectors; import static java.util.Objects.requireNonNull; -public class StandardDataflowDefinition implements DataflowDefinition { - private final VersionedFlowSnapshot flowSnapshot; +public class StandardDataflowDefinition implements DataflowDefinition { + private final VersionedExternalFlow versionedExternalFlow; private final Set failurePortNames; private final List parameterContexts; private final List reportingTaskDefinitions; private final List parameterValueProviderDefinitions; private final TransactionThresholds transactionThresholds; - private final String flowName; private StandardDataflowDefinition(final Builder builder) { - flowSnapshot = requireNonNull(builder.flowSnapshot, "Flow Snapshot must be provided"); + versionedExternalFlow = requireNonNull(builder.versionedExternalFlow, "Flow Snapshot must be provided"); failurePortNames = builder.failurePortNames == null ? Collections.emptySet() : builder.failurePortNames; parameterContexts = builder.parameterContexts == null ? Collections.emptyList() : builder.parameterContexts; reportingTaskDefinitions = builder.reportingTaskDefinitions == null ? Collections.emptyList() : builder.reportingTaskDefinitions; transactionThresholds = builder.transactionThresholds == null ? TransactionThresholds.SINGLE_FLOWFILE : builder.transactionThresholds; parameterValueProviderDefinitions = builder.parameterValueProviderDefinitions == null ? Collections.emptyList() : builder.parameterValueProviderDefinitions; - flowName = builder.flowName; } @Override - public VersionedFlowSnapshot getFlowSnapshot() { - return flowSnapshot; + public VersionedExternalFlow getVersionedExternalFlow() { + return versionedExternalFlow; } @Override public String getFlowName() { - return flowName; + return versionedExternalFlow.getMetadata().getFlowName(); } @Override @@ -71,14 +69,14 @@ public class StandardDataflowDefinition implements DataflowDefinition getInputPortNames() { - return flowSnapshot.getFlowContents().getInputPorts().stream() + return versionedExternalFlow.getFlowContents().getInputPorts().stream() .map(VersionedPort::getName) .collect(Collectors.toSet()); } @Override public Set getOutputPortNames() { - return flowSnapshot.getFlowContents().getOutputPorts().stream() + return versionedExternalFlow.getFlowContents().getOutputPorts().stream() .map(VersionedPort::getName) .collect(Collectors.toSet()); } @@ -105,7 +103,7 @@ public class StandardDataflowDefinition implements DataflowDefinition getReferencedBundles() { final Set referenced = new HashSet<>(); - final VersionedProcessGroup rootGroup = flowSnapshot.getFlowContents(); + final VersionedProcessGroup rootGroup = versionedExternalFlow.getFlowContents(); discoverReferencedBundles(rootGroup, referenced); return referenced; } @@ -125,21 +123,15 @@ public class StandardDataflowDefinition implements DataflowDefinition failurePortNames; private List parameterContexts; private List reportingTaskDefinitions; private List parameterValueProviderDefinitions; private TransactionThresholds transactionThresholds; - private String flowName; - public Builder flowSnapshot(final VersionedFlowSnapshot flowSnapshot) { - this.flowSnapshot = flowSnapshot; - return this; - } - - public Builder flowName(final String flowName) { - this.flowName = flowName; + public Builder versionedExternalFlow(final VersionedExternalFlow versionedExternalFlow) { + this.versionedExternalFlow = versionedExternalFlow; return this; } diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java index 459aacbf53..d95c9fe5ad 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java @@ -52,7 +52,6 @@ import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.flow.InMemoryFlowRegistry; import org.apache.nifi.registry.flow.StandardFlowRegistryClient; -import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.stateless.bootstrap.ExtensionDiscovery; @@ -88,17 +87,15 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; -public class StandardStatelessDataflowFactory implements StatelessDataflowFactory { +public class StandardStatelessDataflowFactory implements StatelessDataflowFactory { private static final Logger logger = LoggerFactory.getLogger(StandardStatelessDataflowFactory.class); - @Override - public StatelessDataflow createDataflow(final StatelessEngineConfiguration engineConfiguration, final DataflowDefinition dataflowDefinition, + + public StatelessDataflow createDataflow(final StatelessEngineConfiguration engineConfiguration, final DataflowDefinition dataflowDefinition, final ClassLoader extensionRootClassLoader) throws IOException, StatelessConfigurationException { final long start = System.currentTimeMillis(); - final VersionedFlowSnapshot flowSnapshot = dataflowDefinition.getFlowSnapshot(); - ProvenanceRepository provenanceRepo = null; ContentRepository contentRepo = null; StatelessProcessScheduler processScheduler = null; @@ -114,7 +111,7 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor } final InMemoryFlowRegistry flowRegistry = new InMemoryFlowRegistry(); - flowRegistry.addFlowSnapshot(flowSnapshot); + flowRegistry.addFlowSnapshot(dataflowDefinition.getVersionedExternalFlow()); final FlowRegistryClient flowRegistryClient = new StandardFlowRegistryClient(); flowRegistryClient.addFlowRegistry(flowRegistry); @@ -184,7 +181,7 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor System.setProperty("java.security.krb5.conf", krb5File.getAbsolutePath()); } - final StatelessEngine statelessEngine = new StandardStatelessEngine.Builder() + final StatelessEngine statelessEngine = new StandardStatelessEngine.Builder() .bulletinRepository(bulletinRepository) .encryptor(lazyInitializedEncryptor) .extensionManager(extensionManager) diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java index ff03473da6..c335ae7a3a 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java @@ -107,7 +107,7 @@ public class StandardStatelessFlow implements StatelessDataflow { private final ProcessContextFactory processContextFactory; private final RepositoryContextFactory repositoryContextFactory; private final List internalFlowFileQueues; - private final DataflowDefinition dataflowDefinition; + private final DataflowDefinition dataflowDefinition; private final StatelessStateManagerProvider stateManagerProvider; private final ObjectMapper objectMapper = new ObjectMapper(); private final ProcessScheduler processScheduler; @@ -122,7 +122,7 @@ public class StandardStatelessFlow implements StatelessDataflow { private volatile Boolean stateful = null; public StandardStatelessFlow(final ProcessGroup rootGroup, final List reportingTasks, final ControllerServiceProvider controllerServiceProvider, - final ProcessContextFactory processContextFactory, final RepositoryContextFactory repositoryContextFactory, final DataflowDefinition dataflowDefinition, + final ProcessContextFactory processContextFactory, final RepositoryContextFactory repositoryContextFactory, final DataflowDefinition dataflowDefinition, final StatelessStateManagerProvider stateManagerProvider, final ProcessScheduler processScheduler, final BulletinRepository bulletinRepository) { this.rootGroup = rootGroup; this.allConnections = rootGroup.findAllConnections(); diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/config/TestPropertiesFileFlowDefinitionParser.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/config/TestPropertiesFileFlowDefinitionParser.java index f178b5f25b..2ba428a647 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/config/TestPropertiesFileFlowDefinitionParser.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/config/TestPropertiesFileFlowDefinitionParser.java @@ -45,7 +45,7 @@ public class TestPropertiesFileFlowDefinitionParser { final List parameterOverrides = new ArrayList<>(); final StatelessEngineConfiguration engineConfig = createStatelessEngineConfiguration(); - final DataflowDefinition dataflowDefinition = parser.parseFlowDefinition(new File("src/test/resources/flow-configuration.properties"), engineConfig, parameterOverrides); + final DataflowDefinition dataflowDefinition = parser.parseFlowDefinition(new File("src/test/resources/flow-configuration.properties"), engineConfig, parameterOverrides); assertEquals(new HashSet<>(Arrays.asList("foo", "bar", "baz")), dataflowDefinition.getFailurePortNames()); final List contextDefinitions = dataflowDefinition.getParameterContexts(); diff --git a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java index 697dd05afa..f0df86133e 100644 --- a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java +++ b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java @@ -19,7 +19,9 @@ package org.apache.nifi.stateless; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.nifi.flow.Bundle; +import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.flow.VersionedPort; +import org.apache.nifi.registry.VersionedFlowConverter; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.stateless.bootstrap.StatelessBootstrap; import org.apache.nifi.stateless.config.ExtensionClientDefinition; @@ -164,13 +166,22 @@ public class StatelessSystemIT { } protected StatelessDataflow loadDataflow(final VersionedFlowSnapshot versionedFlowSnapshot, final List parameterContexts, + final List parameterValueProviderDefinitions, final Set failurePortNames, + final TransactionThresholds transactionThresholds) + throws IOException, StatelessConfigurationException { + + final VersionedExternalFlow externalFlow = VersionedFlowConverter.createVersionedExternalFlow(versionedFlowSnapshot); + return loadDataflow(externalFlow, parameterContexts, parameterValueProviderDefinitions, failurePortNames, transactionThresholds); + } + + protected StatelessDataflow loadDataflow(final VersionedExternalFlow versionedExternalFlow, final List parameterContexts, final List parameterValueProviderDefinitions, final Set failurePortNames, final TransactionThresholds transactionThresholds) throws IOException, StatelessConfigurationException { - final DataflowDefinition dataflowDefinition = new DataflowDefinition() { + final DataflowDefinition dataflowDefinition = new DataflowDefinition() { @Override - public VersionedFlowSnapshot getFlowSnapshot() { - return versionedFlowSnapshot; + public VersionedExternalFlow getVersionedExternalFlow() { + return versionedExternalFlow; } @Override @@ -185,14 +196,14 @@ public class StatelessSystemIT { @Override public Set getInputPortNames() { - return versionedFlowSnapshot.getFlowContents().getInputPorts().stream() + return versionedExternalFlow.getFlowContents().getInputPorts().stream() .map(VersionedPort::getName) .collect(Collectors.toSet()); } @Override public Set getOutputPortNames() { - return versionedFlowSnapshot.getFlowContents().getOutputPorts().stream() + return versionedExternalFlow.getFlowContents().getOutputPorts().stream() .map(VersionedPort::getName) .collect(Collectors.toSet()); } diff --git a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/VersionedFlowBuilder.java b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/VersionedFlowBuilder.java index 597d301008..07381daa95 100644 --- a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/VersionedFlowBuilder.java +++ b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/VersionedFlowBuilder.java @@ -29,7 +29,7 @@ import org.apache.nifi.flow.VersionedComponent; import org.apache.nifi.flow.VersionedConnection; import org.apache.nifi.flow.VersionedControllerService; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; -import org.apache.nifi.registry.flow.VersionedParameterContext; +import org.apache.nifi.flow.VersionedParameterContext; import org.apache.nifi.flow.VersionedPort; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.flow.VersionedProcessor; diff --git a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/parameters/ParameterContextIT.java b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/parameters/ParameterContextIT.java index 3570fcbe1b..5eeb6e7891 100644 --- a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/parameters/ParameterContextIT.java +++ b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/parameters/ParameterContextIT.java @@ -20,8 +20,8 @@ package org.apache.nifi.stateless.parameters; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.Relationship; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; -import org.apache.nifi.registry.flow.VersionedParameter; -import org.apache.nifi.registry.flow.VersionedParameterContext; +import org.apache.nifi.flow.VersionedParameter; +import org.apache.nifi.flow.VersionedParameterContext; import org.apache.nifi.flow.VersionedPort; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.flow.VersionedProcessor;