NIFI-9754: Introduced VersionedExternalFlow

- Updated stateless and StandardProcessGroup, etc. to make use of VersionedExternalFlow
- Updated StatelessDataflowDefinition to use ExternalVersionedFlow instead of generic type
- Updated Stateless Bootstrap to avoid loading stateless engine libs from root class path but instead use a NarClassLoader to load the statelss nar

Signed-off-by: Joe Gresock <jgresock@gmail.com>

This closes #5832.
This commit is contained in:
Mark Payne 2022-02-07 12:40:31 -05:00 committed by Joe Gresock
parent 7438bc9244
commit 8959226b50
No known key found for this signature in database
GPG Key ID: 37F5B9B6E258C8B7
60 changed files with 479 additions and 288 deletions

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.flow;
package org.apache.nifi.flow;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.flow;
import java.util.Map;
public class VersionedExternalFlow {
private VersionedProcessGroup flowContents;
private Map<String, ExternalControllerServiceReference> externalControllerServices;
private Map<String, VersionedParameterContext> parameterContexts;
private VersionedExternalFlowMetadata metadata;
public VersionedProcessGroup getFlowContents() {
return flowContents;
}
public void setFlowContents(final VersionedProcessGroup flowContents) {
this.flowContents = flowContents;
}
public Map<String, ExternalControllerServiceReference> getExternalControllerServices() {
return externalControllerServices;
}
public void setExternalControllerServices(final Map<String, ExternalControllerServiceReference> externalControllerServices) {
this.externalControllerServices = externalControllerServices;
}
public Map<String, VersionedParameterContext> getParameterContexts() {
return parameterContexts;
}
public void setParameterContexts(final Map<String, VersionedParameterContext> parameterContexts) {
this.parameterContexts = parameterContexts;
}
public VersionedExternalFlowMetadata getMetadata() {
return metadata;
}
public void setMetadata(final VersionedExternalFlowMetadata metadata) {
this.metadata = metadata;
}
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.flow;
public class VersionedExternalFlowMetadata {
private String bucketId;
private String flowId;
private int version;
private String flowName;
private String author;
private String comments;
private long timestamp;
public String getBucketIdentifier() {
return bucketId;
}
public void setBucketIdentifier(final String bucketIdentifier) {
this.bucketId = bucketIdentifier;
}
public String getFlowIdentifier() {
return flowId;
}
public void setFlowIdentifier(final String flowId) {
this.flowId = flowId;
}
public int getVersion() {
return version;
}
public void setVersion(final int version) {
this.version = version;
}
public String getFlowName() {
return flowName;
}
public void setFlowName(final String flowName) {
this.flowName = flowName;
}
public String getAuthor() {
return author;
}
public void setAuthor(final String author) {
this.author = author;
}
public String getComments() {
return comments;
}
public void setComments(final String comments) {
this.comments = comments;
}
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(final long timestamp) {
this.timestamp = timestamp;
}
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.flow;
package org.apache.nifi.flow;
import io.swagger.annotations.ApiModelProperty;

View File

@ -14,11 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.flow;
package org.apache.nifi.flow;
import io.swagger.annotations.ApiModelProperty;
import org.apache.nifi.flow.ComponentType;
import org.apache.nifi.flow.VersionedComponent;
import java.util.List;
import java.util.Set;

View File

@ -155,7 +155,7 @@ public class StatelessKafkaConnectorUtil {
final List<ParameterOverride> parameterOverrides = parseParameterOverrides(properties);
final String dataflowName = properties.get(DATAFLOW_NAME);
final DataflowDefinition<?> dataflowDefinition;
final DataflowDefinition dataflowDefinition;
final StatelessBootstrap bootstrap;
try {
final Map<String, String> dataflowDefinitionProperties = new HashMap<>();

View File

@ -18,18 +18,18 @@
package org.apache.nifi.groups;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
public interface ProcessGroupSynchronizer {
/**
* Synchronize the given Process Group to match the proposed snaphsot
* @param group the Process Group to update
* @param proposedSnapshot the proposed/desired state for the process group
* @param proposedFlow the proposed/desired state for the process group
* @param synchronizationOptions options for how to synchronize the group
*/
void synchronize(ProcessGroup group, VersionedFlowSnapshot proposedSnapshot, GroupSynchronizationOptions synchronizationOptions) throws ProcessorInstantiationException;
void synchronize(ProcessGroup group, VersionedExternalFlow proposedFlow, GroupSynchronizationOptions synchronizationOptions) throws ProcessorInstantiationException;
void verifyCanSynchronize(ProcessGroup group, VersionedProcessGroup proposed, boolean verifyConnectionRemoval);

View File

@ -64,6 +64,7 @@ import org.apache.nifi.controller.service.ControllerServiceReference;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.logging.LogRepository;
import org.apache.nifi.logging.LogRepositoryFactory;
@ -3778,7 +3779,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
@Override
public void updateFlow(final VersionedFlowSnapshot proposedSnapshot, final String componentIdSeed, final boolean verifyNotDirty, final boolean updateSettings,
public void updateFlow(final VersionedExternalFlow proposedSnapshot, final String componentIdSeed, final boolean verifyNotDirty, final boolean updateSettings,
final boolean updateDescendantVersionedFlows) {
final ComponentIdGenerator idGenerator = (proposedId, instanceId, destinationGroupId) -> generateUuid(proposedId, destinationGroupId, componentIdSeed);
@ -3815,7 +3816,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
@Override
public void synchronizeFlow(final VersionedFlowSnapshot proposedSnapshot, final GroupSynchronizationOptions synchronizationOptions, final FlowMappingOptions flowMappingOptions) {
public void synchronizeFlow(final VersionedExternalFlow proposedSnapshot, final GroupSynchronizationOptions synchronizationOptions, final FlowMappingOptions flowMappingOptions) {
writeLock.lock();
try {
verifyCanUpdate(proposedSnapshot, true, !synchronizationOptions.isIgnoreLocalModifications());
@ -3924,13 +3925,13 @@ public final class StandardProcessGroup implements ProcessGroup {
}
@Override
public void verifyCanUpdate(final VersionedFlowSnapshot updatedFlow, final boolean verifyConnectionRemoval, final boolean verifyNotDirty) {
public void verifyCanUpdate(final VersionedExternalFlow updatedFlow, final boolean verifyConnectionRemoval, final boolean verifyNotDirty) {
readLock.lock();
try {
// flow id match and not dirty check concepts are only applicable to versioned flows
final VersionControlInformation versionControlInfo = getVersionControlInformation();
if (versionControlInfo != null) {
if (!versionControlInfo.getFlowIdentifier().equals(updatedFlow.getSnapshotMetadata().getFlowIdentifier())) {
if (!versionControlInfo.getFlowIdentifier().equals(updatedFlow.getMetadata().getFlowIdentifier())) {
throw new IllegalStateException(this + " is under version control but the given flow does not match the flow that this Process Group is synchronized with");
}

View File

@ -48,6 +48,7 @@ import org.apache.nifi.flow.ConnectableComponent;
import org.apache.nifi.flow.VersionedComponent;
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedFlowCoordinates;
import org.apache.nifi.flow.VersionedFunnel;
import org.apache.nifi.flow.VersionedLabel;
@ -71,8 +72,8 @@ import org.apache.nifi.registry.flow.StandardVersionControlInformation;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowState;
import org.apache.nifi.registry.flow.VersionedParameter;
import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedParameter;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
import org.apache.nifi.registry.flow.diff.DifferenceType;
import org.apache.nifi.registry.flow.diff.FlowComparator;
@ -142,13 +143,13 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
}
@Override
public void synchronize(final ProcessGroup group, final VersionedFlowSnapshot proposedSnapshot, final GroupSynchronizationOptions options) {
public void synchronize(final ProcessGroup group, final VersionedExternalFlow versionedExternalFlow, final GroupSynchronizationOptions options) {
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(context.getExtensionManager(), context.getFlowMappingOptions());
final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(group, context.getControllerServiceProvider(), context.getFlowRegistryClient(), true);
final ComparableDataFlow localFlow = new StandardComparableDataFlow("Currently Loaded Flow", versionedGroup);
final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("Proposed Flow", proposedSnapshot.getFlowContents());
final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("Proposed Flow", versionedExternalFlow.getFlowContents());
final PropertyDecryptor decryptor = options.getPropertyDecryptor();
final FlowComparator flowComparator = new StandardFlowComparator(proposedFlow, localFlow, group.getAncestorServiceIds(), new StaticDifferenceDescriptor(), decryptor::decrypt);
@ -199,7 +200,7 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
.map(FlowDifference::toString)
.collect(Collectors.joining("\n"));
LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", group, proposedSnapshot,
LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", group, versionedExternalFlow,
flowComparison.getDifferences().size(), differencesByLine);
}
@ -217,7 +218,7 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
context.getFlowManager().withParameterContextResolution(() -> {
try {
synchronize(group, proposedSnapshot.getFlowContents(), proposedSnapshot.getParameterContexts());
synchronize(group, versionedExternalFlow.getFlowContents(), versionedExternalFlow.getParameterContexts());
} catch (final ProcessorInstantiationException pie) {
throw new RuntimeException(pie);
}

View File

@ -18,7 +18,9 @@
package org.apache.nifi.registry.flow;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.flow.VersionedFlowCoordinates;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.registry.bucket.Bucket;
import org.apache.nifi.registry.client.BucketClient;

View File

@ -17,7 +17,7 @@
package org.apache.nifi.registry.flow.mapping;
import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.flow.VersionedProcessGroup;
import javax.xml.bind.annotation.XmlTransient;

View File

@ -67,12 +67,12 @@ import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterDescriptor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedParameter;
import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedParameter;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.remote.PublicPort;
import org.apache.nifi.remote.RemoteGroupPort;

View File

@ -20,7 +20,7 @@ package org.apache.nifi.registry.flow.mapping;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedReportingTask;
import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
import java.util.Collections;

View File

@ -20,7 +20,7 @@ package org.apache.nifi.controller.flow;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedReportingTask;
import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedParameterContext;
import java.util.List;
import java.util.Set;

View File

@ -33,6 +33,7 @@ import org.apache.nifi.controller.Triggerable;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterUpdate;
@ -40,7 +41,6 @@ import org.apache.nifi.processor.Processor;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.mapping.FlowMappingOptions;
import org.apache.nifi.remote.RemoteGroupPort;
@ -880,7 +880,7 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
* @param updateDescendantVersionedFlows if a child/descendant Process Group is under Version Control, specifies whether or not to
* update the contents of that Process Group
*/
void updateFlow(VersionedFlowSnapshot proposedSnapshot, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings, boolean updateDescendantVersionedFlows);
void updateFlow(VersionedExternalFlow proposedSnapshot, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings, boolean updateDescendantVersionedFlows);
/**
* Updates the Process Group to match the proposed flow
@ -889,7 +889,7 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
* @param synchronizationOptions options for how the synchronization should occur
* @param flowMappingOptions options for how to map the existing dataflow into Versioned components so that it can be compared to the proposed snapshot
*/
void synchronizeFlow(VersionedFlowSnapshot proposedSnapshot, GroupSynchronizationOptions synchronizationOptions, FlowMappingOptions flowMappingOptions);
void synchronizeFlow(VersionedExternalFlow proposedSnapshot, GroupSynchronizationOptions synchronizationOptions, FlowMappingOptions flowMappingOptions);
/**
* Verifies a template with the specified name can be created.
@ -976,7 +976,7 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
*
* @throws IllegalStateException if the Process Group is not in a state that will allow the update
*/
void verifyCanUpdate(VersionedFlowSnapshot updatedFlow, boolean verifyConnectionRemoval, boolean verifyNotDirty);
void verifyCanUpdate(VersionedExternalFlow updatedFlow, boolean verifyConnectionRemoval, boolean verifyNotDirty);
/**
* Ensures that the Process Group can have any local changes reverted

View File

@ -18,6 +18,8 @@
package org.apache.nifi.registry.flow;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.registry.bucket.Bucket;
import org.apache.nifi.registry.client.NiFiRegistryException;

View File

@ -36,7 +36,7 @@ import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.registry.flow.mapping.ComponentIdLookup;
import org.apache.nifi.registry.flow.mapping.FlowMappingOptions;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;

View File

@ -52,6 +52,9 @@ import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.flow.Bundle;
import org.apache.nifi.flow.ScheduledState;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedParameter;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.flow.VersionedReportingTask;
@ -69,9 +72,6 @@ import org.apache.nifi.parameter.ParameterDescriptor;
import org.apache.nifi.persistence.FlowConfigurationArchiveManager;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedParameter;
import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
import org.apache.nifi.registry.flow.diff.DifferenceDescriptor;
import org.apache.nifi.registry.flow.diff.FlowComparator;
@ -298,9 +298,9 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
final Map<String, VersionedParameterContext> versionedParameterContextMap = new HashMap<>();
versionedFlow.getParameterContexts().forEach(context -> versionedParameterContextMap.put(context.getName(), context));
final VersionedFlowSnapshot versionedFlowSnapshot = new VersionedFlowSnapshot();
versionedFlowSnapshot.setParameterContexts(versionedParameterContextMap);
versionedFlowSnapshot.setFlowContents(versionedFlow.getRootGroup());
final VersionedExternalFlow versionedExternalFlow = new VersionedExternalFlow();
versionedExternalFlow.setParameterContexts(versionedParameterContextMap);
versionedExternalFlow.setFlowContents(versionedFlow.getRootGroup());
// Inherit controller-level components.
inheritControllerServices(controller, versionedFlow, affectedComponentSet);
@ -313,7 +313,7 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
final ComponentScheduler componentScheduler = new FlowControllerComponentScheduler(controller);
if (rootGroup.isEmpty()) {
final VersionedProcessGroup versionedRoot = versionedFlowSnapshot.getFlowContents();
final VersionedProcessGroup versionedRoot = versionedExternalFlow.getFlowContents();
rootGroup = controller.getFlowManager().createProcessGroup(versionedRoot.getInstanceIdentifier());
rootGroup.setComments(versionedRoot.getComments());
rootGroup.setPosition(new Position(versionedRoot.getPosition().getX(), versionedRoot.getPosition().getY()));
@ -350,7 +350,7 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
.mapControllerServiceReferencesToVersionedId(false)
.build();
rootGroup.synchronizeFlow(versionedFlowSnapshot, syncOptions, flowMappingOptions);
rootGroup.synchronizeFlow(versionedExternalFlow, syncOptions, flowMappingOptions);
// Inherit templates, now that all necessary Process Groups have been created
inheritTemplates(controller, versionedFlow);

View File

@ -33,6 +33,7 @@ import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.groups.BatchCounts;
import org.apache.nifi.groups.DataValve;
import org.apache.nifi.groups.FlowFileConcurrency;
@ -48,7 +49,6 @@ import org.apache.nifi.parameter.ParameterUpdate;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.mapping.FlowMappingOptions;
import org.apache.nifi.registry.variable.MutableVariableRegistry;
import org.apache.nifi.remote.RemoteGroupPort;
@ -703,7 +703,7 @@ public class MockProcessGroup implements ProcessGroup {
}
@Override
public void verifyCanUpdate(VersionedFlowSnapshot updatedFlow, boolean verifyConnectionRemoval, boolean verifyNotDirty) {
public void verifyCanUpdate(VersionedExternalFlow updatedFlow, boolean verifyConnectionRemoval, boolean verifyNotDirty) {
}
@Override
@ -715,11 +715,11 @@ public class MockProcessGroup implements ProcessGroup {
}
@Override
public void updateFlow(VersionedFlowSnapshot proposedFlow, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings, boolean updateDescendantVerisonedFlows) {
public void updateFlow(VersionedExternalFlow proposedFlow, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings, boolean updateDescendantVerisonedFlows) {
}
@Override
public void synchronizeFlow(final VersionedFlowSnapshot proposedSnapshot, final GroupSynchronizationOptions synchronizationOptions, final FlowMappingOptions flowMappingOptions) {
public void synchronizeFlow(final VersionedExternalFlow proposedSnapshot, final GroupSynchronizationOptions synchronizationOptions, final FlowMappingOptions flowMappingOptions) {
}
@Override

View File

@ -19,13 +19,13 @@ package org.apache.nifi.integration;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.registry.bucket.Bucket;
import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import java.io.IOException;

View File

@ -26,9 +26,17 @@ import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.StandardSnippet;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.flow.Bundle;
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedExternalFlowMetadata;
import org.apache.nifi.flow.VersionedFunnel;
import org.apache.nifi.flow.VersionedParameter;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.integration.DirectInjectionExtensionManager;
import org.apache.nifi.integration.FrameworkIntegrationTest;
@ -44,15 +52,7 @@ import org.apache.nifi.parameter.StandardParameterContext;
import org.apache.nifi.parameter.StandardParameterReferenceManager;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.registry.bucket.Bucket;
import org.apache.nifi.flow.Bundle;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
import org.apache.nifi.registry.flow.VersionedParameter;
import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
import org.apache.nifi.registry.flow.diff.ConciseEvolvingDifferenceDescriptor;
import org.apache.nifi.registry.flow.diff.DifferenceType;
@ -103,7 +103,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
processor.setAutoTerminatedRelationships(Collections.singleton(REL_SUCCESS));
processor.setProperties(Collections.singletonMap(NopServiceReferencingProcessor.SERVICE.getName(), controllerService.getIdentifier()));
final VersionedFlowSnapshot proposedFlow = createFlowSnapshot(Collections.singletonList(controllerService), Collections.singletonList(processor), null);
final VersionedExternalFlow proposedFlow = createFlowSnapshot(Collections.singletonList(controllerService), Collections.singletonList(processor), null);
// Create an Inner Process Group and update it to match the Versioned Flow.
final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@ -146,8 +146,8 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
final ProcessorNode processor = createProcessorNode(UsernamePasswordProcessor.class);
processor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "password"));
// Create a VersionedFlowSnapshot that contains the processor
final VersionedFlowSnapshot versionedFlowWithExplicitValue = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), null);
// Create a VersionedExternalFlow that contains the processor
final VersionedExternalFlow versionedFlowWithExplicitValue = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), null);
// Create child group
final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@ -173,7 +173,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
assertEquals(DifferenceType.PROPERTY_PARAMETERIZED, differences.iterator().next().getDifferenceType());
// Create a Versioned Flow that contains the Parameter Reference.
final VersionedFlowSnapshot versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), null);
final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), null);
// Ensure no difference between the current configuration and the versioned flow
differences = getLocalModifications(innerGroup, versionedFlowWithParameterReference);
@ -191,9 +191,9 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
final ProcessorNode processor = createProcessorNode(UsernamePasswordProcessor.class);
processor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "#{secret-param}"));
// Create a VersionedFlowSnapshot that contains the processor
// Create a VersionedExternalFlow that contains the processor
final Parameter parameter = new Parameter(new ParameterDescriptor.Builder().name("secret-param").sensitive(true).build(), null);
final VersionedFlowSnapshot versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), Collections.singleton(parameter));
final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), Collections.singleton(parameter));
// Create child group
final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@ -223,9 +223,9 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
final ProcessorNode processor = createProcessorNode(UsernamePasswordProcessor.class);
processor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "#{secret-param}"));
// Create a VersionedFlowSnapshot that contains the processor
// Create a VersionedExternalFlow that contains the processor
final Parameter parameter = new Parameter(new ParameterDescriptor.Builder().name("secret-param").sensitive(true).build(), null);
final VersionedFlowSnapshot versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), Collections.singleton(parameter));
final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), Collections.singleton(parameter));
// Create child group
final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@ -253,15 +253,15 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
final ProcessorNode initialProcessor = createProcessorNode(UsernamePasswordProcessor.class);
initialProcessor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "#{secret-param}"));
// Create a VersionedFlowSnapshot that contains the processor
// Create a VersionedExternalFlow that contains the processor
final Parameter parameter = new Parameter(new ParameterDescriptor.Builder().name("secret-param").sensitive(true).build(), null);
final VersionedFlowSnapshot versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(),
final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(),
Collections.singletonList(initialProcessor), Collections.singleton(parameter));
// Update processor to have an explicit value for the second version of the flow.
initialProcessor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "secret-value"));
final VersionedFlowSnapshot versionedFlowExplicitValue = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null);
final VersionedExternalFlow versionedFlowExplicitValue = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null);
// Create child group and update to the first version of the flow, with parameter ref
final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@ -290,7 +290,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
initialProperties.put(UsernamePasswordProcessor.PASSWORD.getName(), "pass");
initialProcessor.setProperties(initialProperties);
final VersionedFlowSnapshot initialVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null);
final VersionedExternalFlow initialVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null);
// Update processor to have a different explicit value for both sensitive and non-sensitive properties and create a versioned flow for it.
final Map<String, String> updatedProperties = new HashMap<>();
@ -298,7 +298,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
updatedProperties.put(UsernamePasswordProcessor.PASSWORD.getName(), "pass");
initialProcessor.setProperties(updatedProperties);
final VersionedFlowSnapshot updatedVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null);
final VersionedExternalFlow updatedVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null);
// Create child group and update to the first version of the flow, with parameter ref
final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@ -335,7 +335,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
initialProperties.put(UsernamePasswordProcessor.PASSWORD.getName(), "#{secret-param}");
initialProcessor.setProperties(initialProperties);
final VersionedFlowSnapshot initialVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null);
final VersionedExternalFlow initialVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null);
// Update processor to have a different explicit value for both sensitive and non-sensitive properties and create a versioned flow for it.
final Map<String, String> updatedProperties = new HashMap<>();
@ -343,7 +343,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
updatedProperties.put(UsernamePasswordProcessor.PASSWORD.getName(), "#{other-param}");
initialProcessor.setProperties(updatedProperties);
final VersionedFlowSnapshot updatedVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null);
final VersionedExternalFlow updatedVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null);
// Create child group and update to the first version of the flow, with parameter ref
final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@ -369,12 +369,12 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
processorWithExplicitValue.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "secret-value"));
// Create a VersionedFlowSnapshot that contains the processor
// Create a VersionedExternalFlow that contains the processor
final Parameter parameter = new Parameter(new ParameterDescriptor.Builder().name("secret-param").sensitive(true).build(), null);
final VersionedFlowSnapshot versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(),
final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(),
Collections.singletonList(processorWithParamRef), Collections.singleton(parameter));
final VersionedFlowSnapshot versionedFlowExplicitValue = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processorWithExplicitValue), null);
final VersionedExternalFlow versionedFlowExplicitValue = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processorWithExplicitValue), null);
// Create child group and update to the first version of the flow, with parameter ref
final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@ -406,7 +406,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
groupA.addInputPort(port);
//Create a snapshot
final VersionedFlowSnapshot version1 = createFlowSnapshot(groupA);
final VersionedExternalFlow version1 = createFlowSnapshot(groupA);
//Create Process Group B under Process Group A
final ProcessGroup groupB = createProcessGroup("group-b-id", "Group B", groupA);
@ -421,7 +421,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
final Connection connection = connect(groupA, processor, port, processor.getRelationships());
//Create another snapshot
final VersionedFlowSnapshot version2 = createFlowSnapshot(groupA);
final VersionedExternalFlow version2 = createFlowSnapshot(groupA);
//Change Process Group A version to Version 1
groupA.updateFlow(version1, null, false, true, true);
@ -463,7 +463,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
final Connection connection = connect(group, processor, port, processor.getRelationships());
//Create a snapshot
final VersionedFlowSnapshot version1 = createFlowSnapshot(group);
final VersionedExternalFlow version1 = createFlowSnapshot(group);
//Create Funnel under Process Group
Funnel funnel = getFlowController().getFlowManager().createFunnel("funnel-id");
@ -476,7 +476,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
group.removeOutputPort(port);
//Create another snapshot
final VersionedFlowSnapshot version2 = createFlowSnapshot(group);
final VersionedExternalFlow version2 = createFlowSnapshot(group);
//Change Process Group version to Version 1
group.updateFlow(version1, null, false, true, true);
@ -534,7 +534,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
final Connection connection2 = connect(groupA, processor1, inputPort, processor1.getRelationships());
//Create a snapshot
final VersionedFlowSnapshot version1 = createFlowSnapshot(groupA);
final VersionedExternalFlow version1 = createFlowSnapshot(groupA);
//Modify Connection 1 to point to Processor 2
connection1.setDestination(processor2);
@ -543,7 +543,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
moveOutputPort(outputPort, groupB);
//Create another snapshot
final VersionedFlowSnapshot version2 = createFlowSnapshot(groupA);
final VersionedExternalFlow version2 = createFlowSnapshot(groupA);
//Delete connection 2
groupA.removeConnection(connection2);
@ -552,7 +552,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
groupB.removeInputPort(inputPort);
//Create another snapshot
final VersionedFlowSnapshot version3 = createFlowSnapshot(groupA);
final VersionedExternalFlow version3 = createFlowSnapshot(groupA);
//Change Process Group version to Version 1
groupA.updateFlow(version1, null, false, true, true);
@ -612,7 +612,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
final Connection connection2 = connect(groupA, processor1, inputPort, processor1.getRelationships());
//Create a snapshot
final VersionedFlowSnapshot version1 = createFlowSnapshot(groupA);
final VersionedExternalFlow version1 = createFlowSnapshot(groupA);
//Modify Connection 1 to point to Processor 2
connection1.setDestination(processor2);
@ -621,7 +621,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
groupA.removeOutputPort(outputPort);
//Create another snapshot
final VersionedFlowSnapshot version2 = createFlowSnapshot(groupA);
final VersionedExternalFlow version2 = createFlowSnapshot(groupA);
//Delete connection 2
groupA.removeConnection(connection2);
@ -630,7 +630,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
moveInputPort(inputPort, groupA);
//Create another snapshot
final VersionedFlowSnapshot version3 = createFlowSnapshot(groupA);
final VersionedExternalFlow version3 = createFlowSnapshot(groupA);
//Change Process Group version to Version 1
groupA.updateFlow(version1, null, false, true, true);
@ -686,10 +686,10 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
}
private Set<FlowDifference> getLocalModifications(final ProcessGroup processGroup, final VersionedFlowSnapshot versionedFlowSnapshot) {
private Set<FlowDifference> getLocalModifications(final ProcessGroup processGroup, final VersionedExternalFlow VersionedExternalFlow) {
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(getFlowController().getExtensionManager());
final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, getFlowController().getControllerServiceProvider(), getFlowController().getFlowRegistryClient(), true);
final VersionedProcessGroup registryGroup = versionedFlowSnapshot.getFlowContents();
final VersionedProcessGroup registryGroup = VersionedExternalFlow.getFlowContents();
final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localGroup);
final ComparableDataFlow registryFlow = new StandardComparableDataFlow("Versioned Flow", registryGroup);
@ -707,14 +707,8 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
return differences;
}
private VersionedFlowSnapshot createFlowSnapshot(final ProcessGroup group, final List<ControllerServiceNode> controllerServices,
private VersionedExternalFlow createFlowSnapshot(final ProcessGroup group, final List<ControllerServiceNode> controllerServices,
final List<ProcessorNode> processors, final Set<Parameter> parameters) {
final VersionedFlowSnapshotMetadata snapshotMetadata = createSnapshotMetadata();
final Bucket bucket = createBucket();
final VersionedFlow flow = createVersionedFlow();
createBundle();
final NiFiRegistryFlowMapper flowMapper = new NiFiRegistryFlowMapper(getExtensionManager());
@ -804,7 +798,14 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
flowContents.setFunnels(versionedFunnels);
flowContents.setConnections(versionedConnections);
final VersionedFlowSnapshot versionedFlowSnapshot = createVersionedFlowSnapshot(snapshotMetadata, bucket, flow, flowContents);
final VersionedExternalFlow externalFlow = new VersionedExternalFlow();
final VersionedExternalFlowMetadata metadata = new VersionedExternalFlowMetadata();
externalFlow.setMetadata(metadata);
metadata.setBucketIdentifier("unit-test-bucket");
metadata.setFlowIdentifier("unit-test-flow");
metadata.setVersion(1);
metadata.setFlowName("unit-test-flow");
if (parameters != null) {
final Set<VersionedParameter> versionedParameters = new HashSet<>();
@ -820,31 +821,22 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
final VersionedParameterContext versionedParameterContext = new VersionedParameterContext();
versionedParameterContext.setName("Unit Test Context");
versionedParameterContext.setParameters(versionedParameters);
versionedFlowSnapshot.setParameterContexts(Collections.singletonMap(versionedParameterContext.getName(), versionedParameterContext));
externalFlow.setParameterContexts(Collections.singletonMap(versionedParameterContext.getName(), versionedParameterContext));
flowContents.setParameterContextName("Unit Test Context");
}
return versionedFlowSnapshot;
return externalFlow;
}
private VersionedFlowSnapshot createFlowSnapshot(final List<ControllerServiceNode> controllerServices, final List<ProcessorNode> processors, final Set<Parameter> parameters) {
private VersionedExternalFlow createFlowSnapshot(final List<ControllerServiceNode> controllerServices, final List<ProcessorNode> processors, final Set<Parameter> parameters) {
return createFlowSnapshot(null, controllerServices, processors, parameters);
}
private VersionedFlowSnapshot createFlowSnapshot(final ProcessGroup group) {
return createFlowSnapshot(group, Collections.EMPTY_LIST, Collections.EMPTY_LIST, null);
private VersionedExternalFlow createFlowSnapshot(final ProcessGroup group) {
return createFlowSnapshot(group, Collections.emptyList(), Collections.emptyList(), null);
}
@NotNull
private VersionedFlowSnapshot createVersionedFlowSnapshot(VersionedFlowSnapshotMetadata snapshotMetadata, Bucket bucket, VersionedFlow flow, VersionedProcessGroup flowContents) {
final VersionedFlowSnapshot versionedFlowSnapshot = new VersionedFlowSnapshot();
versionedFlowSnapshot.setSnapshotMetadata(snapshotMetadata);
versionedFlowSnapshot.setBucket(bucket);
versionedFlowSnapshot.setFlow(flow);
versionedFlowSnapshot.setFlowContents(flowContents);
return versionedFlowSnapshot;
}
@NotNull
private VersionedProcessGroup createFlowContents() {
@ -882,14 +874,4 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
return bucket;
}
@NotNull
private VersionedFlowSnapshotMetadata createSnapshotMetadata() {
final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata();
snapshotMetadata.setAuthor("unit-test");
snapshotMetadata.setBucketIdentifier("unit-test-bucket");
snapshotMetadata.setFlowIdentifier("unit-test-flow");
snapshotMetadata.setTimestamp(System.currentTimeMillis());
snapshotMetadata.setVersion(1);
return snapshotMetadata;
}
}

View File

@ -51,7 +51,7 @@ import org.apache.nifi.parameter.ParameterDescriptor;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.flow.ComponentType;
import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.flow.PortType;
@ -61,8 +61,8 @@ import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedFlowCoordinates;
import org.apache.nifi.flow.VersionedFunnel;
import org.apache.nifi.flow.VersionedLabel;
import org.apache.nifi.registry.flow.VersionedParameter;
import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedParameter;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;

View File

@ -25,8 +25,8 @@ import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterDescriptor;
import org.apache.nifi.parameter.ParameterParser;
import org.apache.nifi.parameter.ParameterTokenList;
import org.apache.nifi.registry.flow.VersionedParameter;
import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedParameter;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;

View File

@ -29,10 +29,10 @@ import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.web.api.dto.AccessPolicyDTO;
import org.apache.nifi.web.api.dto.AffectedComponentDTO;
import org.apache.nifi.web.api.dto.BulletinBoardDTO;

View File

@ -98,11 +98,14 @@ import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor;
import org.apache.nifi.diagnostics.SystemDiagnostics;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.flow.VersionedComponent;
import org.apache.nifi.flow.VersionedConfigurableComponent;
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedFlowCoordinates;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.flow.VersionedPropertyDescriptor;
@ -129,10 +132,10 @@ import org.apache.nifi.prometheus.util.JvmMetricsRegistry;
import org.apache.nifi.prometheus.util.NiFiMetricsRegistry;
import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.VersionedFlowConverter;
import org.apache.nifi.registry.authorization.Permissions;
import org.apache.nifi.registry.bucket.Bucket;
import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.RestBasedFlowRegistry;
@ -141,7 +144,6 @@ import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
import org.apache.nifi.registry.flow.VersionedFlowState;
import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
import org.apache.nifi.registry.flow.diff.ConciseEvolvingDifferenceDescriptor;
import org.apache.nifi.registry.flow.diff.DifferenceType;
@ -4919,7 +4921,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public void verifyCanUpdate(final String groupId, final VersionedFlowSnapshot proposedFlow, final boolean verifyConnectionRemoval, final boolean verifyNotDirty) {
final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
group.verifyCanUpdate(proposedFlow, verifyConnectionRemoval, verifyNotDirty);
final VersionedExternalFlow externalFlow = VersionedFlowConverter.createVersionedExternalFlow(proposedFlow);
group.verifyCanUpdate(externalFlow, verifyConnectionRemoval, verifyNotDirty);
}
@Override
@ -4936,7 +4939,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// verify that the process group can be updated to the given snapshot. We do not verify that connections can
// be removed, because the flow may still be running, and it only matters that the connections can be removed once the components
// have been stopped.
group.verifyCanUpdate(versionedFlowSnapshot, false, false);
final VersionedExternalFlow externalFlow = VersionedFlowConverter.createVersionedExternalFlow(versionedFlowSnapshot);
group.verifyCanUpdate(externalFlow, false, false);
}
@Override
@ -5347,7 +5351,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public RevisionUpdate<ProcessGroupDTO> update() {
// update the Process Group
final ProcessGroup updatedProcessGroup = processGroupDAO.updateProcessGroupFlow(groupId, proposedFlowSnapshot, versionControlInfo, componentIdSeed, verifyNotModified, updateSettings,
final VersionedExternalFlow externalFlow = VersionedFlowConverter.createVersionedExternalFlow(proposedFlowSnapshot);
processGroupDAO.updateProcessGroupFlow(groupId, externalFlow, versionControlInfo, componentIdSeed, verifyNotModified, updateSettings,
updateDescendantVersionedFlows);
// update the revisions

View File

@ -31,7 +31,7 @@ import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.FlowRegistryUtils;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.ResumeFlowException;

View File

@ -108,7 +108,7 @@ import org.apache.nifi.registry.flow.FlowRegistryUtils;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowState;
import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest;
import org.apache.nifi.registry.variable.VariableRegistryUpdateStep;

View File

@ -19,8 +19,8 @@ package org.apache.nifi.web.dao;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.VariableRegistryDTO;
import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
@ -136,7 +136,7 @@ public interface ProcessGroupDAO {
* update the contents of that Process Group
* @return the process group
*/
ProcessGroup updateProcessGroupFlow(String groupId, VersionedFlowSnapshot proposedSnapshot, VersionControlInformationDTO versionControlInformation, String componentIdSeed,
ProcessGroup updateProcessGroupFlow(String groupId, VersionedExternalFlow proposedSnapshot, VersionControlInformationDTO versionControlInformation, String componentIdSeed,
boolean verifyNotModified, boolean updateSettings, boolean updateDescendantVersionedFlows);
/**

View File

@ -28,6 +28,8 @@ import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.groups.FlowFileConcurrency;
import org.apache.nifi.groups.FlowFileOutboundPolicy;
import org.apache.nifi.groups.ProcessGroup;
@ -36,8 +38,6 @@ import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.StandardVersionControlInformation;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.web.ResourceNotFoundException;
@ -424,7 +424,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
}
@Override
public ProcessGroup updateProcessGroupFlow(final String groupId, final VersionedFlowSnapshot proposedSnapshot, final VersionControlInformationDTO versionControlInformation,
public ProcessGroup updateProcessGroupFlow(final String groupId, final VersionedExternalFlow proposedSnapshot, final VersionControlInformationDTO versionControlInformation,
final String componentIdSeed, final boolean verifyNotModified, final boolean updateSettings, final boolean updateDescendantVersionedFlows) {
final ProcessGroup group = locateProcessGroup(flowController, groupId);

View File

@ -40,11 +40,11 @@ import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.history.History;
import org.apache.nifi.history.HistoryQuery;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.registry.flow.RestBasedFlowRegistry;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.web.api.dto.DtoFactory;

View File

@ -41,6 +41,7 @@ import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.flow.Bundle;
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedLabel;
import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.flow.VersionedProcessGroup;
@ -60,6 +61,7 @@ import org.apache.nifi.processors.stateless.retrieval.CachingDataflowProvider;
import org.apache.nifi.processors.stateless.retrieval.DataflowProvider;
import org.apache.nifi.processors.stateless.retrieval.FileSystemDataflowProvider;
import org.apache.nifi.processors.stateless.retrieval.RegistryDataflowProvider;
import org.apache.nifi.registry.VersionedFlowConverter;
import org.apache.nifi.registry.bucket.Bucket;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
@ -462,7 +464,7 @@ public class ExecuteStateless extends AbstractProcessor implements Searchable {
final StatelessEngineConfiguration engineConfiguration = createEngineConfiguration(context, dataflowIndex);
final StatelessBootstrap bootstrap = StatelessBootstrap.bootstrap(engineConfiguration, Thread.currentThread().getContextClassLoader());
final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition = createDataflowDefinition(context, flowSnapshot);
final DataflowDefinition dataflowDefinition = createDataflowDefinition(context, flowSnapshot);
final StatelessDataflow dataflow = bootstrap.createDataflow(dataflowDefinition);
dataflow.initialize();
@ -746,7 +748,8 @@ public class ExecuteStateless extends AbstractProcessor implements Searchable {
}
private DataflowDefinition<VersionedFlowSnapshot> createDataflowDefinition(final ProcessContext context, final VersionedFlowSnapshot flowSnapshot) {
private DataflowDefinition createDataflowDefinition(final ProcessContext context, final VersionedFlowSnapshot flowSnapshot) {
final VersionedExternalFlow externalFlow = VersionedFlowConverter.createVersionedExternalFlow(flowSnapshot);
final ParameterValueProviderDefinition parameterValueProviderDefinition = new ParameterValueProviderDefinition();
parameterValueProviderDefinition.setType("org.apache.nifi.stateless.parameter.OverrideParameterValueProvider");
parameterValueProviderDefinition.setName("Parameter Override");
@ -783,10 +786,10 @@ public class ExecuteStateless extends AbstractProcessor implements Searchable {
}
};
return new DataflowDefinition<VersionedFlowSnapshot>() {
return new DataflowDefinition() {
@Override
public VersionedFlowSnapshot getFlowSnapshot() {
return flowSnapshot;
public VersionedExternalFlow getVersionedExternalFlow() {
return externalFlow;
}
@Override

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedExternalFlowMetadata;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
public class VersionedFlowConverter {
public static VersionedExternalFlow createVersionedExternalFlow(final VersionedFlowSnapshot flowSnapshot) {
final VersionedExternalFlowMetadata externalFlowMetadata = new VersionedExternalFlowMetadata();
final VersionedFlowSnapshotMetadata snapshotMetadata = flowSnapshot.getSnapshotMetadata();
if (snapshotMetadata != null) {
externalFlowMetadata.setAuthor(snapshotMetadata.getAuthor());
externalFlowMetadata.setBucketIdentifier(snapshotMetadata.getBucketIdentifier());
externalFlowMetadata.setComments(snapshotMetadata.getComments());
externalFlowMetadata.setFlowIdentifier(snapshotMetadata.getFlowIdentifier());
externalFlowMetadata.setTimestamp(snapshotMetadata.getTimestamp());
externalFlowMetadata.setVersion(snapshotMetadata.getVersion());
}
final VersionedFlow versionedFlow = flowSnapshot.getFlow();
if (versionedFlow == null) {
externalFlowMetadata.setFlowName(flowSnapshot.getFlowContents().getName());
} else {
externalFlowMetadata.setFlowName(versionedFlow.getName());
}
final VersionedExternalFlow externalFlow = new VersionedExternalFlow();
externalFlow.setFlowContents(flowSnapshot.getFlowContents());
externalFlow.setExternalControllerServices(flowSnapshot.getExternalControllerServices());
externalFlow.setParameterContexts(flowSnapshot.getParameterContexts());
externalFlow.setMetadata(externalFlowMetadata);
return externalFlow;
}
}

View File

@ -19,6 +19,8 @@ package org.apache.nifi.registry.flow;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.registry.bucket.Bucket;
@ -117,7 +119,7 @@ public class VersionedFlowSnapshot {
@ApiModelProperty(value = "The parameter contexts referenced by process groups in the flow contents. " +
"The mapping is from the name of the context to the context instance, and it is expected that any " +
"context in this map is referenced by at least one process group in this flow.")
public Map<String,VersionedParameterContext> getParameterContexts() {
public Map<String, VersionedParameterContext> getParameterContexts() {
return parameterContexts;
}

View File

@ -20,7 +20,7 @@ package org.apache.nifi.registry.flow.diff;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedReportingTask;
import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedParameterContext;
import java.util.Set;

View File

@ -20,7 +20,7 @@ package org.apache.nifi.registry.flow.diff;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedReportingTask;
import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedParameterContext;
import java.util.Collections;
import java.util.HashSet;

View File

@ -30,8 +30,8 @@ import org.apache.nifi.flow.VersionedPropertyDescriptor;
import org.apache.nifi.flow.VersionedRemoteGroupPort;
import org.apache.nifi.flow.VersionedRemoteProcessGroup;
import org.apache.nifi.flow.VersionedReportingTask;
import org.apache.nifi.registry.flow.VersionedParameter;
import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedParameter;
import org.apache.nifi.flow.VersionedParameterContext;
import java.util.Collection;
import java.util.Collections;

View File

@ -16,7 +16,7 @@
*/
package org.apache.nifi.registry.serialization;
import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;

View File

@ -60,12 +60,12 @@ import org.apache.nifi.registry.extension.repo.ExtensionRepoGroup;
import org.apache.nifi.registry.extension.repo.ExtensionRepoVersion;
import org.apache.nifi.registry.extension.repo.ExtensionRepoVersionSummary;
import org.apache.nifi.registry.field.Fields;
import org.apache.nifi.registry.flow.ExternalControllerServiceReference;
import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
import org.apache.nifi.registry.flow.VersionedParameter;
import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedParameter;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.flow.VersionedPropertyDescriptor;

View File

@ -17,6 +17,7 @@
package org.apache.nifi.stateless.flow;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.stateless.config.ParameterContextDefinition;
import org.apache.nifi.stateless.config.ParameterValueProviderDefinition;
import org.apache.nifi.stateless.config.ReportingTaskDefinition;
@ -24,8 +25,8 @@ import org.apache.nifi.stateless.config.ReportingTaskDefinition;
import java.util.List;
import java.util.Set;
public interface DataflowDefinition<T> {
T getFlowSnapshot();
public interface DataflowDefinition {
VersionedExternalFlow getVersionedExternalFlow();
String getFlowName();

View File

@ -27,9 +27,9 @@ import java.util.List;
import java.util.Map;
public interface DataflowDefinitionParser {
DataflowDefinition<?> parseFlowDefinition(File configurationFile, StatelessEngineConfiguration engineConfiguration, List<ParameterOverride> parameterOverrides)
DataflowDefinition parseFlowDefinition(File configurationFile, StatelessEngineConfiguration engineConfiguration, List<ParameterOverride> parameterOverrides)
throws StatelessConfigurationException, IOException;
DataflowDefinition<?> parseFlowDefinition(Map<String, String> configurationProperties, StatelessEngineConfiguration engineConfiguration, List<ParameterOverride> parameterOverrides)
DataflowDefinition parseFlowDefinition(Map<String, String> configurationProperties, StatelessEngineConfiguration engineConfiguration, List<ParameterOverride> parameterOverrides)
throws StatelessConfigurationException, IOException;
}

View File

@ -22,7 +22,7 @@ import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
import java.io.IOException;
public interface StatelessDataflowFactory<T> {
StatelessDataflow createDataflow(StatelessEngineConfiguration statelessEngineConfiguration, DataflowDefinition<T> dataflowDefinition, ClassLoader extensionClassLoader)
public interface StatelessDataflowFactory {
StatelessDataflow createDataflow(StatelessEngineConfiguration statelessEngineConfiguration, DataflowDefinition dataflowDefinition, ClassLoader extensionClassLoader)
throws IOException, StatelessConfigurationException;
}

View File

@ -89,7 +89,7 @@ public class RunStatelessFlow {
final long initializeStart = System.currentTimeMillis();
final StatelessBootstrap bootstrap = StatelessBootstrap.bootstrap(engineConfiguration);
final DataflowDefinition<?> dataflowDefinition = bootstrap.parseDataflowDefinition(flowDefinitionFile, parameterOverrides);
final DataflowDefinition dataflowDefinition = bootstrap.parseDataflowDefinition(flowDefinitionFile, parameterOverrides);
final StatelessDataflow dataflow = bootstrap.createDataflow(dataflowDefinition);
dataflow.initialize();

View File

@ -19,6 +19,7 @@ package org.apache.nifi.stateless.bootstrap;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.nar.NarClassLoader;
import org.apache.nifi.nar.NarClassLoaders;
import org.apache.nifi.nar.NarUnpacker;
import org.apache.nifi.nar.SystemBundle;
@ -67,24 +68,24 @@ public class StatelessBootstrap {
this.engineConfiguration = engineConfiguration;
}
public <T> StatelessDataflow createDataflow(final DataflowDefinition<T> dataflowDefinition)
public StatelessDataflow createDataflow(final DataflowDefinition dataflowDefinition)
throws IOException, StatelessConfigurationException {
final StatelessDataflowFactory<T> dataflowFactory = getSingleInstance(engineClassLoader, StatelessDataflowFactory.class);
final StatelessDataflowFactory dataflowFactory = getSingleInstance(engineClassLoader, StatelessDataflowFactory.class);
final StatelessDataflow dataflow = dataflowFactory.createDataflow(engineConfiguration, dataflowDefinition, extensionClassLoader);
return dataflow;
}
public DataflowDefinition<?> parseDataflowDefinition(final File flowDefinitionFile, final List<ParameterOverride> parameterOverrides)
public DataflowDefinition parseDataflowDefinition(final File flowDefinitionFile, final List<ParameterOverride> parameterOverrides)
throws StatelessConfigurationException, IOException {
final DataflowDefinitionParser dataflowDefinitionParser = getSingleInstance(engineClassLoader, DataflowDefinitionParser.class);
final DataflowDefinition<?> dataflowDefinition = dataflowDefinitionParser.parseFlowDefinition(flowDefinitionFile, engineConfiguration, parameterOverrides);
final DataflowDefinition dataflowDefinition = dataflowDefinitionParser.parseFlowDefinition(flowDefinitionFile, engineConfiguration, parameterOverrides);
return dataflowDefinition;
}
public DataflowDefinition<?> parseDataflowDefinition(final Map<String, String> flowDefinitionProperties, final List<ParameterOverride> parameterOverrides)
public DataflowDefinition parseDataflowDefinition(final Map<String, String> flowDefinitionProperties, final List<ParameterOverride> parameterOverrides)
throws StatelessConfigurationException, IOException {
final DataflowDefinitionParser dataflowDefinitionParser = getSingleInstance(engineClassLoader, DataflowDefinitionParser.class);
final DataflowDefinition<?> dataflowDefinition = dataflowDefinitionParser.parseFlowDefinition(flowDefinitionProperties, engineConfiguration, parameterOverrides);
final DataflowDefinition dataflowDefinition = dataflowDefinitionParser.parseFlowDefinition(flowDefinitionProperties, engineConfiguration, parameterOverrides);
return dataflowDefinition;
}
@ -119,47 +120,18 @@ public class StatelessBootstrap {
final long unpackMillis = System.currentTimeMillis() - unpackStart;
logger.info("Unpacked NAR files in {} millis", unpackMillis);
final BlockListClassLoader statelessClassLoader = createExtensionRootClassLoader(narDirectory, rootClassLoader);
final File statelessNarWorkingDir = locateStatelessNarWorkingDirectory(extensionsWorkingDir);
final File statelessNarInf = new File(statelessNarWorkingDir, "NAR-INF");
final File statelessNarDependencies = new File(statelessNarInf, "bundled-dependencies");
final File[] statelessNarContents = statelessNarDependencies.listFiles();
if (statelessNarContents == null || statelessNarContents.length == 0) {
throw new IOException("Could not access contents of Stateless NAR dependencies at " + statelessNarDependencies);
final NarClassLoader engineClassLoader;
try {
engineClassLoader = new NarClassLoader(statelessNarWorkingDir, statelessClassLoader);
} catch (final ClassNotFoundException e) {
throw new IOException("Could not create NarClassLoader for Stateless NAR located at " + statelessNarWorkingDir.getAbsolutePath(), e);
}
final List<URL> urls = new ArrayList<>();
final List<String> filenames = new ArrayList<>();
for (final File dependency : statelessNarContents) {
final URL url = dependency.toURI().toURL();
urls.add(url);
filenames.add(dependency.getName());
}
logger.info("Creating Stateless Bootstrap with the following files in the classpath: {}", filenames);
final URL[] urlArray = urls.toArray(new URL[0]);
final BlockListClassLoader extensionClassLoader = createExtensionRootClassLoader(narDirectory, rootClassLoader);
final Set<String> classesBlockedExtensions = extensionClassLoader.getClassesBlocked();
// For the engine ClassLoader, we also want to block everything that we block for extensions except for the classes
// that the engine needs specifically (i.e., the classes in the stateless engine nar). We do this because there are some
// classes that may need to be shared between the bootstrap and the caller. For example, VersionedFlowSnapshot.
final URLClassLoader engineUrlClassLoader = new URLClassLoader(urlArray, rootClassLoader);
final Set<String> engineSpecificClassNames = new HashSet<>();
final Set<String> engineSpecificFiles = new HashSet<>();
findClassNamesInJars(urls, engineSpecificClassNames, engineSpecificFiles);
final Set<String> classesBlockedEngine = new HashSet<>(classesBlockedExtensions);
classesBlockedEngine.removeAll(engineSpecificClassNames);
logger.debug("Blocking the following classes from being loaded from parent {} for Engine by Stateless ClassLoaders: {}", engineUrlClassLoader, classesBlockedEngine);
logger.debug("Blocking the following files from being loaded from parent {} for Engine by Stateless ClassLoaders: {}", engineUrlClassLoader, engineSpecificFiles);
final BlockListClassLoader engineClassLoader = new BlockListClassLoader(engineUrlClassLoader, classesBlockedEngine);
Thread.currentThread().setContextClassLoader(engineClassLoader);
return new StatelessBootstrap(engineClassLoader, extensionClassLoader, engineConfiguration);
return new StatelessBootstrap(engineClassLoader, statelessClassLoader, engineConfiguration);
}
/**

View File

@ -30,10 +30,10 @@ import org.apache.nifi.stateless.engine.StatelessEngine;
import java.util.Map;
public class StatelessReportingContext extends AbstractReportingContext implements ReportingContext {
private final StatelessEngine<?> statelessEngine;
private final StatelessEngine statelessEngine;
private final FlowManager flowManager;
public StatelessReportingContext(final StatelessEngine<?> statelessEngine, final FlowManager flowManager,
public StatelessReportingContext(final StatelessEngine statelessEngine, final FlowManager flowManager,
final Map<PropertyDescriptor, String> properties, final ReportingTask reportingTask,
final VariableRegistry variableRegistry, final ParameterLookup parameterLookup) {
super(reportingTask, statelessEngine.getBulletinRepository(), properties, statelessEngine.getControllerServiceProvider(), parameterLookup, variableRegistry);

View File

@ -35,9 +35,9 @@ import org.apache.nifi.stateless.engine.StatelessEngine;
public class StatelessReportingTaskNode extends AbstractReportingTaskNode implements ReportingTaskNode {
private final FlowManager flowManager;
private final StatelessEngine<?> statelessEngine;
private final StatelessEngine statelessEngine;
public StatelessReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final StatelessEngine<?> statelessEngine,
public StatelessReportingTaskNode(final LoggableComponent<ReportingTask> reportingTask, final String id, final StatelessEngine statelessEngine,
final FlowManager flowManager, final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory,
final ComponentVariableRegistry variableRegistry, final ReloadComponent reloadComponent, final ExtensionManager extensionManager,
final ValidationTrigger validationTrigger) {

View File

@ -104,7 +104,7 @@ public class StatelessProcessScheduler implements ProcessScheduler {
}
}
public void initialize(final ProcessContextFactory processContextFactory, final DataflowDefinition<?> dataflowDefinition) {
public void initialize(final ProcessContextFactory processContextFactory, final DataflowDefinition dataflowDefinition) {
this.processContextFactory = processContextFactory;
final String threadNameSuffix = dataflowDefinition.getFlowName() == null ? "" : " for dataflow " + dataflowDefinition.getFlowName();

View File

@ -18,6 +18,10 @@
package org.apache.nifi.registry.flow;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedExternalFlowMetadata;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.registry.bucket.Bucket;
import org.apache.nifi.registry.client.NiFiRegistryException;
@ -40,7 +44,7 @@ public class InMemoryFlowRegistry implements FlowRegistry {
private volatile String name;
private volatile String url;
private final Map<FlowCoordinates, List<VersionedFlowSnapshot>> flowSnapshots = new ConcurrentHashMap<>();
private final Map<FlowCoordinates, List<VersionedExternalFlow>> flowSnapshots = new ConcurrentHashMap<>();
@Override
public String getIdentifier() {
@ -133,21 +137,44 @@ public class InMemoryFlowRegistry implements FlowRegistry {
@Override
public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, final int version, final boolean fetchRemoteFlows) throws NiFiRegistryException {
final FlowCoordinates flowCoordinates = new FlowCoordinates(bucketId, flowId);
final List<VersionedFlowSnapshot> snapshots = flowSnapshots.get(flowCoordinates);
final List<VersionedExternalFlow> snapshots = flowSnapshots.get(flowCoordinates);
final VersionedFlowSnapshot versionedFlowSnapshot = snapshots.stream()
.filter(snapshot -> snapshot.getSnapshotMetadata().getVersion() == version)
final VersionedExternalFlow versionedExternalFlow = snapshots.stream()
.filter(snapshot -> snapshot.getMetadata().getVersion() == version)
.findAny()
.orElseThrow(() -> new NiFiRegistryException("Could not find flow: bucketId=" + bucketId + ", flowId=" + flowId + ", version=" + version));
final VersionedFlowSnapshot versionedFlowSnapshot = convertToVersionedFlowSnapshot(versionedExternalFlow);
return versionedFlowSnapshot;
}
private VersionedFlowSnapshot convertToVersionedFlowSnapshot(final VersionedExternalFlow externalFlow) {
final VersionedExternalFlowMetadata externalFlowMetadata = externalFlow.getMetadata();
final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata();
snapshotMetadata.setBucketIdentifier(externalFlowMetadata.getBucketIdentifier());
snapshotMetadata.setVersion(externalFlowMetadata.getVersion());
snapshotMetadata.setFlowIdentifier(externalFlowMetadata.getFlowIdentifier());
final VersionedFlow versionedFlow = new VersionedFlow();
versionedFlow.setName(externalFlowMetadata.getFlowName());
versionedFlow.setIdentifier(externalFlowMetadata.getFlowIdentifier());
versionedFlow.setBucketIdentifier(externalFlowMetadata.getBucketIdentifier());
final VersionedFlowSnapshot flowSnapshot = new VersionedFlowSnapshot();
flowSnapshot.setExternalControllerServices(externalFlow.getExternalControllerServices());
flowSnapshot.setFlowContents(externalFlow.getFlowContents());
flowSnapshot.setParameterContexts(externalFlow.getParameterContexts());
flowSnapshot.setSnapshotMetadata(snapshotMetadata);
flowSnapshot.setFlow(versionedFlow);
return flowSnapshot;
}
@Override
public VersionedFlow getVersionedFlow(final String bucketId, final String flowId) {
final FlowCoordinates flowCoordinates = new FlowCoordinates(bucketId, flowId);
final List<VersionedFlowSnapshot> snapshots = flowSnapshots.get(flowCoordinates);
final List<VersionedExternalFlow> snapshots = flowSnapshots.get(flowCoordinates);
final VersionedFlow versionedFlow = new VersionedFlow();
versionedFlow.setBucketIdentifier(bucketId);
@ -160,8 +187,8 @@ public class InMemoryFlowRegistry implements FlowRegistry {
}
public synchronized void addFlowSnapshot(final VersionedFlowSnapshot flowSnapshot) {
final VersionedFlowSnapshotMetadata metadata = flowSnapshot.getSnapshotMetadata();
public synchronized void addFlowSnapshot(final VersionedExternalFlow versionedExternalFlow) {
final VersionedExternalFlowMetadata metadata = versionedExternalFlow.getMetadata();
final String bucketId;
final String flowId;
final int version;
@ -177,16 +204,16 @@ public class InMemoryFlowRegistry implements FlowRegistry {
final FlowCoordinates coordinates = new FlowCoordinates(bucketId, flowId);
final List<VersionedFlowSnapshot> snapshots = flowSnapshots.computeIfAbsent(coordinates, key -> Collections.synchronizedList(new ArrayList<>()));
final Optional<VersionedFlowSnapshot> optionalSnapshot = snapshots.stream()
.filter(snapshot -> snapshot.getSnapshotMetadata().getVersion() == version)
final List<VersionedExternalFlow> snapshots = flowSnapshots.computeIfAbsent(coordinates, key -> Collections.synchronizedList(new ArrayList<>()));
final Optional<VersionedExternalFlow> optionalSnapshot = snapshots.stream()
.filter(snapshot -> snapshot.getMetadata().getVersion() == version)
.findAny();
if (optionalSnapshot.isPresent()) {
throw new IllegalStateException("Versioned Flow Snapshot already exists for bucketId=" + bucketId + ", flowId=" + flowId + ", version=" + version);
}
snapshots.add(flowSnapshot);
snapshots.add(versionedExternalFlow);
}
private static class FlowCoordinates {

View File

@ -24,7 +24,9 @@ import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.registry.VersionedFlowConverter;
import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
@ -98,13 +100,13 @@ public class PropertiesFileFlowDefinitionParser implements DataflowDefinitionPar
private static final String TRANSACTION_THRESHOLD_TIME = "nifi.stateless.transaction.thresholds.time";
public DataflowDefinition<VersionedFlowSnapshot> parseFlowDefinition(final File propertiesFile, final StatelessEngineConfiguration engineConfig, final List<ParameterOverride> parameterOverrides)
public DataflowDefinition parseFlowDefinition(final File propertiesFile, final StatelessEngineConfiguration engineConfig, final List<ParameterOverride> parameterOverrides)
throws IOException, StatelessConfigurationException {
final Map<String, String> properties = readPropertyValues(propertiesFile);
return parseFlowDefinition(properties, engineConfig, parameterOverrides);
}
public DataflowDefinition<VersionedFlowSnapshot> parseFlowDefinition(final Map<String, String> properties, final StatelessEngineConfiguration engineConfig,
public DataflowDefinition parseFlowDefinition(final Map<String, String> properties, final StatelessEngineConfiguration engineConfig,
final List<ParameterOverride> parameterOverrides) throws IOException, StatelessConfigurationException {
// A common problem is users accidentally including whitespace at the beginning or end of property values.
@ -115,17 +117,16 @@ public class PropertiesFileFlowDefinitionParser implements DataflowDefinitionPar
final Set<String> failurePortNames = getFailurePortNames(properties);
final VersionedFlowSnapshot flowSnapshot = fetchVersionedFlowSnapshot(properties, engineConfig.getSslContext());
final VersionedExternalFlow externalFlow = VersionedFlowConverter.createVersionedExternalFlow(flowSnapshot);
final List<ParameterContextDefinition> parameterContextDefinitions = getParameterContexts(properties);
final List<ReportingTaskDefinition> reportingTaskDefinitions = getReportingTasks(properties);
final List<ParameterValueProviderDefinition> parameterValueProviderDefinitions = getParameterValueProviders(properties, parameterOverrides);
final TransactionThresholds transactionThresholds = getTransactionThresholds(properties);
final String rootGroupName = flowSnapshot.getFlowContents().getName();
final String flowName = properties.getOrDefault(FLOW_NAME, rootGroupName);
final String flowName = properties.getOrDefault(FLOW_NAME, externalFlow.getMetadata().getFlowName());
return new StandardDataflowDefinition.Builder()
.flowSnapshot(flowSnapshot)
.flowName(flowName)
.versionedExternalFlow(externalFlow)
.failurePortNames(failurePortNames)
.parameterContexts(parameterContextDefinitions)
.reportingTasks(reportingTaskDefinitions)
@ -480,6 +481,7 @@ public class PropertiesFileFlowDefinitionParser implements DataflowDefinitionPar
return envValue == null ? "" : envValue;
}
private VersionedFlowSnapshot fetchVersionedFlowSnapshot(final Map<String, String> properties, final SslContextDefinition sslContextDefinition)
throws IOException, StatelessConfigurationException {

View File

@ -56,7 +56,6 @@ import org.apache.nifi.processor.StandardProcessorInitializationContext;
import org.apache.nifi.processor.StandardValidationContextFactory;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.variable.StandardComponentVariableRegistry;
import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.reporting.ReportingTask;
@ -74,14 +73,14 @@ import java.util.Set;
public class ComponentBuilder {
private static final Logger logger = LoggerFactory.getLogger(ComponentBuilder.class);
private StatelessEngine<VersionedFlowSnapshot> statelessEngine;
private StatelessEngine statelessEngine;
private FlowManager flowManager;
private String identifier;
private String type;
private BundleCoordinate bundleCoordinate;
private Set<URL> additionalClassPathUrls;
public ComponentBuilder statelessEngine(final StatelessEngine<VersionedFlowSnapshot> statelessEngine) {
public ComponentBuilder statelessEngine(final StatelessEngine statelessEngine) {
this.statelessEngine = statelessEngine;
return this;
}

View File

@ -43,6 +43,7 @@ import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.extensions.ExtensionRepository;
import org.apache.nifi.flow.VersionedExternalFlowMetadata;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.LogRepositoryFactory;
@ -57,8 +58,6 @@ import org.apache.nifi.processor.StandardValidationContext;
import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
@ -98,7 +97,7 @@ import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSnapshot> {
public class StandardStatelessEngine implements StatelessEngine {
private static final Logger logger = LoggerFactory.getLogger(StandardStatelessEngine.class);
private static final int CONCURRENT_EXTENSION_DOWNLOADS = 8;
public static final Duration DEFAULT_STATUS_TASK_PERIOD = Duration.of(1, ChronoUnit.MINUTES);
@ -158,13 +157,14 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
}
@Override
public StatelessDataflow createFlow(final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition) {
public StatelessDataflow createFlow(final DataflowDefinition dataflowDefinition) {
if (!this.initialized) {
throw new IllegalStateException("Cannot create Flow without first initializing Stateless Engine");
}
final VersionedFlow versionedFlow = dataflowDefinition.getFlowSnapshot().getFlow();
logger.info("Building Dataflow {}", versionedFlow == null ? "" : versionedFlow.getName());
final VersionedExternalFlowMetadata metadata = dataflowDefinition.getVersionedExternalFlow().getMetadata();
final String flowName = metadata == null ? "" : metadata.getFlowName();
logger.info("Building Dataflow {}", flowName);
loadNecessaryExtensions(dataflowDefinition);
@ -178,7 +178,7 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
rootGroup.addProcessGroup(childGroup);
LogRepositoryFactory.purge();
childGroup.updateFlow(dataflowDefinition.getFlowSnapshot(), "stateless-component-id-seed", false, true, true);
childGroup.updateFlow(dataflowDefinition.getVersionedExternalFlow(), "stateless-component-id-seed", false, true, true);
final ParameterValueProvider parameterValueProvider = createParameterValueProvider(dataflowDefinition);
@ -205,7 +205,7 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
return dataflow;
}
private ParameterValueProvider createParameterValueProvider(final DataflowDefinition<?> dataflowDefinition) {
private ParameterValueProvider createParameterValueProvider(final DataflowDefinition dataflowDefinition) {
// Create a Provider for each definition
final List<ParameterValueProvider> providers = new ArrayList<>();
for (final ParameterValueProviderDefinition definition : dataflowDefinition.getParameterValueProviderDefinitions()) {
@ -300,8 +300,8 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
}
}
private void loadNecessaryExtensions(final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition) {
final VersionedProcessGroup group = dataflowDefinition.getFlowSnapshot().getFlowContents();
private void loadNecessaryExtensions(final DataflowDefinition dataflowDefinition) {
final VersionedProcessGroup group = dataflowDefinition.getVersionedExternalFlow().getFlowContents();
final Set<BundleCoordinate> requiredBundles = gatherRequiredBundles(group);
for (final ReportingTaskDefinition reportingTaskDefinition : dataflowDefinition.getReportingTaskDefinitions()) {
@ -380,7 +380,7 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
return new BundleCoordinate(bundle.getGroup(), bundle.getArtifact(), bundle.getVersion());
}
private List<ReportingTaskNode> createReportingTasks(final DataflowDefinition<?> dataflowDefinition) {
private List<ReportingTaskNode> createReportingTasks(final DataflowDefinition dataflowDefinition) {
final List<ReportingTaskNode> reportingTaskNodes = new ArrayList<>();
for (final ReportingTaskDefinition taskDefinition : dataflowDefinition.getReportingTaskDefinitions()) {
final ReportingTaskNode taskNode = createReportingTask(taskDefinition);

View File

@ -37,11 +37,11 @@ import org.apache.nifi.stateless.flow.StatelessDataflow;
import java.time.Duration;
public interface StatelessEngine<T> {
public interface StatelessEngine {
void initialize(StatelessEngineInitializationContext initializationContext);
StatelessDataflow createFlow(DataflowDefinition<T> dataflowDefinition);
StatelessDataflow createFlow(DataflowDefinition dataflowDefinition);
ExtensionManager getExtensionManager();

View File

@ -65,7 +65,6 @@ import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.parameter.ParameterContextManager;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.variable.MutableVariableRegistry;
import org.apache.nifi.remote.StandardRemoteProcessGroup;
import org.apache.nifi.reporting.BulletinRepository;
@ -91,12 +90,12 @@ import static java.util.Objects.requireNonNull;
public class StatelessFlowManager extends AbstractFlowManager implements FlowManager {
private static final Logger logger = LoggerFactory.getLogger(StatelessFlowManager.class);
private final StatelessEngine<VersionedFlowSnapshot> statelessEngine;
private final StatelessEngine statelessEngine;
private final SSLContext sslContext;
private final BulletinRepository bulletinRepository;
public StatelessFlowManager(final FlowFileEventRepository flowFileEventRepository, final ParameterContextManager parameterContextManager,
final StatelessEngine<VersionedFlowSnapshot> statelessEngine, final BooleanSupplier flowInitializedCheck,
final StatelessEngine statelessEngine, final BooleanSupplier flowInitializedCheck,
final SSLContext sslContext, final BulletinRepository bulletinRepository) {
super(flowFileEventRepository, parameterContextManager, statelessEngine.getFlowRegistryClient(), flowInitializedCheck);

View File

@ -29,7 +29,6 @@ import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.TerminationAwareLogger;
import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.service.ControllerServiceInvocationHandler;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.StandardConfigurationContext;
@ -40,7 +39,6 @@ import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.util.ReflectionUtils;
import org.slf4j.Logger;
@ -52,9 +50,9 @@ import java.util.Set;
public class StatelessReloadComponent implements ReloadComponent {
private static final Logger logger = LoggerFactory.getLogger(StatelessReloadComponent.class);
private final StatelessEngine<VersionedFlowSnapshot> statelessEngine;
private final StatelessEngine statelessEngine;
public StatelessReloadComponent(final StatelessEngine<VersionedFlowSnapshot> statelessEngine) {
public StatelessReloadComponent(final StatelessEngine statelessEngine) {
this.statelessEngine = statelessEngine;
}
@ -169,7 +167,7 @@ public class StatelessReloadComponent implements ReloadComponent {
}
@Override
public void reload(final ReportingTaskNode existingNode, final String newType, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls) throws ReportingTaskInstantiationException {
public void reload(final ReportingTaskNode existingNode, final String newType, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls) {
if (existingNode == null) {
throw new IllegalStateException("Existing ReportingTaskNode cannot be null");
}

View File

@ -19,10 +19,10 @@ package org.apache.nifi.stateless.flow;
import org.apache.nifi.flow.Bundle;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.stateless.config.ParameterContextDefinition;
import org.apache.nifi.stateless.config.ParameterValueProviderDefinition;
import org.apache.nifi.stateless.config.ReportingTaskDefinition;
@ -35,33 +35,31 @@ import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
public class StandardDataflowDefinition implements DataflowDefinition<VersionedFlowSnapshot> {
private final VersionedFlowSnapshot flowSnapshot;
public class StandardDataflowDefinition implements DataflowDefinition {
private final VersionedExternalFlow versionedExternalFlow;
private final Set<String> failurePortNames;
private final List<ParameterContextDefinition> parameterContexts;
private final List<ReportingTaskDefinition> reportingTaskDefinitions;
private final List<ParameterValueProviderDefinition> parameterValueProviderDefinitions;
private final TransactionThresholds transactionThresholds;
private final String flowName;
private StandardDataflowDefinition(final Builder builder) {
flowSnapshot = requireNonNull(builder.flowSnapshot, "Flow Snapshot must be provided");
versionedExternalFlow = requireNonNull(builder.versionedExternalFlow, "Flow Snapshot must be provided");
failurePortNames = builder.failurePortNames == null ? Collections.emptySet() : builder.failurePortNames;
parameterContexts = builder.parameterContexts == null ? Collections.emptyList() : builder.parameterContexts;
reportingTaskDefinitions = builder.reportingTaskDefinitions == null ? Collections.emptyList() : builder.reportingTaskDefinitions;
transactionThresholds = builder.transactionThresholds == null ? TransactionThresholds.SINGLE_FLOWFILE : builder.transactionThresholds;
parameterValueProviderDefinitions = builder.parameterValueProviderDefinitions == null ? Collections.emptyList() : builder.parameterValueProviderDefinitions;
flowName = builder.flowName;
}
@Override
public VersionedFlowSnapshot getFlowSnapshot() {
return flowSnapshot;
public VersionedExternalFlow getVersionedExternalFlow() {
return versionedExternalFlow;
}
@Override
public String getFlowName() {
return flowName;
return versionedExternalFlow.getMetadata().getFlowName();
}
@Override
@ -71,14 +69,14 @@ public class StandardDataflowDefinition implements DataflowDefinition<VersionedF
@Override
public Set<String> getInputPortNames() {
return flowSnapshot.getFlowContents().getInputPorts().stream()
return versionedExternalFlow.getFlowContents().getInputPorts().stream()
.map(VersionedPort::getName)
.collect(Collectors.toSet());
}
@Override
public Set<String> getOutputPortNames() {
return flowSnapshot.getFlowContents().getOutputPorts().stream()
return versionedExternalFlow.getFlowContents().getOutputPorts().stream()
.map(VersionedPort::getName)
.collect(Collectors.toSet());
}
@ -105,7 +103,7 @@ public class StandardDataflowDefinition implements DataflowDefinition<VersionedF
public Set<Bundle> getReferencedBundles() {
final Set<Bundle> referenced = new HashSet<>();
final VersionedProcessGroup rootGroup = flowSnapshot.getFlowContents();
final VersionedProcessGroup rootGroup = versionedExternalFlow.getFlowContents();
discoverReferencedBundles(rootGroup, referenced);
return referenced;
}
@ -125,21 +123,15 @@ public class StandardDataflowDefinition implements DataflowDefinition<VersionedF
}
public static class Builder {
private VersionedFlowSnapshot flowSnapshot;
private VersionedExternalFlow versionedExternalFlow;
private Set<String> failurePortNames;
private List<ParameterContextDefinition> parameterContexts;
private List<ReportingTaskDefinition> reportingTaskDefinitions;
private List<ParameterValueProviderDefinition> parameterValueProviderDefinitions;
private TransactionThresholds transactionThresholds;
private String flowName;
public Builder flowSnapshot(final VersionedFlowSnapshot flowSnapshot) {
this.flowSnapshot = flowSnapshot;
return this;
}
public Builder flowName(final String flowName) {
this.flowName = flowName;
public Builder versionedExternalFlow(final VersionedExternalFlow versionedExternalFlow) {
this.versionedExternalFlow = versionedExternalFlow;
return this;
}

View File

@ -52,7 +52,6 @@ import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.InMemoryFlowRegistry;
import org.apache.nifi.registry.flow.StandardFlowRegistryClient;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.stateless.bootstrap.ExtensionDiscovery;
@ -88,17 +87,15 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
public class StandardStatelessDataflowFactory implements StatelessDataflowFactory<VersionedFlowSnapshot> {
public class StandardStatelessDataflowFactory implements StatelessDataflowFactory {
private static final Logger logger = LoggerFactory.getLogger(StandardStatelessDataflowFactory.class);
@Override
public StatelessDataflow createDataflow(final StatelessEngineConfiguration engineConfiguration, final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition,
public StatelessDataflow createDataflow(final StatelessEngineConfiguration engineConfiguration, final DataflowDefinition dataflowDefinition,
final ClassLoader extensionRootClassLoader)
throws IOException, StatelessConfigurationException {
final long start = System.currentTimeMillis();
final VersionedFlowSnapshot flowSnapshot = dataflowDefinition.getFlowSnapshot();
ProvenanceRepository provenanceRepo = null;
ContentRepository contentRepo = null;
StatelessProcessScheduler processScheduler = null;
@ -114,7 +111,7 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
}
final InMemoryFlowRegistry flowRegistry = new InMemoryFlowRegistry();
flowRegistry.addFlowSnapshot(flowSnapshot);
flowRegistry.addFlowSnapshot(dataflowDefinition.getVersionedExternalFlow());
final FlowRegistryClient flowRegistryClient = new StandardFlowRegistryClient();
flowRegistryClient.addFlowRegistry(flowRegistry);
@ -184,7 +181,7 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
System.setProperty("java.security.krb5.conf", krb5File.getAbsolutePath());
}
final StatelessEngine<VersionedFlowSnapshot> statelessEngine = new StandardStatelessEngine.Builder()
final StatelessEngine statelessEngine = new StandardStatelessEngine.Builder()
.bulletinRepository(bulletinRepository)
.encryptor(lazyInitializedEncryptor)
.extensionManager(extensionManager)

View File

@ -107,7 +107,7 @@ public class StandardStatelessFlow implements StatelessDataflow {
private final ProcessContextFactory processContextFactory;
private final RepositoryContextFactory repositoryContextFactory;
private final List<FlowFileQueue> internalFlowFileQueues;
private final DataflowDefinition<?> dataflowDefinition;
private final DataflowDefinition dataflowDefinition;
private final StatelessStateManagerProvider stateManagerProvider;
private final ObjectMapper objectMapper = new ObjectMapper();
private final ProcessScheduler processScheduler;
@ -122,7 +122,7 @@ public class StandardStatelessFlow implements StatelessDataflow {
private volatile Boolean stateful = null;
public StandardStatelessFlow(final ProcessGroup rootGroup, final List<ReportingTaskNode> reportingTasks, final ControllerServiceProvider controllerServiceProvider,
final ProcessContextFactory processContextFactory, final RepositoryContextFactory repositoryContextFactory, final DataflowDefinition<?> dataflowDefinition,
final ProcessContextFactory processContextFactory, final RepositoryContextFactory repositoryContextFactory, final DataflowDefinition dataflowDefinition,
final StatelessStateManagerProvider stateManagerProvider, final ProcessScheduler processScheduler, final BulletinRepository bulletinRepository) {
this.rootGroup = rootGroup;
this.allConnections = rootGroup.findAllConnections();

View File

@ -45,7 +45,7 @@ public class TestPropertiesFileFlowDefinitionParser {
final List<ParameterOverride> parameterOverrides = new ArrayList<>();
final StatelessEngineConfiguration engineConfig = createStatelessEngineConfiguration();
final DataflowDefinition<?> dataflowDefinition = parser.parseFlowDefinition(new File("src/test/resources/flow-configuration.properties"), engineConfig, parameterOverrides);
final DataflowDefinition dataflowDefinition = parser.parseFlowDefinition(new File("src/test/resources/flow-configuration.properties"), engineConfig, parameterOverrides);
assertEquals(new HashSet<>(Arrays.asList("foo", "bar", "baz")), dataflowDefinition.getFailurePortNames());
final List<ParameterContextDefinition> contextDefinitions = dataflowDefinition.getParameterContexts();

View File

@ -19,7 +19,9 @@ package org.apache.nifi.stateless;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.flow.Bundle;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.registry.VersionedFlowConverter;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.stateless.bootstrap.StatelessBootstrap;
import org.apache.nifi.stateless.config.ExtensionClientDefinition;
@ -164,13 +166,22 @@ public class StatelessSystemIT {
}
protected StatelessDataflow loadDataflow(final VersionedFlowSnapshot versionedFlowSnapshot, final List<ParameterContextDefinition> parameterContexts,
final List<ParameterValueProviderDefinition> parameterValueProviderDefinitions, final Set<String> failurePortNames,
final TransactionThresholds transactionThresholds)
throws IOException, StatelessConfigurationException {
final VersionedExternalFlow externalFlow = VersionedFlowConverter.createVersionedExternalFlow(versionedFlowSnapshot);
return loadDataflow(externalFlow, parameterContexts, parameterValueProviderDefinitions, failurePortNames, transactionThresholds);
}
protected StatelessDataflow loadDataflow(final VersionedExternalFlow versionedExternalFlow, final List<ParameterContextDefinition> parameterContexts,
final List<ParameterValueProviderDefinition> parameterValueProviderDefinitions, final Set<String> failurePortNames,
final TransactionThresholds transactionThresholds) throws IOException, StatelessConfigurationException {
final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition = new DataflowDefinition<VersionedFlowSnapshot>() {
final DataflowDefinition dataflowDefinition = new DataflowDefinition() {
@Override
public VersionedFlowSnapshot getFlowSnapshot() {
return versionedFlowSnapshot;
public VersionedExternalFlow getVersionedExternalFlow() {
return versionedExternalFlow;
}
@Override
@ -185,14 +196,14 @@ public class StatelessSystemIT {
@Override
public Set<String> getInputPortNames() {
return versionedFlowSnapshot.getFlowContents().getInputPorts().stream()
return versionedExternalFlow.getFlowContents().getInputPorts().stream()
.map(VersionedPort::getName)
.collect(Collectors.toSet());
}
@Override
public Set<String> getOutputPortNames() {
return versionedFlowSnapshot.getFlowContents().getOutputPorts().stream()
return versionedExternalFlow.getFlowContents().getOutputPorts().stream()
.map(VersionedPort::getName)
.collect(Collectors.toSet());
}

View File

@ -29,7 +29,7 @@ import org.apache.nifi.flow.VersionedComponent;
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;

View File

@ -20,8 +20,8 @@ package org.apache.nifi.stateless.parameters;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedParameter;
import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedParameter;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;