NIFI-11464 Improvements for importing nested versioned flows (#7218)

* NIFI-11464 Improvements for importing nested versioned flows
- Introduce FlowSnapshotContainer to return root snapshot + children
- Introduce ControllerServiceResolver to extract logic from service facade
- Update resolution logic to correctly consider all services in the hierarchy
- Merge additional parameter contexts and parameter providers from child to parent
- Add unit test for controller service resolver
- Replace use of emptSet/emptyMap with new set/map instance
This commit is contained in:
Bryan Bende 2023-06-05 10:11:33 -04:00
parent 5d4ced2f22
commit 64b110c292
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
23 changed files with 1835 additions and 179 deletions

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.service;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.bundle.BundleDetails;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.flow.Bundle;
import org.apache.nifi.flow.ControllerServiceAPI;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.util.BundleUtils;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
public class StandardControllerServiceApiLookup implements ControllerServiceApiLookup {
private final ExtensionManager extensionManager;
public StandardControllerServiceApiLookup(final ExtensionManager extensionManager) {
this.extensionManager = extensionManager;
}
@Override
public Map<String, ControllerServiceAPI> getRequiredServiceApis(final String type, final Bundle bundle) {
final Optional<BundleCoordinate> compatibleBundle = BundleUtils.getOptionalCompatibleBundle(extensionManager, type, BundleUtils.createBundleDto(bundle));
if (!compatibleBundle.isPresent()) {
return Collections.emptyMap();
}
final Map<String, ControllerServiceAPI> serviceApis = new HashMap<>();
final ConfigurableComponent tempComponent = extensionManager.getTempComponent(type, compatibleBundle.get());
for (final PropertyDescriptor descriptor : tempComponent.getPropertyDescriptors()) {
final Class<? extends ControllerService> requiredServiceApiClass = descriptor.getControllerServiceDefinition();
if (requiredServiceApiClass == null) {
continue;
}
final ClassLoader serviceApiClassLoader = requiredServiceApiClass.getClassLoader();
final org.apache.nifi.bundle.Bundle serviceApiBundle = extensionManager.getBundle(serviceApiClassLoader);
final BundleDetails serviceApiBundleDetails = serviceApiBundle.getBundleDetails();
final BundleCoordinate serviceApiBundleCoordinate = serviceApiBundleDetails.getCoordinate();
final ControllerServiceAPI serviceApi = new ControllerServiceAPI();
serviceApi.setType(requiredServiceApiClass.getCanonicalName());
serviceApi.setBundle(new Bundle(serviceApiBundleCoordinate.getGroup(), serviceApiBundleCoordinate.getId(), serviceApiBundleCoordinate.getVersion()));
serviceApis.put(descriptor.getName(), serviceApi);
}
return serviceApis;
}
}

View File

@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.service;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.flow.ControllerServiceAPI;
import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.flow.VersionedConfigurableExtension;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.flow.VersionedPropertyDescriptor;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.registry.flow.FlowSnapshotContainer;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Stack;
import java.util.stream.Collectors;
public class StandardControllerServiceResolver implements ControllerServiceResolver {
private final Authorizer authorizer;
private final FlowManager flowManager;
private final NiFiRegistryFlowMapper flowMapper;
private final ControllerServiceProvider controllerServiceProvider;
private final ControllerServiceApiLookup controllerServiceApiLookup;
public StandardControllerServiceResolver(final Authorizer authorizer,
final FlowManager flowManager,
final NiFiRegistryFlowMapper flowMapper,
final ControllerServiceProvider controllerServiceProvider,
final ControllerServiceApiLookup controllerServiceApiLookup) {
this.authorizer = authorizer;
this.flowManager = flowManager;
this.flowMapper = flowMapper;
this.controllerServiceProvider = controllerServiceProvider;
this.controllerServiceApiLookup = controllerServiceApiLookup;
}
@Override
public void resolveInheritedControllerServices(final FlowSnapshotContainer flowSnapshotContainer, final String parentGroupId, final NiFiUser user) {
final RegisteredFlowSnapshot topLevelSnapshot = flowSnapshotContainer.getFlowSnapshot();
final VersionedProcessGroup versionedGroup = topLevelSnapshot.getFlowContents();
final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences = topLevelSnapshot.getExternalControllerServices();
final ProcessGroup parentGroup = flowManager.getGroup(parentGroupId);
final Set<VersionedControllerService> ancestorServices = parentGroup.getControllerServices(true).stream()
.filter(serviceNode -> serviceNode.isAuthorized(authorizer, RequestAction.READ, user))
.map(serviceNode -> flowMapper.mapControllerService(serviceNode, controllerServiceProvider, new HashSet<>(), new HashMap<>()))
.collect(Collectors.toSet());
final Stack<Set<VersionedControllerService>> serviceHierarchyStack = new Stack<>();
serviceHierarchyStack.push(ancestorServices);
resolveInheritedControllerServices(flowSnapshotContainer, versionedGroup, externalControllerServiceReferences, serviceHierarchyStack);
}
private void resolveInheritedControllerServices(final FlowSnapshotContainer flowSnapshotContainer, final VersionedProcessGroup versionedGroup,
final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences,
final Stack<Set<VersionedControllerService>> serviceHierarchyStack) {
final Set<VersionedControllerService> currentGroupServices = versionedGroup.getControllerServices() == null ? Collections.emptySet() : versionedGroup.getControllerServices();
serviceHierarchyStack.push(currentGroupServices);
final Set<VersionedControllerService> availableControllerServices = serviceHierarchyStack.stream()
.flatMap(Set::stream)
.collect(Collectors.toSet());
for (final VersionedProcessor processor : versionedGroup.getProcessors()) {
resolveInheritedControllerServices(processor, availableControllerServices, externalControllerServiceReferences);
}
for (final VersionedControllerService service : versionedGroup.getControllerServices()) {
resolveInheritedControllerServices(service, availableControllerServices, externalControllerServiceReferences);
}
// If the child group is under version, the external service references need to come from the snapshot of the
// child instead of what was passed into this method which was for the parent group
for (final VersionedProcessGroup child : versionedGroup.getProcessGroups()) {
final Map<String, ExternalControllerServiceReference> childExternalServices;
if (child.getVersionedFlowCoordinates() == null) {
childExternalServices = externalControllerServiceReferences;
} else {
final RegisteredFlowSnapshot childSnapshot = flowSnapshotContainer.getChildSnapshot(child.getIdentifier());
if (childSnapshot == null) {
childExternalServices = Collections.emptyMap();
} else {
childExternalServices = childSnapshot.getExternalControllerServices();
}
}
resolveInheritedControllerServices(flowSnapshotContainer, child, childExternalServices, serviceHierarchyStack);
}
serviceHierarchyStack.pop();
}
private void resolveInheritedControllerServices(final VersionedConfigurableExtension component, final Set<VersionedControllerService> availableControllerServices,
final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences) {
final Map<String, ControllerServiceAPI> componentRequiredApis = controllerServiceApiLookup.getRequiredServiceApis(component.getType(), component.getBundle());
if (componentRequiredApis.isEmpty()) {
return;
}
final Map<String, VersionedPropertyDescriptor> propertyDescriptors = component.getPropertyDescriptors();
final Map<String, String> componentProperties = component.getProperties();
for (final Map.Entry<String, String> entry : componentProperties.entrySet()) {
final String propertyName = entry.getKey();
final String propertyValue = entry.getValue();
final VersionedPropertyDescriptor propertyDescriptor = propertyDescriptors.get(propertyName);
if (propertyDescriptor == null) {
continue;
}
if (!propertyDescriptor.getIdentifiesControllerService()) {
continue;
}
final Set<String> availableControllerServiceIds = availableControllerServices.stream()
.map(VersionedControllerService::getIdentifier)
.collect(Collectors.toSet());
// If the referenced Controller Service is available, there is nothing to resolve.
if (availableControllerServiceIds.contains(propertyValue)) {
continue;
}
final ExternalControllerServiceReference externalServiceReference = externalControllerServiceReferences == null ? null : externalControllerServiceReferences.get(propertyValue);
if (externalServiceReference == null) {
continue;
}
final ControllerServiceAPI descriptorRequiredApi = componentRequiredApis.get(propertyName);
if (descriptorRequiredApi == null) {
continue;
}
final String externalControllerServiceName = externalServiceReference.getName();
final List<VersionedControllerService> matchingControllerServices = availableControllerServices.stream()
.filter(service -> service.getName().equals(externalControllerServiceName))
.filter(service -> implementsApi(descriptorRequiredApi, service))
.collect(Collectors.toList());
if (matchingControllerServices.size() != 1) {
continue;
}
final VersionedControllerService matchingService = matchingControllerServices.get(0);
final String resolvedId = matchingService.getIdentifier();;
componentProperties.put(propertyName, resolvedId);
}
}
private boolean implementsApi(final ControllerServiceAPI requiredServiceApi, final VersionedControllerService versionedControllerService) {
if (versionedControllerService.getControllerServiceApis() == null) {
return false;
}
for (final ControllerServiceAPI implementedApi : versionedControllerService.getControllerServiceApis()) {
if (implementedApi.getType().equals(requiredServiceApi.getType())
&& implementedApi.getBundle().getGroup().equals(requiredServiceApi.getBundle().getGroup())
&& implementedApi.getBundle().getArtifact().equals(requiredServiceApi.getBundle().getArtifact())) {
return true;
}
}
return false;
}
}

View File

@ -94,6 +94,7 @@ import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.registry.flow.FlowRegistryClientContextFactory;
import org.apache.nifi.registry.flow.FlowRegistryClientNode;
import org.apache.nifi.registry.flow.FlowRegistryException;
import org.apache.nifi.registry.flow.FlowSnapshotContainer;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
import org.apache.nifi.registry.flow.StandardVersionControlInformation;
import org.apache.nifi.registry.flow.VersionControlInformation;
@ -513,33 +514,20 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
}
private void synchronizeChildGroups(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, VersionedParameterContext> versionedParameterContexts,
final Map<String, ProcessGroup> childGroupsByVersionedId,
final Map<String, ParameterProviderReference> parameterProviderReferences, final ProcessGroup topLevelGroup) throws ProcessorInstantiationException {
final Map<String, ProcessGroup> childGroupsByVersionedId, final Map<String, ParameterProviderReference> parameterProviderReferences,
final ProcessGroup topLevelGroup) throws ProcessorInstantiationException {
for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) {
final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier());
final VersionedFlowCoordinates childCoordinates = proposedChildGroup.getVersionedFlowCoordinates();
// if there is a nested process group that is version controlled, make sure get the param contexts that go with that snapshot
// instead of the ones from the parent which would have been passed in to this method
Map<String, VersionedParameterContext> childParameterContexts = versionedParameterContexts;
if (childCoordinates != null && syncOptions.isUpdateDescendantVersionedFlows()) {
final String childParameterContextName = proposedChildGroup.getParameterContextName();
if (childParameterContextName != null && !versionedParameterContexts.containsKey(childParameterContextName)) {
childParameterContexts = getVersionedParameterContexts(childCoordinates);
} else {
childParameterContexts = versionedParameterContexts;
}
}
if (childGroup == null) {
final ProcessGroup added = addProcessGroup(group, proposedChildGroup, context.getComponentIdGenerator(), preExistingVariables,
childParameterContexts, parameterProviderReferences, topLevelGroup);
versionedParameterContexts, parameterProviderReferences, topLevelGroup);
context.getFlowManager().onProcessGroupAdded(added);
added.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::initialize);
LOG.info("Added {} to {}", added, group);
} else if (childCoordinates == null || syncOptions.isUpdateDescendantVersionedFlows()) {
final StandardVersionedComponentSynchronizer sync = new StandardVersionedComponentSynchronizer(context);
sync.setPreExistingVariables(preExistingVariables);
sync.setUpdatedVersionedComponentIds(updatedVersionedComponentIds);
@ -548,7 +536,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
.build();
sync.setSynchronizationOptions(options);
sync.synchronize(childGroup, proposedChildGroup, childParameterContexts, parameterProviderReferences, topLevelGroup);
sync.synchronize(childGroup, proposedChildGroup, versionedParameterContexts, parameterProviderReferences, topLevelGroup);
LOG.info("Updated {}", childGroup);
}
@ -2229,7 +2217,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
final int flowVersion = versionedFlowCoordinates.getVersion();
try {
final RegisteredFlowSnapshot childSnapshot = flowRegistry.getFlowContents(FlowRegistryClientContextFactory.getAnonymousContext(), bucketId, flowId, flowVersion, false);
final FlowSnapshotContainer snapshotContainer = flowRegistry.getFlowContents(FlowRegistryClientContextFactory.getAnonymousContext(), bucketId, flowId, flowVersion, false);
final RegisteredFlowSnapshot childSnapshot = snapshotContainer.getFlowSnapshot();
return childSnapshot.getParameterContexts();
} catch (final FlowRegistryException e) {
throw new IllegalArgumentException("The Flow Registry with ID " + registryId + " reports that no Flow exists with Bucket "

View File

@ -87,6 +87,7 @@ import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.registry.flow.FlowRegistryClientContextFactory;
import org.apache.nifi.registry.flow.FlowRegistryClientNode;
import org.apache.nifi.registry.flow.FlowRegistryException;
import org.apache.nifi.registry.flow.FlowSnapshotContainer;
import org.apache.nifi.registry.flow.RegisteredFlow;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
import org.apache.nifi.registry.flow.StandardVersionControlInformation;
@ -3864,8 +3865,9 @@ public final class StandardProcessGroup implements ProcessGroup {
throw new FlowRegistryException(flowRegistry + " cannot currently be used to synchronize with Flow Registry because it is currently validating");
}
final RegisteredFlowSnapshot registrySnapshot = flowRegistry.getFlowContents(
final FlowSnapshotContainer registrySnapshotContainer = flowRegistry.getFlowContents(
FlowRegistryClientContextFactory.getAnonymousContext(), vci.getBucketIdentifier(), vci.getFlowIdentifier(), vci.getVersion(), false);
final RegisteredFlowSnapshot registrySnapshot = registrySnapshotContainer.getFlowSnapshot();
final VersionedProcessGroup registryFlow = registrySnapshot.getFlowContents();
vci.setFlowSnapshot(registryFlow);
} catch (final IOException | FlowRegistryException e) {

View File

@ -216,19 +216,20 @@ public final class StandardFlowRegistryClientNode extends AbstractComponentNode
}
@Override
public RegisteredFlowSnapshot getFlowContents(
public FlowSnapshotContainer getFlowContents(
final FlowRegistryClientUserContext context, final String bucketId, final String flowId, final int version, final boolean fetchRemoteFlows
) throws FlowRegistryException, IOException {
final RegisteredFlowSnapshot flowSnapshot = execute(() ->client.get().getComponent().getFlowContents(getConfigurationContext(context), bucketId, flowId, version));
final RegisteredFlowSnapshot flowSnapshot = execute(() -> client.get().getComponent().getFlowContents(getConfigurationContext(context), bucketId, flowId, version));
final FlowSnapshotContainer snapshotContainer = new FlowSnapshotContainer(flowSnapshot);
if (fetchRemoteFlows) {
final VersionedProcessGroup contents = flowSnapshot.getFlowContents();
for (final VersionedProcessGroup child : contents.getProcessGroups()) {
populateVersionedContentsRecursively(context, child);
populateVersionedContentsRecursively(context, child, snapshotContainer);
}
}
return flowSnapshot;
return snapshotContainer;
}
@Override
@ -296,7 +297,8 @@ public final class StandardFlowRegistryClientNode extends AbstractComponentNode
return context.getNiFiUserIdentity().orElse(null);
}
private void populateVersionedContentsRecursively(final FlowRegistryClientUserContext context, final VersionedProcessGroup group) throws IOException, FlowRegistryException {
private void populateVersionedContentsRecursively(final FlowRegistryClientUserContext context, final VersionedProcessGroup group,
final FlowSnapshotContainer snapshotContainer) throws FlowRegistryException {
if (group == null) {
return;
}
@ -326,10 +328,12 @@ public final class StandardFlowRegistryClientNode extends AbstractComponentNode
group.setDefaultBackPressureObjectThreshold(contents.getDefaultBackPressureObjectThreshold());
group.setDefaultBackPressureDataSizeThreshold(contents.getDefaultBackPressureDataSizeThreshold());
coordinates.setLatest(snapshot.isLatest());
snapshotContainer.addChildSnapshot(snapshot, group);
}
for (final VersionedProcessGroup child : group.getProcessGroups()) {
populateVersionedContentsRecursively(context, child);
populateVersionedContentsRecursively(context, child, snapshotContainer);
}
}
@ -345,7 +349,8 @@ public final class StandardFlowRegistryClientNode extends AbstractComponentNode
for (final FlowRegistryClientNode clientNode : clientNodes) {
try {
logger.debug("Attempting to fetch flow for Bucket [{}] Flow [{}] Version [{}] using {}", bucketId, flowId, version, clientNode);
final RegisteredFlowSnapshot snapshot = clientNode.getFlowContents(context, bucketId, flowId, version, fetchRemoteFlows);
final FlowSnapshotContainer snapshotContainer = clientNode.getFlowContents(context, bucketId, flowId, version, fetchRemoteFlows);
final RegisteredFlowSnapshot snapshot = snapshotContainer.getFlowSnapshot();
coordinates.setRegistryId(clientNode.getIdentifier());
logger.debug("Successfully fetched flow for Bucket [{}] Flow [{}] Version [{}] using {}", bucketId, flowId, version, clientNode);

View File

@ -0,0 +1,296 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.flow.Bundle;
import org.apache.nifi.flow.ControllerServiceAPI;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.registry.flow.FlowSnapshotContainer;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class StandardControllerServiceResolverTest {
private static final String BASE_SNAPSHOT_LOCATION = "src/test/resources/snapshots";
private static final String CHILD_REFERENCES_SERVICES_FROM_PARENT_LOCATION = BASE_SNAPSHOT_LOCATION + "/versioned-child-services-from-parent";
private static final String STANDARD_EXTERNAL_SERVICE_REFERENCE = BASE_SNAPSHOT_LOCATION + "/standard-external-service-reference";
private Authorizer authorizer;
private FlowManager flowManager;
private NiFiRegistryFlowMapper flowMapper;
private ControllerServiceProvider controllerServiceProvider;
private ControllerServiceApiLookup controllerServiceApiLookup;
private NiFiUser nifiUser;
private ProcessGroup parentGroup;
private ControllerServiceResolver serviceResolver;
private final ObjectMapper objectMapper = new ObjectMapper();
@BeforeEach
public void setup() {
authorizer = mock(Authorizer.class);
flowManager = mock(FlowManager.class);
flowMapper = mock(NiFiRegistryFlowMapper.class);
controllerServiceProvider = mock(ControllerServiceProvider.class);
controllerServiceApiLookup = mock(ControllerServiceApiLookup.class);
nifiUser = mock(NiFiUser.class);
parentGroup = mock(ProcessGroup.class);
when(parentGroup.getIdentifier()).thenReturn("parentGroup");
when(parentGroup.getControllerServices(true)).thenReturn(Collections.emptySet());
when(flowManager.getGroup(parentGroup.getIdentifier())).thenReturn(parentGroup);
serviceResolver = new StandardControllerServiceResolver(authorizer, flowManager, flowMapper,
controllerServiceProvider, controllerServiceApiLookup);
}
@Test
public void testVersionedControlledChildResolveServicesFromParent() throws IOException {
// Load individual snapshots
final RegisteredFlowSnapshot parent = loadSnapshot(CHILD_REFERENCES_SERVICES_FROM_PARENT_LOCATION + "/parent.json");
final RegisteredFlowSnapshot child = loadSnapshot(CHILD_REFERENCES_SERVICES_FROM_PARENT_LOCATION + "/child.json");
// Find the child group inside parent where the child snapshot would have been loaded, and simulate loading the contents
final VersionedProcessGroup matchingChildGroup = findChildGroupByName(parent.getFlowContents(), child.getFlowContents().getName());
assertNotNull(matchingChildGroup);
matchingChildGroup.setProcessors(child.getFlowContents().getProcessors());
matchingChildGroup.setControllerServices(child.getFlowContents().getControllerServices());
// Create container with snapshots
final FlowSnapshotContainer snapshotContainer = new FlowSnapshotContainer(parent);
snapshotContainer.addChildSnapshot(child, matchingChildGroup);
// Before resolving, verify that child processor references different service ids
final VersionedProcessor childConvertRecord = findProcessorByName(matchingChildGroup, "ConvertRecord");
assertNotNull(childConvertRecord);
final String childConvertRecordReaderId = childConvertRecord.getProperties().get("record-reader");
final String childConvertRecordWriterId = childConvertRecord.getProperties().get("record-writer");
assertNotNull(childConvertRecordReaderId);
assertNotNull(childConvertRecordWriterId);
final VersionedControllerService parentReader = findServiceByName(parent.getFlowContents(), "MyAvroReader");
final VersionedControllerService parentWriter = findServiceByName(parent.getFlowContents(), "MyAvroRecordSetWriter");
assertNotNull(parentReader);
assertNotNull(parentWriter);
assertNotEquals(childConvertRecordReaderId, parentReader.getIdentifier());
assertNotEquals(childConvertRecordWriterId, parentWriter.getIdentifier());
// Setup the ControllerServiceAPI lookup for ConvertRecord
final Map<String, ControllerServiceAPI> convertRecordRequiredApis = new HashMap<>();
convertRecordRequiredApis.put("record-reader", createServiceApi("org.apache.nifi.serialization.RecordReaderFactory",
"org.apache.nifi", "nifi-standard-services-api-nar", "1.21.0-SNAPSHOT"));
convertRecordRequiredApis.put("record-writer", createServiceApi("org.apache.nifi.serialization.RecordSetWriterFactory",
"org.apache.nifi", "nifi-standard-services-api-nar", "1.21.0-SNAPSHOT"));
when(controllerServiceApiLookup.getRequiredServiceApis(childConvertRecord.getType(), childConvertRecord.getBundle()))
.thenReturn(convertRecordRequiredApis);
// Resolve inherited services
serviceResolver.resolveInheritedControllerServices(snapshotContainer, parentGroup.getIdentifier(), nifiUser);
// Verify child processor now references ids from parent
final String resolvedConvertRecordReaderId = childConvertRecord.getProperties().get("record-reader");
assertEquals(parentReader.getIdentifier(), resolvedConvertRecordReaderId);
final String resolvedConvertRecordWriterId = childConvertRecord.getProperties().get("record-writer");
assertEquals(parentWriter.getIdentifier(), resolvedConvertRecordWriterId);
}
@Test
public void testExternalServiceResolvedFromOutsideSnapshot() throws IOException {
final RegisteredFlowSnapshot snapshot = loadSnapshot(STANDARD_EXTERNAL_SERVICE_REFERENCE + "/convert-record-external-schema-registry.json");
final FlowSnapshotContainer snapshotContainer = new FlowSnapshotContainer(snapshot);
// Get the AvroReader and the id of the SR it is using before resolving external services
final VersionedControllerService avroReader = findServiceByName(snapshot.getFlowContents(), "AvroReader");
assertNotNull(avroReader);
final String avroReaderSchemaRegistryId = avroReader.getProperties().get("schema-registry");
assertNotNull(avroReaderSchemaRegistryId);
// Setup the ControllerServiceAPI lookup for AvroReader
final ControllerServiceAPI schemaRegistryServiceApi = createServiceApi("org.apache.nifi.schemaregistry.services.SchemaRegistry",
"org.apache.nifi", "nifi-standard-services-api-nar", "1.21.0-SNAPSHOT");
final Map<String, ControllerServiceAPI> avroReaderRequiredApis = new HashMap<>();
avroReaderRequiredApis.put("schema-registry", schemaRegistryServiceApi);
when(controllerServiceApiLookup.getRequiredServiceApis(avroReader.getType(), avroReader.getBundle()))
.thenReturn(avroReaderRequiredApis);
// Setup an existing service from the parent group that implements the same API with same name
final ControllerServiceNode schemaRegistryServiceNode = mock(ControllerServiceNode.class);
when(parentGroup.getControllerServices(true)).thenReturn(Collections.singleton(schemaRegistryServiceNode));
when(schemaRegistryServiceNode.isAuthorized(any(Authorizer.class), any(RequestAction.class), any(NiFiUser.class))).thenReturn(true);
final VersionedControllerService schemaRegistryVersionedService = new VersionedControllerService();
schemaRegistryVersionedService.setIdentifier("external-schema-registry");
schemaRegistryVersionedService.setName("AvroSchemaRegistry");
schemaRegistryVersionedService.setControllerServiceApis(Collections.singletonList(schemaRegistryServiceApi));
when(flowMapper.mapControllerService(schemaRegistryServiceNode, controllerServiceProvider, Collections.emptySet(), Collections.emptyMap()))
.thenReturn(schemaRegistryVersionedService);
// Resolve inherited services
serviceResolver.resolveInheritedControllerServices(snapshotContainer, parentGroup.getIdentifier(), nifiUser);
// Verify the SR id in the AvroReader has been resolved to the external service id
final String avroReaderSchemaRegistryIdAfterResolution = avroReader.getProperties().get("schema-registry");
assertNotNull(avroReaderSchemaRegistryIdAfterResolution);
assertNotEquals(avroReaderSchemaRegistryId, avroReaderSchemaRegistryIdAfterResolution);
assertEquals(schemaRegistryVersionedService.getIdentifier(), avroReaderSchemaRegistryIdAfterResolution);
}
@Test
public void testExternalServiceNotResolvedFromOutsideSnapshotBecauseMultipleWithSameNameAndType() throws IOException {
final RegisteredFlowSnapshot snapshot = loadSnapshot(STANDARD_EXTERNAL_SERVICE_REFERENCE + "/convert-record-external-schema-registry.json");
final FlowSnapshotContainer snapshotContainer = new FlowSnapshotContainer(snapshot);
// Get the AvroReader and the id of the SR it is using before resolving external services
final VersionedControllerService avroReader = findServiceByName(snapshot.getFlowContents(), "AvroReader");
assertNotNull(avroReader);
final String avroReaderSchemaRegistryId = avroReader.getProperties().get("schema-registry");
assertNotNull(avroReaderSchemaRegistryId);
// Setup the ControllerServiceAPI lookup for AvroReader
final ControllerServiceAPI schemaRegistryServiceApi = createServiceApi("org.apache.nifi.schemaregistry.services.SchemaRegistry",
"org.apache.nifi", "nifi-standard-services-api-nar", "1.21.0-SNAPSHOT");
final Map<String, ControllerServiceAPI> avroReaderRequiredApis = new HashMap<>();
avroReaderRequiredApis.put("schema-registry", schemaRegistryServiceApi);
when(controllerServiceApiLookup.getRequiredServiceApis(avroReader.getType(), avroReader.getBundle()))
.thenReturn(avroReaderRequiredApis);
// Setup two existing services from the parent group that implements the same API with same name
final ControllerServiceNode schemaRegistryServiceNode1 = mock(ControllerServiceNode.class);
final ControllerServiceNode schemaRegistryServiceNode2 = mock(ControllerServiceNode.class);
when(parentGroup.getControllerServices(true)).thenReturn(new HashSet<>(Arrays.asList(schemaRegistryServiceNode1, schemaRegistryServiceNode2)));
when(schemaRegistryServiceNode1.isAuthorized(any(Authorizer.class), any(RequestAction.class), any(NiFiUser.class))).thenReturn(true);
when(schemaRegistryServiceNode2.isAuthorized(any(Authorizer.class), any(RequestAction.class), any(NiFiUser.class))).thenReturn(true);
final VersionedControllerService schemaRegistryVersionedService1 = new VersionedControllerService();
schemaRegistryVersionedService1.setIdentifier("external-schema-registry-1");
schemaRegistryVersionedService1.setName("AvroSchemaRegistry");
schemaRegistryVersionedService1.setControllerServiceApis(Collections.singletonList(schemaRegistryServiceApi));
when(flowMapper.mapControllerService(schemaRegistryServiceNode1, controllerServiceProvider, Collections.emptySet(), Collections.emptyMap()))
.thenReturn(schemaRegistryVersionedService1);
final VersionedControllerService schemaRegistryVersionedService2 = new VersionedControllerService();
schemaRegistryVersionedService2.setIdentifier("external-schema-registry-2");
schemaRegistryVersionedService2.setName("AvroSchemaRegistry");
schemaRegistryVersionedService2.setControllerServiceApis(Collections.singletonList(schemaRegistryServiceApi));
when(flowMapper.mapControllerService(schemaRegistryServiceNode2, controllerServiceProvider, Collections.emptySet(), Collections.emptyMap()))
.thenReturn(schemaRegistryVersionedService2);
// Resolve inherited services
serviceResolver.resolveInheritedControllerServices(snapshotContainer, parentGroup.getIdentifier(), nifiUser);
// Verify the SR id in the AvroReader has not been resolved due to there being to possible choices
final String avroReaderSchemaRegistryIdAfterResolution = avroReader.getProperties().get("schema-registry");
assertNotNull(avroReaderSchemaRegistryIdAfterResolution);
assertEquals(avroReaderSchemaRegistryId, avroReaderSchemaRegistryIdAfterResolution);
}
private RegisteredFlowSnapshot loadSnapshot(final String snapshotFile) throws IOException {
return objectMapper.readValue(new File(snapshotFile), RegisteredFlowSnapshot.class);
}
private VersionedProcessGroup findChildGroupByName(final VersionedProcessGroup group, final String childGroupName) {
if (group.getProcessGroups() == null || group.getProcessGroups().isEmpty()) {
return null;
}
for (final VersionedProcessGroup childGroup : group.getProcessGroups()) {
if (childGroup.getName().equals(childGroupName)) {
return childGroup;
}
}
for (final VersionedProcessGroup childGroup : group.getProcessGroups()) {
final VersionedProcessGroup matchingChild = findChildGroupByName(childGroup, childGroupName);
if (matchingChild != null) {
return matchingChild;
}
}
return null;
}
private VersionedControllerService findServiceByName(final VersionedProcessGroup group, final String serviceName) {
if (group.getControllerServices() == null) {
return null;
}
return group.getControllerServices().stream()
.filter(service -> service.getName().equals(serviceName))
.findFirst()
.orElse(null);
}
private VersionedProcessor findProcessorByName(final VersionedProcessGroup group, final String processorName) {
if (group.getProcessors() == null) {
return null;
}
return group.getProcessors().stream()
.filter(processor -> processor.getName().equals(processorName))
.findFirst()
.orElse(null);
}
private ControllerServiceAPI createServiceApi(final String type, final String group, final String artifact, final String version) {
final ControllerServiceAPI serviceAPI = new ControllerServiceAPI();
serviceAPI.setType(type);
serviceAPI.setBundle(new Bundle(group, artifact, version));
return serviceAPI;
}
}

View File

@ -0,0 +1,321 @@
{
"externalControllerServices": {
"8a67de89-f5e7-3e42-892b-cfddad74c496": {
"identifier": "8a67de89-f5e7-3e42-892b-cfddad74c496",
"name": "AvroSchemaRegistry"
}
},
"flowContents": {
"comments": "",
"componentType": "PROCESS_GROUP",
"connections": [],
"controllerServices": [
{
"bulletinLevel": "WARN",
"bundle": {
"artifact": "nifi-record-serialization-services-nar",
"group": "org.apache.nifi",
"version": "2.0.0-SNAPSHOT"
},
"comments": "",
"componentType": "CONTROLLER_SERVICE",
"controllerServiceApis": [
{
"bundle": {
"artifact": "nifi-standard-services-api-nar",
"group": "org.apache.nifi",
"version": "2.0.0-SNAPSHOT"
},
"type": "org.apache.nifi.serialization.RecordReaderFactory"
}
],
"groupIdentifier": "cde10c81-4332-371f-bef9-8e7cccc83924",
"identifier": "3e1ca514-b79e-3b6a-acec-4efb66b8d922",
"instanceIdentifier": "c9a88942-0187-1000-dfb0-14f70307d4f8",
"name": "AvroReader",
"properties": {
"schema-name": "${schema.name}",
"cache-size": "1000",
"schema-registry": "8a67de89-f5e7-3e42-892b-cfddad74c496",
"schema-access-strategy": "schema-name",
"schema-text": "${avro.schema}"
},
"propertyDescriptors": {
"schema-branch": {
"displayName": "Schema Branch",
"identifiesControllerService": false,
"name": "schema-branch",
"sensitive": false
},
"schema-name": {
"displayName": "Schema Name",
"identifiesControllerService": false,
"name": "schema-name",
"sensitive": false
},
"cache-size": {
"displayName": "Cache Size",
"identifiesControllerService": false,
"name": "cache-size",
"sensitive": false
},
"schema-registry": {
"displayName": "Schema Registry",
"identifiesControllerService": true,
"name": "schema-registry",
"sensitive": false
},
"schema-access-strategy": {
"displayName": "Schema Access Strategy",
"identifiesControllerService": false,
"name": "schema-access-strategy",
"sensitive": false
},
"schema-version": {
"displayName": "Schema Version",
"identifiesControllerService": false,
"name": "schema-version",
"sensitive": false
},
"schema-text": {
"displayName": "Schema Text",
"identifiesControllerService": false,
"name": "schema-text",
"sensitive": false
}
},
"scheduledState": "DISABLED",
"type": "org.apache.nifi.avro.AvroReader"
},
{
"bulletinLevel": "WARN",
"bundle": {
"artifact": "nifi-record-serialization-services-nar",
"group": "org.apache.nifi",
"version": "2.0.0-SNAPSHOT"
},
"componentType": "CONTROLLER_SERVICE",
"controllerServiceApis": [
{
"bundle": {
"artifact": "nifi-standard-services-api-nar",
"group": "org.apache.nifi",
"version": "2.0.0-SNAPSHOT"
},
"type": "org.apache.nifi.serialization.RecordSetWriterFactory"
}
],
"groupIdentifier": "cde10c81-4332-371f-bef9-8e7cccc83924",
"identifier": "77c4ccd4-4dc1-3a2a-8fd3-6b9c020ef6dc",
"instanceIdentifier": "c9a912c4-0187-1000-b58b-295f675c6fc6",
"name": "JsonRecordSetWriter",
"properties": {
"compression-level": "1",
"Pretty Print JSON": "false",
"compression-format": "none",
"Schema Write Strategy": "no-schema",
"suppress-nulls": "never-suppress",
"output-grouping": "output-array",
"schema-name": "${schema.name}",
"schema-access-strategy": "inherit-record-schema",
"schema-protocol-version": "1",
"schema-text": "${avro.schema}"
},
"propertyDescriptors": {
"schema-branch": {
"displayName": "Schema Branch",
"identifiesControllerService": false,
"name": "schema-branch",
"sensitive": false
},
"compression-level": {
"displayName": "Compression Level",
"identifiesControllerService": false,
"name": "compression-level",
"sensitive": false
},
"schema-cache": {
"displayName": "Schema Cache",
"identifiesControllerService": true,
"name": "schema-cache",
"sensitive": false
},
"Timestamp Format": {
"displayName": "Timestamp Format",
"identifiesControllerService": false,
"name": "Timestamp Format",
"sensitive": false
},
"Date Format": {
"displayName": "Date Format",
"identifiesControllerService": false,
"name": "Date Format",
"sensitive": false
},
"Pretty Print JSON": {
"displayName": "Pretty Print JSON",
"identifiesControllerService": false,
"name": "Pretty Print JSON",
"sensitive": false
},
"compression-format": {
"displayName": "Compression Format",
"identifiesControllerService": false,
"name": "compression-format",
"sensitive": false
},
"Schema Write Strategy": {
"displayName": "Schema Write Strategy",
"identifiesControllerService": false,
"name": "Schema Write Strategy",
"sensitive": false
},
"suppress-nulls": {
"displayName": "Suppress Null Values",
"identifiesControllerService": false,
"name": "suppress-nulls",
"sensitive": false
},
"output-grouping": {
"displayName": "Output Grouping",
"identifiesControllerService": false,
"name": "output-grouping",
"sensitive": false
},
"schema-name": {
"displayName": "Schema Name",
"identifiesControllerService": false,
"name": "schema-name",
"sensitive": false
},
"schema-registry": {
"displayName": "Schema Registry",
"identifiesControllerService": true,
"name": "schema-registry",
"sensitive": false
},
"Time Format": {
"displayName": "Time Format",
"identifiesControllerService": false,
"name": "Time Format",
"sensitive": false
},
"schema-access-strategy": {
"displayName": "Schema Access Strategy",
"identifiesControllerService": false,
"name": "schema-access-strategy",
"sensitive": false
},
"schema-protocol-version": {
"displayName": "Schema Protocol Version",
"identifiesControllerService": false,
"name": "schema-protocol-version",
"sensitive": false
},
"schema-version": {
"displayName": "Schema Version",
"identifiesControllerService": false,
"name": "schema-version",
"sensitive": false
},
"schema-text": {
"displayName": "Schema Text",
"identifiesControllerService": false,
"name": "schema-text",
"sensitive": false
}
},
"scheduledState": "DISABLED",
"type": "org.apache.nifi.json.JsonRecordSetWriter"
}
],
"defaultBackPressureDataSizeThreshold": "1 GB",
"defaultBackPressureObjectThreshold": 10000,
"defaultFlowFileExpiration": "0 sec",
"flowFileConcurrency": "UNBOUNDED",
"flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE",
"funnels": [],
"identifier": "cde10c81-4332-371f-bef9-8e7cccc83924",
"inputPorts": [],
"instanceIdentifier": "c9a82721-0187-1000-d065-cc6f16cb06d7",
"labels": [],
"name": "Convert Record - External Schema Registry",
"outputPorts": [],
"position": {
"x": 312.0,
"y": 222.0
},
"processGroups": [],
"processors": [
{
"autoTerminatedRelationships": [],
"backoffMechanism": "PENALIZE_FLOWFILE",
"bulletinLevel": "WARN",
"bundle": {
"artifact": "nifi-standard-nar",
"group": "org.apache.nifi",
"version": "2.0.0-SNAPSHOT"
},
"comments": "",
"componentType": "PROCESSOR",
"concurrentlySchedulableTaskCount": 1,
"executionNode": "ALL",
"groupIdentifier": "cde10c81-4332-371f-bef9-8e7cccc83924",
"identifier": "38cde0b4-b4b3-3481-9b90-504f9386022c",
"instanceIdentifier": "c9a84b1c-0187-1000-c999-1b1b1cba8843",
"maxBackoffPeriod": "10 mins",
"name": "ConvertRecord",
"penaltyDuration": "30 sec",
"position": {
"x": 427.0,
"y": 288.0
},
"properties": {
"record-writer": "77c4ccd4-4dc1-3a2a-8fd3-6b9c020ef6dc",
"record-reader": "3e1ca514-b79e-3b6a-acec-4efb66b8d922",
"include-zero-record-flowfiles": "true"
},
"propertyDescriptors": {
"record-writer": {
"displayName": "Record Writer",
"identifiesControllerService": true,
"name": "record-writer",
"sensitive": false
},
"record-reader": {
"displayName": "Record Reader",
"identifiesControllerService": true,
"name": "record-reader",
"sensitive": false
},
"include-zero-record-flowfiles": {
"displayName": "Include Zero Record FlowFiles",
"identifiesControllerService": false,
"name": "include-zero-record-flowfiles",
"sensitive": false
}
},
"retriedRelationships": [],
"retryCount": 10,
"runDurationMillis": 0,
"scheduledState": "ENABLED",
"schedulingPeriod": "0 sec",
"schedulingStrategy": "TIMER_DRIVEN",
"style": {},
"type": "org.apache.nifi.processors.standard.ConvertRecord",
"yieldDuration": "1 sec"
}
],
"remoteProcessGroups": [],
"variables": {}
},
"flowEncodingVersion": "1.0",
"parameterContexts": {},
"parameterProviders": {},
"snapshotMetadata": {
"author": "anonymous",
"comments": "",
"timestamp": 1682715543022,
"version": 1
}
}

View File

@ -0,0 +1,230 @@
{
"externalControllerServices": {
"e314fb24-91a4-38c5-af94-520483cf0579": {
"identifier": "e314fb24-91a4-38c5-af94-520483cf0579",
"name": "MyAvroRecordSetWriter"
},
"9a674922-376f-367c-9dc3-095f8014223e": {
"identifier": "9a674922-376f-367c-9dc3-095f8014223e",
"name": "MyAvroReader"
}
},
"flowContents": {
"comments": "",
"componentType": "PROCESS_GROUP",
"connections": [
{
"backPressureDataSizeThreshold": "1 GB",
"backPressureObjectThreshold": 10000,
"bends": [],
"componentType": "CONNECTION",
"destination": {
"comments": "",
"groupId": "d2cb33fe-d25b-3945-89f2-6283dd46bf9c",
"id": "afe169ce-aada-3055-9489-cd40bde6789d",
"instanceIdentifier": "1117d8ab-8613-3675-83ef-fb6f30b891cb",
"name": "ConvertRecord",
"type": "PROCESSOR"
},
"flowFileExpiration": "0 sec",
"groupIdentifier": "d2cb33fe-d25b-3945-89f2-6283dd46bf9c",
"identifier": "bab085dc-1e3d-3ff4-8c98-61e2ac2da897",
"instanceIdentifier": "b13d9099-7f12-373c-8900-c653d9a5acf0",
"labelIndex": 1,
"loadBalanceCompression": "DO_NOT_COMPRESS",
"loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
"name": "",
"partitioningAttribute": "",
"prioritizers": [],
"selectedRelationships": [
"success"
],
"source": {
"comments": "",
"groupId": "d2cb33fe-d25b-3945-89f2-6283dd46bf9c",
"id": "c84ea0f1-31d6-3c66-a431-526a13ecd953",
"instanceIdentifier": "0c16178a-f864-3b60-11e4-9b5f6d8af199",
"name": "GenerateFlowFile",
"type": "PROCESSOR"
},
"zIndex": 0
}
],
"controllerServices": [],
"defaultBackPressureDataSizeThreshold": "1 GB",
"defaultBackPressureObjectThreshold": 10000,
"defaultFlowFileExpiration": "0 sec",
"flowFileConcurrency": "UNBOUNDED",
"flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE",
"funnels": [],
"identifier": "d2cb33fe-d25b-3945-89f2-6283dd46bf9c",
"inputPorts": [],
"instanceIdentifier": "a44c6370-0187-1000-d0cf-e5bc93fd74d3",
"labels": [],
"name": "Child",
"outputPorts": [],
"position": {
"x": 502.0,
"y": 283.0
},
"processGroups": [],
"processors": [
{
"autoTerminatedRelationships": [],
"backoffMechanism": "PENALIZE_FLOWFILE",
"bulletinLevel": "WARN",
"bundle": {
"artifact": "nifi-standard-nar",
"group": "org.apache.nifi",
"version": "2.0.0-SNAPSHOT"
},
"comments": "",
"componentType": "PROCESSOR",
"concurrentlySchedulableTaskCount": 1,
"executionNode": "ALL",
"groupIdentifier": "d2cb33fe-d25b-3945-89f2-6283dd46bf9c",
"identifier": "c84ea0f1-31d6-3c66-a431-526a13ecd953",
"instanceIdentifier": "0c16178a-f864-3b60-11e4-9b5f6d8af199",
"maxBackoffPeriod": "10 mins",
"name": "GenerateFlowFile",
"penaltyDuration": "30 sec",
"position": {
"x": 432.0,
"y": 64.0
},
"properties": {
"character-set": "UTF-8",
"File Size": "0B",
"Batch Size": "1",
"Unique FlowFiles": "false",
"Data Format": "Text"
},
"propertyDescriptors": {
"character-set": {
"displayName": "Character Set",
"identifiesControllerService": false,
"name": "character-set",
"sensitive": false
},
"File Size": {
"displayName": "File Size",
"identifiesControllerService": false,
"name": "File Size",
"sensitive": false
},
"mime-type": {
"displayName": "Mime Type",
"identifiesControllerService": false,
"name": "mime-type",
"sensitive": false
},
"generate-ff-custom-text": {
"displayName": "Custom Text",
"identifiesControllerService": false,
"name": "generate-ff-custom-text",
"sensitive": false
},
"Batch Size": {
"displayName": "Batch Size",
"identifiesControllerService": false,
"name": "Batch Size",
"sensitive": false
},
"Unique FlowFiles": {
"displayName": "Unique FlowFiles",
"identifiesControllerService": false,
"name": "Unique FlowFiles",
"sensitive": false
},
"Data Format": {
"displayName": "Data Format",
"identifiesControllerService": false,
"name": "Data Format",
"sensitive": false
}
},
"retriedRelationships": [],
"retryCount": 10,
"runDurationMillis": 0,
"scheduledState": "ENABLED",
"schedulingPeriod": "1 min",
"schedulingStrategy": "TIMER_DRIVEN",
"style": {},
"type": "org.apache.nifi.processors.standard.GenerateFlowFile",
"yieldDuration": "1 sec"
},
{
"autoTerminatedRelationships": [
"success",
"failure"
],
"backoffMechanism": "PENALIZE_FLOWFILE",
"bulletinLevel": "WARN",
"bundle": {
"artifact": "nifi-standard-nar",
"group": "org.apache.nifi",
"version": "2.0.0-SNAPSHOT"
},
"comments": "",
"componentType": "PROCESSOR",
"concurrentlySchedulableTaskCount": 1,
"executionNode": "ALL",
"groupIdentifier": "d2cb33fe-d25b-3945-89f2-6283dd46bf9c",
"identifier": "afe169ce-aada-3055-9489-cd40bde6789d",
"instanceIdentifier": "1117d8ab-8613-3675-83ef-fb6f30b891cb",
"maxBackoffPeriod": "10 mins",
"name": "ConvertRecord",
"penaltyDuration": "30 sec",
"position": {
"x": 416.0,
"y": 312.0
},
"properties": {
"record-writer": "e314fb24-91a4-38c5-af94-520483cf0579",
"record-reader": "9a674922-376f-367c-9dc3-095f8014223e",
"include-zero-record-flowfiles": "true"
},
"propertyDescriptors": {
"record-writer": {
"displayName": "Record Writer",
"identifiesControllerService": true,
"name": "record-writer",
"sensitive": false
},
"record-reader": {
"displayName": "Record Reader",
"identifiesControllerService": true,
"name": "record-reader",
"sensitive": false
},
"include-zero-record-flowfiles": {
"displayName": "Include Zero Record FlowFiles",
"identifiesControllerService": false,
"name": "include-zero-record-flowfiles",
"sensitive": false
}
},
"retriedRelationships": [],
"retryCount": 10,
"runDurationMillis": 0,
"scheduledState": "ENABLED",
"schedulingPeriod": "0 sec",
"schedulingStrategy": "TIMER_DRIVEN",
"style": {},
"type": "org.apache.nifi.processors.standard.ConvertRecord",
"yieldDuration": "1 sec"
}
],
"remoteProcessGroups": [],
"variables": {}
},
"flowEncodingVersion": "1.0",
"parameterContexts": {},
"parameterProviders": {},
"snapshotMetadata": {
"author": "anonymous",
"comments": "",
"timestamp": 1682088715936,
"version": 2
}
}

View File

@ -0,0 +1,260 @@
{
"externalControllerServices": {},
"flowContents": {
"comments": "",
"componentType": "PROCESS_GROUP",
"connections": [],
"controllerServices": [
{
"bulletinLevel": "WARN",
"bundle": {
"artifact": "nifi-record-serialization-services-nar",
"group": "org.apache.nifi",
"version": "2.0.0-SNAPSHOT"
},
"comments": "",
"componentType": "CONTROLLER_SERVICE",
"controllerServiceApis": [
{
"bundle": {
"artifact": "nifi-standard-services-api-nar",
"group": "org.apache.nifi",
"version": "2.0.0-SNAPSHOT"
},
"type": "org.apache.nifi.serialization.RecordSetWriterFactory"
}
],
"groupIdentifier": "d4386a93-8013-3a90-a2cc-4cb72322f246",
"identifier": "faddf543-c1f4-3c9c-ab99-dc17a7922e2b",
"instanceIdentifier": "a44aed77-0187-1000-ce63-3170a105bc81",
"name": "MyAvroRecordSetWriter",
"properties": {
"compression-format": "NONE",
"Schema Write Strategy": "avro-embedded",
"schema-name": "${schema.name}",
"cache-size": "1000",
"schema-access-strategy": "inherit-record-schema",
"schema-protocol-version": "1",
"encoder-pool-size": "32",
"schema-text": "${avro.schema}"
},
"propertyDescriptors": {
"compression-format": {
"displayName": "Compression Format",
"identifiesControllerService": false,
"name": "compression-format",
"sensitive": false
},
"Schema Write Strategy": {
"displayName": "Schema Write Strategy",
"identifiesControllerService": false,
"name": "Schema Write Strategy",
"sensitive": false
},
"schema-branch": {
"displayName": "Schema Branch",
"identifiesControllerService": false,
"name": "schema-branch",
"sensitive": false
},
"schema-name": {
"displayName": "Schema Name",
"identifiesControllerService": false,
"name": "schema-name",
"sensitive": false
},
"cache-size": {
"displayName": "Cache Size",
"identifiesControllerService": false,
"name": "cache-size",
"sensitive": false
},
"schema-registry": {
"displayName": "Schema Registry",
"identifiesControllerService": true,
"name": "schema-registry",
"sensitive": false
},
"schema-access-strategy": {
"displayName": "Schema Access Strategy",
"identifiesControllerService": false,
"name": "schema-access-strategy",
"sensitive": false
},
"schema-protocol-version": {
"displayName": "Schema Protocol Version",
"identifiesControllerService": false,
"name": "schema-protocol-version",
"sensitive": false
},
"schema-version": {
"displayName": "Schema Version",
"identifiesControllerService": false,
"name": "schema-version",
"sensitive": false
},
"encoder-pool-size": {
"displayName": "Encoder Pool Size",
"identifiesControllerService": false,
"name": "encoder-pool-size",
"sensitive": false
},
"schema-cache": {
"displayName": "Schema Cache",
"identifiesControllerService": true,
"name": "schema-cache",
"sensitive": false
},
"schema-text": {
"displayName": "Schema Text",
"identifiesControllerService": false,
"name": "schema-text",
"sensitive": false
}
},
"scheduledState": "DISABLED",
"type": "org.apache.nifi.avro.AvroRecordSetWriter"
},
{
"bulletinLevel": "WARN",
"bundle": {
"artifact": "nifi-record-serialization-services-nar",
"group": "org.apache.nifi",
"version": "2.0.0-SNAPSHOT"
},
"comments": "",
"componentType": "CONTROLLER_SERVICE",
"controllerServiceApis": [
{
"bundle": {
"artifact": "nifi-standard-services-api-nar",
"group": "org.apache.nifi",
"version": "2.0.0-SNAPSHOT"
},
"type": "org.apache.nifi.serialization.RecordReaderFactory"
}
],
"groupIdentifier": "d4386a93-8013-3a90-a2cc-4cb72322f246",
"identifier": "317bbb66-135a-398a-8101-976d14ae0882",
"instanceIdentifier": "a44ae24d-0187-1000-cf90-4d00cd5b71dc",
"name": "MyAvroReader",
"properties": {
"schema-name": "${schema.name}",
"cache-size": "1000",
"schema-access-strategy": "embedded-avro-schema",
"schema-text": "${avro.schema}"
},
"propertyDescriptors": {
"schema-branch": {
"displayName": "Schema Branch",
"identifiesControllerService": false,
"name": "schema-branch",
"sensitive": false
},
"schema-name": {
"displayName": "Schema Name",
"identifiesControllerService": false,
"name": "schema-name",
"sensitive": false
},
"cache-size": {
"displayName": "Cache Size",
"identifiesControllerService": false,
"name": "cache-size",
"sensitive": false
},
"schema-registry": {
"displayName": "Schema Registry",
"identifiesControllerService": true,
"name": "schema-registry",
"sensitive": false
},
"schema-access-strategy": {
"displayName": "Schema Access Strategy",
"identifiesControllerService": false,
"name": "schema-access-strategy",
"sensitive": false
},
"schema-version": {
"displayName": "Schema Version",
"identifiesControllerService": false,
"name": "schema-version",
"sensitive": false
},
"schema-text": {
"displayName": "Schema Text",
"identifiesControllerService": false,
"name": "schema-text",
"sensitive": false
}
},
"scheduledState": "DISABLED",
"type": "org.apache.nifi.avro.AvroReader"
}
],
"defaultBackPressureDataSizeThreshold": "1 GB",
"defaultBackPressureObjectThreshold": 10000,
"defaultFlowFileExpiration": "0 sec",
"flowFileConcurrency": "UNBOUNDED",
"flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE",
"funnels": [],
"identifier": "d4386a93-8013-3a90-a2cc-4cb72322f246",
"inputPorts": [],
"instanceIdentifier": "a44ac702-0187-1000-a1cd-1c10c796f849",
"labels": [],
"name": "Parent",
"outputPorts": [],
"position": {
"x": 348.0,
"y": 161.0
},
"processGroups": [
{
"comments": "",
"componentType": "PROCESS_GROUP",
"connections": [],
"controllerServices": [],
"defaultBackPressureDataSizeThreshold": "1 GB",
"defaultBackPressureObjectThreshold": 10000,
"defaultFlowFileExpiration": "0 sec",
"flowFileConcurrency": "UNBOUNDED",
"flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE",
"funnels": [],
"groupIdentifier": "d4386a93-8013-3a90-a2cc-4cb72322f246",
"identifier": "c1976be6-e5d7-31b5-aaa8-64a44810c9d0",
"inputPorts": [],
"instanceIdentifier": "a44b3017-0187-1000-6f44-3881b5ac4217",
"labels": [],
"name": "Child",
"outputPorts": [],
"position": {
"x": 433.0,
"y": 266.0
},
"processGroups": [],
"processors": [],
"remoteProcessGroups": [],
"variables": {},
"versionedFlowCoordinates": {
"bucketId": "6e767fa6-5113-46f0-9b7e-8024b75a40c1",
"flowId": "2192834e-60d7-413b-9ea9-dc4187824a41",
"registryUrl": "http://localhost:18080",
"storageLocation": "http://localhost:18080/nifi-registry-api/buckets/6e767fa6-5113-46f0-9b7e-8024b75a40c1/flows/2192834e-60d7-413b-9ea9-dc4187824a41/versions/2",
"version": 2
}
}
],
"processors": [],
"remoteProcessGroups": [],
"variables": {}
},
"flowEncodingVersion": "1.0",
"parameterContexts": {},
"parameterProviders": {},
"snapshotMetadata": {
"author": "anonymous",
"comments": "",
"timestamp": 1682088790173,
"version": 2
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.service;
import org.apache.nifi.flow.Bundle;
import org.apache.nifi.flow.ControllerServiceAPI;
import java.util.Map;
public interface ControllerServiceApiLookup {
/**
* Returns the required service APIs for any property descriptors in the component specified by the
* given type and bundle. The key of the returned Map is the name of the PropertyDescriptor.
*
* @param type a component type
* @param bundle a component bundle
* @return the required services APIs for the component's property descriptors, or empty Map if no compatible
* bundle could be found, or if the component doesn't require any service APIs
*/
Map<String, ControllerServiceAPI> getRequiredServiceApis(String type, Bundle bundle);
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.service;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.registry.flow.FlowSnapshotContainer;
public interface ControllerServiceResolver {
/**
* Resolves controller service references in any processors or controller services that exist in the flow contents
* of the top level snapshot provided in the flow snapshot container.
*
* The resolution looks for a service in the same group, or ancestor group, that provides the required API and has
* the same name as referenced in the original snapshot. If only one such service exists, it is selected and the
* value of the component's property descriptor is updated. If more than on service exists, the value of the
* component's property descriptor is not modified.
*
* @param flowSnapshotContainer the container encapsulating the top level snapshot being imported, as well as the
* snapshots of any child groups that are also under version control
* @param parentGroupId the id of the process group where the snapshot is being imported
* @param user the user performing the import
*/
void resolveInheritedControllerServices(FlowSnapshotContainer flowSnapshotContainer, String parentGroupId, NiFiUser user);
}

View File

@ -44,7 +44,7 @@ public interface FlowRegistryClientNode extends ComponentNode {
RegisteredFlow getFlow(FlowRegistryClientUserContext context, String bucketId, String flowId) throws FlowRegistryException, IOException;
Set<RegisteredFlow> getFlows(FlowRegistryClientUserContext context, String bucketId) throws FlowRegistryException, IOException;
RegisteredFlowSnapshot getFlowContents(FlowRegistryClientUserContext context, String bucketId, String flowId, int version, boolean fetchRemoteFlows) throws FlowRegistryException, IOException;
FlowSnapshotContainer getFlowContents(FlowRegistryClientUserContext context, String bucketId, String flowId, int version, boolean fetchRemoteFlows) throws FlowRegistryException, IOException;
RegisteredFlowSnapshot registerFlowSnapshot(
FlowRegistryClientUserContext context,
RegisteredFlow flow,

View File

@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.flow;
import org.apache.nifi.flow.ParameterProviderReference;
import org.apache.nifi.flow.VersionedParameter;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
* Holds the results of recursively fetching the contents of a registered flow snapshot where some child groups may
* also be under version control. The top level flow snapshot will have the contents of all the child groups populated,
* and the child snapshots here provide a mechanism to obtain the corresponding metadata for a versioned controlled
* child group, such as the external service references, etc.
*/
public class FlowSnapshotContainer {
private final RegisteredFlowSnapshot flowSnapshot;
/**
* The key of this Map is the id of the corresponding VersionedProcessGroup within the contents of the top level snapshot
* (i.e. within flowSnapshot.getFlowContents()). Any child process group under version control will have an entry in this Map.
*/
private final Map<String, RegisteredFlowSnapshot> childSnapshotsByGroupId;
public FlowSnapshotContainer(final RegisteredFlowSnapshot flowSnapshot) {
this.flowSnapshot = Objects.requireNonNull(flowSnapshot);
if (this.flowSnapshot.getParameterContexts() == null) {
flowSnapshot.setParameterContexts(new HashMap<>());
}
if (this.flowSnapshot.getParameterProviders() == null) {
flowSnapshot.setParameterProviders(new HashMap<>());
}
this.childSnapshotsByGroupId = new HashMap<>();
}
/**
* @return the top level snapshot
*/
public RegisteredFlowSnapshot getFlowSnapshot() {
return flowSnapshot;
}
/**
* Get the snapshot that was used to populate the given group in the top level snapshot.
*
* @param groupId the id of a versioned controlled group within the top level snapshot
* @return the snapshot used to populate that group
*/
public RegisteredFlowSnapshot getChildSnapshot(final String groupId) {
return childSnapshotsByGroupId.get(groupId);
}
/**
* Adds a child snapshot to the container.
*
* @param childSnapshot the snapshot for the inner child PG
* @param destinationGroup the VersionedProcessGroup in the top level snapshot where the child exists
*/
public void addChildSnapshot(final RegisteredFlowSnapshot childSnapshot, final VersionedProcessGroup destinationGroup) {
// We need to use the id of the group that the snapshot is being copied into because that is how this Map
// will be accessed later, the id from the child snapshot's flow contents group is not the same
childSnapshotsByGroupId.put(destinationGroup.getIdentifier(), childSnapshot);
// Merge any parameter contexts and parameter providers from the child into the top level snapshot
mergeParameterContexts(childSnapshot);
mergeParameterProviders(childSnapshot);
}
/**
* For each parameter context in the child snapshot:
* - Check if the top level snapshot has a parameter context with the same name
* - If a context with the same name does not exist, then add the context
* - If a context with the same name does exist, then add any parameters that don't already exist in the context
*/
private void mergeParameterContexts(final RegisteredFlowSnapshot childSnapshot) {
final Map<String, VersionedParameterContext> childContexts = childSnapshot.getParameterContexts();
if (childContexts == null) {
return;
}
for (final Map.Entry<String, VersionedParameterContext> childContextEntry : childContexts.entrySet()) {
final String childContextName = childContextEntry.getKey();
final VersionedParameterContext childContext = childContextEntry.getValue();
final VersionedParameterContext matchingContext = flowSnapshot.getParameterContexts().get(childContextName);
if (matchingContext == null) {
flowSnapshot.getParameterContexts().put(childContextName, childContext);
} else {
if (matchingContext.getParameters() == null) {
matchingContext.setParameters(new HashSet<>());
}
final Set<VersionedParameter> childParameters = childContext.getParameters() == null ? Collections.emptySet() : childContext.getParameters();
for (final VersionedParameter childParameter : childParameters) {
if (!matchingContext.getParameters().contains(childParameter)) {
matchingContext.getParameters().add(childParameter);
}
}
}
}
}
/**
* For each parameter provider reference in the child snapshot:
* - Check if the top level snapshot has a parameter provider reference with the same id
* - If a provider reference does not exist with the same id, then add the provider reference
*/
private void mergeParameterProviders(final RegisteredFlowSnapshot childSnapshot) {
final Map<String, ParameterProviderReference> childParamProviders = childSnapshot.getParameterProviders();
if (childParamProviders == null) {
return;
}
for (final Map.Entry<String, ParameterProviderReference> childProviderEntry : childParamProviders.entrySet()) {
final String childProviderId = childProviderEntry.getKey();
final ParameterProviderReference childProvider = childProviderEntry.getValue();
flowSnapshot.getParameterProviders().putIfAbsent(childProviderId, childProvider);
}
}
}

View File

@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.registry.flow;
import org.apache.nifi.flow.ParameterProviderReference;
import org.apache.nifi.flow.VersionedParameter;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class FlowSnapshotContainerTest {
public static final String CONTEXT_1_NAME = "Context 1";
public static final String CONTEXT_2_NAME = "Context 2";
public static final String CONTEXT_1_PARAM_A_NAME = "Context 1 - Param A";
public static final String CONTEXT_1_PARAM_B_NAME = "Context 1 - Param B";
public static final String CONTEXT_1_PARAM_C_NAME = "Context 1 - Param C";
public static final String CONTEXT_2_PARAM_D_NAME = "Context 2 - Param D";
public static final String PROVIDER_1_ID = "provider1";
public static final String PROVIDER_2_ID = "provider2";
public static final String PROVIDER_3_ID = "provider3";
private FlowSnapshotContainer flowSnapshotContainer;
@BeforeEach
public void setup() {
final VersionedParameter parameterA = createParameter(CONTEXT_1_PARAM_A_NAME);
final VersionedParameter parameterB = createParameter(CONTEXT_1_PARAM_B_NAME);
final VersionedParameterContext context1 = createContext(CONTEXT_1_NAME, parameterA, parameterB);
final Map<String, VersionedParameterContext> topLevelParamContexts = new HashMap<>();
topLevelParamContexts.put(context1.getName(), context1);
final ParameterProviderReference providerReference1 = createProviderReference(PROVIDER_1_ID, "Provider 1");
final ParameterProviderReference providerReference2 = createProviderReference(PROVIDER_2_ID, "Provider 2");
final Map<String, ParameterProviderReference> topLevelProviderReferences = new HashMap<>();
topLevelProviderReferences.put(providerReference1.getIdentifier(), providerReference1);
topLevelProviderReferences.put(providerReference2.getIdentifier(), providerReference2);
final RegisteredFlowSnapshot topLevelSnapshot = new RegisteredFlowSnapshot();
topLevelSnapshot.setFlowContents(new VersionedProcessGroup());
topLevelSnapshot.setParameterContexts(topLevelParamContexts);
topLevelSnapshot.setParameterProviders(topLevelProviderReferences);
flowSnapshotContainer = new FlowSnapshotContainer(topLevelSnapshot);
}
@Test
public void testAddChildSnapshot() {
// Child has same context with an additional parameter
final VersionedParameter parameterC = createParameter(CONTEXT_1_PARAM_C_NAME);
final VersionedParameterContext childContext1 = createContext(CONTEXT_1_NAME, parameterC);
// Child has an additional context that didn't exist in parent
final VersionedParameter parameterD = createParameter(CONTEXT_2_PARAM_D_NAME);
final VersionedParameterContext childContext2 = createContext(CONTEXT_2_NAME, parameterD);
final Map<String, VersionedParameterContext> childParamContexts = new HashMap<>();
childParamContexts.put(childContext1.getName(), childContext1);
childParamContexts.put(childContext2.getName(), childContext2);
// Child has additional provider reference
final ParameterProviderReference providerReference3 = createProviderReference(PROVIDER_3_ID, "Provider 3");
final Map<String, ParameterProviderReference> childProviderReferences = new HashMap<>();
childProviderReferences.put(providerReference3.getIdentifier(), providerReference3);
// Setup child snapshot
final RegisteredFlowSnapshot childSnapshot = new RegisteredFlowSnapshot();
childSnapshot.setFlowContents(new VersionedProcessGroup());
childSnapshot.setParameterContexts(childParamContexts);
childSnapshot.setParameterProviders(childProviderReferences);
// Add to child to container
final VersionedProcessGroup destGroup = new VersionedProcessGroup();
destGroup.setIdentifier(UUID.randomUUID().toString());
flowSnapshotContainer.addChildSnapshot(childSnapshot, destGroup);
// Verify get by group id
final RegisteredFlowSnapshot retrievedChildSnapshot = flowSnapshotContainer.getChildSnapshot(destGroup.getIdentifier());
assertEquals(childSnapshot, retrievedChildSnapshot);
// Verify additional context was added
final Map<String, VersionedParameterContext> topLevelParamContexts = flowSnapshotContainer.getFlowSnapshot().getParameterContexts();
assertEquals(2, topLevelParamContexts.size());
assertTrue(topLevelParamContexts.containsKey(CONTEXT_1_NAME));
assertTrue(topLevelParamContexts.containsKey(CONTEXT_2_NAME));
// Verify additional parameters added to context 1
final VersionedParameterContext context1 = topLevelParamContexts.get(CONTEXT_1_NAME);
final Set<VersionedParameter> context1Parameters = context1.getParameters();
assertEquals(3, context1Parameters.size());
assertNotNull(context1Parameters.stream().filter(p -> p.getName().equals(CONTEXT_1_PARAM_A_NAME)).findFirst().orElse(null));
assertNotNull(context1Parameters.stream().filter(p -> p.getName().equals(CONTEXT_1_PARAM_B_NAME)).findFirst().orElse(null));
assertNotNull(context1Parameters.stream().filter(p -> p.getName().equals(CONTEXT_1_PARAM_C_NAME)).findFirst().orElse(null));
// Verify additional provider added
final Map<String, ParameterProviderReference> topLevelProviders = flowSnapshotContainer.getFlowSnapshot().getParameterProviders();
assertEquals(3, topLevelProviders.size());
assertTrue(topLevelProviders.containsKey(PROVIDER_1_ID));
assertTrue(topLevelProviders.containsKey(PROVIDER_2_ID));
assertTrue(topLevelProviders.containsKey(PROVIDER_3_ID));
}
private VersionedParameter createParameter(final String name) {
final VersionedParameter parameter = new VersionedParameter();
parameter.setName(name);
return parameter;
}
private VersionedParameterContext createContext(final String name, final VersionedParameter ... parameters) {
final VersionedParameterContext paramContext = new VersionedParameterContext();
paramContext.setName(name);
paramContext.setParameters(new HashSet<>(Arrays.asList(parameters)));
return paramContext;
}
private ParameterProviderReference createProviderReference(final String id, final String name) {
final ParameterProviderReference reference = new ParameterProviderReference();
reference.setIdentifier(id);
reference.setName(name);
return reference;
}
}

View File

@ -113,8 +113,11 @@ import org.apache.nifi.controller.serialization.FlowSynchronizer;
import org.apache.nifi.controller.serialization.ScheduledStateLookup;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.ControllerServiceResolver;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.controller.service.StandardControllerServiceProvider;
import org.apache.nifi.controller.service.StandardControllerServiceResolver;
import org.apache.nifi.controller.service.StandardControllerServiceApiLookup;
import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
import org.apache.nifi.controller.state.server.ZooKeeperStateServer;
import org.apache.nifi.controller.status.NodeStatus;
@ -167,6 +170,7 @@ import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.provenance.StandardProvenanceAuthorizableFactory;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.registry.flow.mapping.VersionedComponentStateLookup;
import org.apache.nifi.registry.variable.MutableVariableRegistry;
import org.apache.nifi.remote.HttpRemoteSiteListener;
@ -278,6 +282,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
private final AtomicBoolean initialized = new AtomicBoolean(false);
private final AtomicBoolean flowSynchronized = new AtomicBoolean(false);
private final StandardControllerServiceProvider controllerServiceProvider;
private final StandardControllerServiceResolver controllerServiceResolver;
private final Authorizer authorizer;
private final AuditService auditService;
private final EventDrivenWorkerQueue eventDrivenWorkerQueue;
@ -544,6 +549,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
flowManager = new StandardFlowManager(nifiProperties, sslContext, this, flowFileEventRepository, parameterContextManager);
controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, flowManager, extensionManager);
controllerServiceResolver = new StandardControllerServiceResolver(authorizer, flowManager, new NiFiRegistryFlowMapper(extensionManager),
controllerServiceProvider, new StandardControllerServiceApiLookup(extensionManager));
flowManager.initialize(controllerServiceProvider);
eventDrivenSchedulingAgent = new EventDrivenSchedulingAgent(
@ -2094,6 +2101,9 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
return controllerServiceProvider;
}
public ControllerServiceResolver getControllerServiceResolver() {
return controllerServiceResolver;
}
public VariableRegistry getVariableRegistry() {
return variableRegistry;

View File

@ -32,6 +32,7 @@ import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.flow.ParameterProviderReference;
import org.apache.nifi.parameter.ParameterGroupConfiguration;
import org.apache.nifi.registry.flow.FlowSnapshotContainer;
import org.apache.nifi.registry.flow.RegisteredFlow;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
import org.apache.nifi.flow.VersionedParameterContext;
@ -1636,7 +1637,7 @@ public interface NiFiServiceFacade {
*
* @throws ResourceNotFoundException if the Versioned Flow Snapshot could not be found
*/
RegisteredFlowSnapshot getVersionedFlowSnapshot(VersionControlInformationDTO versionControlInfo, boolean fetchRemoteFlows);
FlowSnapshotContainer getVersionedFlowSnapshot(VersionControlInformationDTO versionControlInfo, boolean fetchRemoteFlows);
/**
* Get the latest Versioned Flow Snapshot from the registry for the Process Group with the given ID
@ -1646,7 +1647,7 @@ public interface NiFiServiceFacade {
*
* @throws ResourceNotFoundException if the Versioned Flow Snapshot could not be found
*/
RegisteredFlowSnapshot getVersionedFlowSnapshotByGroupId(String processGroupId);
FlowSnapshotContainer getVersionedFlowSnapshotByGroupId(String processGroupId);
/**
* Get the current state of the Process Group with the given ID, converted to a Versioned Flow Snapshot
@ -2700,11 +2701,11 @@ public interface NiFiServiceFacade {
* For any Controller Service that is found in the given Versioned Process Group, if that Controller Service is not itself included in the Versioned Process Groups,
* attempts to find an existing Controller Service that matches the definition. If any is found, the component within the Versioned Process Group is updated to point
* to the existing service.
* @param versionedFlowSnapshot the flow snapshot
* @param flowSnapshotContainer the flow snapshot container
* @param parentGroupId the ID of the Process Group from which the Controller Services are inherited
* @param user the NiFi user on whose behalf the request is happening; this user is used for validation so that only the Controller Services that the user has READ permissions to are included
*/
void resolveInheritedControllerServices(RegisteredFlowSnapshot versionedFlowSnapshot, String parentGroupId, NiFiUser user);
void resolveInheritedControllerServices(FlowSnapshotContainer flowSnapshotContainer, String parentGroupId, NiFiUser user);
/**
* For any Parameter Provider that is found in the given Versioned Process Group, attempts to find an existing Parameter Provider that matches the definition. If any is found,

View File

@ -71,7 +71,6 @@ import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.Counter;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ParameterProviderNode;
@ -103,7 +102,6 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.flow.ParameterProviderReference;
import org.apache.nifi.flow.VersionedComponent;
import org.apache.nifi.flow.VersionedConfigurableComponent;
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedExternalFlow;
@ -111,8 +109,6 @@ import org.apache.nifi.flow.VersionedExternalFlowMetadata;
import org.apache.nifi.flow.VersionedFlowCoordinates;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.flow.VersionedPropertyDescriptor;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.ProcessGroupCounts;
import org.apache.nifi.groups.RemoteProcessGroup;
@ -144,6 +140,7 @@ import org.apache.nifi.registry.flow.FlowRegistryClientNode;
import org.apache.nifi.registry.flow.FlowRegistryException;
import org.apache.nifi.registry.flow.FlowRegistryPermissions;
import org.apache.nifi.registry.flow.FlowRegistryUtil;
import org.apache.nifi.registry.flow.FlowSnapshotContainer;
import org.apache.nifi.registry.flow.RegisteredFlow;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata;
@ -4061,124 +4058,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
public void resolveInheritedControllerServices(final RegisteredFlowSnapshot versionedFlowSnapshot, final String processGroupId, final NiFiUser user) {
final VersionedProcessGroup versionedGroup = versionedFlowSnapshot.getFlowContents();
resolveInheritedControllerServices(versionedGroup, processGroupId, versionedFlowSnapshot.getExternalControllerServices(), user);
}
private void resolveInheritedControllerServices(final VersionedProcessGroup versionedGroup, final String processGroupId,
final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences,
final NiFiUser user) {
final Set<String> availableControllerServiceIds = findAllControllerServiceIds(versionedGroup);
final ProcessGroup parentGroup = processGroupDAO.getProcessGroup(processGroupId);
final Set<ControllerServiceNode> serviceNodes = parentGroup.getControllerServices(true).stream()
.filter(service -> service.isAuthorized(authorizer, RequestAction.READ, user))
.collect(Collectors.toSet());
final ExtensionManager extensionManager = controllerFacade.getExtensionManager();
for (final VersionedProcessor processor : versionedGroup.getProcessors()) {
final Optional<BundleCoordinate> compatibleBundle = BundleUtils.getOptionalCompatibleBundle(extensionManager, processor.getType(), BundleUtils.createBundleDto(processor.getBundle()));
if (compatibleBundle.isPresent()) {
final ConfigurableComponent tempComponent = extensionManager.getTempComponent(processor.getType(), compatibleBundle.get());
resolveInheritedControllerServices(processor, availableControllerServiceIds, serviceNodes, externalControllerServiceReferences, tempComponent::getPropertyDescriptor);
}
}
for (final VersionedControllerService service : versionedGroup.getControllerServices()) {
final Optional<BundleCoordinate> compatibleBundle = BundleUtils.getOptionalCompatibleBundle(extensionManager, service.getType(), BundleUtils.createBundleDto(service.getBundle()));
if (compatibleBundle.isPresent()) {
final ConfigurableComponent tempComponent = extensionManager.getTempComponent(service.getType(), compatibleBundle.get());
resolveInheritedControllerServices(service, availableControllerServiceIds, serviceNodes, externalControllerServiceReferences, tempComponent::getPropertyDescriptor);
}
}
for (final VersionedProcessGroup child : versionedGroup.getProcessGroups()) {
resolveInheritedControllerServices(child, processGroupId, externalControllerServiceReferences, user);
}
}
private void resolveInheritedControllerServices(final VersionedConfigurableComponent component, final Set<String> availableControllerServiceIds,
final Set<ControllerServiceNode> availableControllerServices,
final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences,
final Function<String, PropertyDescriptor> descriptorLookup) {
final Map<String, VersionedPropertyDescriptor> descriptors = component.getPropertyDescriptors();
final Map<String, String> properties = component.getProperties();
resolveInheritedControllerServices(descriptors, properties, availableControllerServiceIds, availableControllerServices, externalControllerServiceReferences, descriptorLookup);
}
private void resolveInheritedControllerServices(final Map<String, VersionedPropertyDescriptor> propertyDescriptors, final Map<String, String> componentProperties,
final Set<String> availableControllerServiceIds, final Set<ControllerServiceNode> availableControllerServices,
final Map<String, ExternalControllerServiceReference> externalControllerServiceReferences,
final Function<String, PropertyDescriptor> descriptorLookup) {
for (final Map.Entry<String, String> entry : new HashMap<>(componentProperties).entrySet()) {
final String propertyName = entry.getKey();
final String propertyValue = entry.getValue();
final VersionedPropertyDescriptor propertyDescriptor = propertyDescriptors.get(propertyName);
if (propertyDescriptor == null) {
continue;
}
if (!propertyDescriptor.getIdentifiesControllerService()) {
continue;
}
// If the referenced Controller Service is available in this flow, there is nothing to resolve.
if (availableControllerServiceIds.contains(propertyValue)) {
continue;
}
final ExternalControllerServiceReference externalServiceReference = externalControllerServiceReferences == null ? null : externalControllerServiceReferences.get(propertyValue);
if (externalServiceReference == null) {
continue;
}
final PropertyDescriptor descriptor = descriptorLookup.apply(propertyName);
if (descriptor == null) {
continue;
}
final Class<? extends ControllerService> referencedServiceClass = descriptor.getControllerServiceDefinition();
if (referencedServiceClass == null) {
continue;
}
final String externalControllerServiceName = externalServiceReference.getName();
final List<ControllerServiceNode> matchingControllerServices = availableControllerServices.stream()
.filter(service -> service.getName().equals(externalControllerServiceName))
.filter(service -> referencedServiceClass.isAssignableFrom(service.getProxiedControllerService().getClass()))
.collect(Collectors.toList());
if (matchingControllerServices.size() != 1) {
continue;
}
final ControllerServiceNode matchingServiceNode = matchingControllerServices.get(0);
final Optional<String> versionedComponentId = matchingServiceNode.getVersionedComponentId();
final String resolvedId = versionedComponentId.orElseGet(matchingServiceNode::getIdentifier);
componentProperties.put(propertyName, resolvedId);
}
}
private Set<String> findAllControllerServiceIds(final VersionedProcessGroup group) {
final Set<String> ids = new HashSet<>();
findAllControllerServiceIds(group, ids);
return ids;
}
private void findAllControllerServiceIds(final VersionedProcessGroup group, final Set<String> ids) {
for (final VersionedControllerService service : group.getControllerServices()) {
ids.add(service.getIdentifier());
}
for (final VersionedProcessGroup childGroup : group.getProcessGroups()) {
findAllControllerServiceIds(childGroup, ids);
}
public void resolveInheritedControllerServices(final FlowSnapshotContainer flowSnapshotContainer, final String processGroupId, final NiFiUser user) {
controllerFacade.getControllerServiceResolver().resolveInheritedControllerServices(flowSnapshotContainer, processGroupId, user);
}
@Override
@ -5208,7 +5089,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
public RegisteredFlowSnapshot getVersionedFlowSnapshotByGroupId(final String processGroupId) {
public FlowSnapshotContainer getVersionedFlowSnapshotByGroupId(final String processGroupId) {
final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId);
final VersionControlInformation versionControlInfo = processGroup.getVersionControlInformation();
@ -5217,7 +5098,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
public RegisteredFlowSnapshot getVersionedFlowSnapshot(final VersionControlInformationDTO versionControlInfo, final boolean fetchRemoteFlows) {
public FlowSnapshotContainer getVersionedFlowSnapshot(final VersionControlInformationDTO versionControlInfo, final boolean fetchRemoteFlows) {
return getVersionedFlowSnapshot(versionControlInfo.getRegistryId(), versionControlInfo.getBucketId(), versionControlInfo.getFlowId(),
versionControlInfo.getVersion(), fetchRemoteFlows);
}
@ -5231,16 +5112,15 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
* @param fetchRemoteFlows indicator to include remote flows when retrieving the flow
* @return a VersionedFlowSnapshot from a registry with the given version
*/
private RegisteredFlowSnapshot getVersionedFlowSnapshot(final String registryId, final String bucketId, final String flowId,
private FlowSnapshotContainer getVersionedFlowSnapshot(final String registryId, final String bucketId, final String flowId,
final Integer flowVersion, final boolean fetchRemoteFlows) {
final FlowRegistryClientNode flowRegistry = flowRegistryDAO.getFlowRegistryClient(registryId);
if (flowRegistry == null) {
throw new ResourceNotFoundException("Could not find any Flow Registry registered with identifier " + registryId);
}
final RegisteredFlowSnapshot snapshot;
try {
snapshot = flowRegistry.getFlowContents(FlowRegistryClientContextFactory.getContextForUser(NiFiUserUtils.getNiFiUser()), bucketId, flowId, flowVersion, fetchRemoteFlows);
return flowRegistry.getFlowContents(FlowRegistryClientContextFactory.getContextForUser(NiFiUserUtils.getNiFiUser()), bucketId, flowId, flowVersion, fetchRemoteFlows);
} catch (final FlowRegistryException e) {
logger.error(e.getMessage(), e);
throw new IllegalArgumentException("The Flow Registry with ID " + registryId + " reports that no Flow exists with Bucket "
@ -5248,8 +5128,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
} catch (final IOException ioe) {
throw new IllegalStateException("Failed to communicate with Flow Registry when attempting to retrieve a versioned flow", ioe);
}
return snapshot;
}
@Override
@ -5330,8 +5208,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
if (registryGroup == null) {
try {
final RegisteredFlowSnapshot versionedFlowSnapshot = flowRegistry.getFlowContents(FlowRegistryClientContextFactory.getContextForUser(NiFiUserUtils.getNiFiUser()),
final FlowSnapshotContainer flowSnapshotContainer = flowRegistry.getFlowContents(FlowRegistryClientContextFactory.getContextForUser(NiFiUserUtils.getNiFiUser()),
versionControlInfo.getBucketIdentifier(), versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion(), true);
final RegisteredFlowSnapshot versionedFlowSnapshot = flowSnapshotContainer.getFlowSnapshot();
registryGroup = versionedFlowSnapshot.getFlowContents();
} catch (final IOException | FlowRegistryException e) {
throw new NiFiCoreException("Failed to retrieve flow with Flow Registry in order to calculate local differences due to " + e.getMessage(), e);

View File

@ -31,6 +31,7 @@ import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.FlowRegistryUtils;
import org.apache.nifi.registry.flow.FlowSnapshotContainer;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.ResourceNotFoundException;
@ -135,12 +136,12 @@ public abstract class FlowUpdateResource<T extends ProcessGroupDescriptorEntity,
* @param allowDirtyFlowUpdate allow updating a flow with versioned changes present
* @param requestType the type of request ("replace-requests" or "update-requests")
* @param replicateUriPath the uri path to use for replicating the request (differs from initial request uri)
* @param flowSnapshotSupplier provides access to the flow snapshot to be used for replacement
* @param flowSnapshotContainerSupplier provides access to the flow snapshot to be used for replacement
* @return response containing status of the async request
*/
protected Response initiateFlowUpdate(final String groupId, final T requestEntity, final boolean allowDirtyFlowUpdate,
final String requestType, final String replicateUriPath,
final Supplier<RegisteredFlowSnapshot> flowSnapshotSupplier) {
final Supplier<FlowSnapshotContainer> flowSnapshotContainerSupplier) {
// Verify the request
final RevisionDTO revisionDto = requestEntity.getProcessGroupRevision();
if (revisionDto == null) {
@ -185,14 +186,15 @@ public abstract class FlowUpdateResource<T extends ProcessGroupDescriptorEntity,
// 13. Re-Start all Processors, Funnels, Ports that are affected and not removed.
// Step 0: Obtain the versioned flow snapshot to use for the update
final RegisteredFlowSnapshot flowSnapshot = flowSnapshotSupplier.get();
final FlowSnapshotContainer flowSnapshotContainer = flowSnapshotContainerSupplier.get();
final RegisteredFlowSnapshot flowSnapshot = flowSnapshotContainer.getFlowSnapshot();
// The new flow may not contain the same versions of components in existing flow. As a result, we need to update
// the flow snapshot to contain compatible bundles.
serviceFacade.discoverCompatibleBundles(flowSnapshot.getFlowContents());
// If there are any Controller Services referenced that are inherited from the parent group, resolve those to point to the appropriate Controller Service, if we are able to.
serviceFacade.resolveInheritedControllerServices(flowSnapshot, groupId, user);
serviceFacade.resolveInheritedControllerServices(flowSnapshotContainer, groupId, user);
// If there are any Parameter Providers referenced by Parameter Contexts, resolve these to point to the appropriate Parameter Provider, if we are able to.
serviceFacade.resolveParameterProviders(flowSnapshot, user);
@ -379,7 +381,14 @@ public abstract class FlowUpdateResource<T extends ProcessGroupDescriptorEntity,
// Get the Original Flow Snapshot in case we fail to update and need to rollback
// This only applies to flows that were under version control, update may be called without version control
final VersionControlInformationEntity vciEntity = serviceFacade.getVersionControlInformation(groupId);
final RegisteredFlowSnapshot originalFlowSnapshot = vciEntity == null ? null : serviceFacade.getVersionedFlowSnapshot(vciEntity.getVersionControlInformation(), true);
final RegisteredFlowSnapshot originalFlowSnapshot;
if (vciEntity == null) {
originalFlowSnapshot = null;
} else {
final FlowSnapshotContainer originalFlowSnapshotContainer = serviceFacade.getVersionedFlowSnapshot(vciEntity.getVersionControlInformation(), true);
originalFlowSnapshot = originalFlowSnapshotContainer.getFlowSnapshot();
}
try {
if (replicateRequest) {

View File

@ -58,6 +58,7 @@ import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.FlowRegistryBucket;
import org.apache.nifi.registry.flow.FlowRegistryUtils;
import org.apache.nifi.registry.flow.FlowSnapshotContainer;
import org.apache.nifi.registry.flow.RegisteredFlow;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowState;
@ -2012,7 +2013,8 @@ public class ProcessGroupResource extends FlowUpdateResource<ProcessGroupImportE
if (versionControlInfo != null && requestProcessGroupEntity.getVersionedFlowSnapshot() == null) {
// Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail.
// Step 2: Retrieve flow from Flow Registry
final RegisteredFlowSnapshot flowSnapshot = getFlowFromRegistry(versionControlInfo);
final FlowSnapshotContainer flowSnapshotContainer = getFlowFromRegistry(versionControlInfo);
final RegisteredFlowSnapshot flowSnapshot = flowSnapshotContainer.getFlowSnapshot();
// Step 3: Enrich version control info came from UI
if (flowSnapshot.getFlowContents() != null) {
@ -2026,7 +2028,7 @@ public class ProcessGroupResource extends FlowUpdateResource<ProcessGroupImportE
serviceFacade.discoverCompatibleBundles(flowSnapshot.getFlowContents());
// If there are any Controller Services referenced that are inherited from the parent group, resolve those to point to the appropriate Controller Service, if we are able to.
serviceFacade.resolveInheritedControllerServices(flowSnapshot, groupId, NiFiUserUtils.getNiFiUser());
serviceFacade.resolveInheritedControllerServices(flowSnapshotContainer, groupId, NiFiUserUtils.getNiFiUser());
// If there are any Parameter Providers referenced by Parameter Contexts, resolve these to point to the appropriate Parameter Provider, if we are able to.
serviceFacade.resolveParameterProviders(flowSnapshot, NiFiUserUtils.getNiFiUser());
@ -2095,8 +2097,9 @@ public class ProcessGroupResource extends FlowUpdateResource<ProcessGroupImportE
);
}
private RegisteredFlowSnapshot getFlowFromRegistry(final VersionControlInformationDTO versionControlInfo) {
final RegisteredFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(versionControlInfo, true);
private FlowSnapshotContainer getFlowFromRegistry(final VersionControlInformationDTO versionControlInfo) {
final FlowSnapshotContainer flowSnapshotContainer = serviceFacade.getVersionedFlowSnapshot(versionControlInfo, true);
final RegisteredFlowSnapshot flowSnapshot = flowSnapshotContainer.getFlowSnapshot();
final FlowRegistryBucket bucket = flowSnapshot.getBucket();
final RegisteredFlow flow = flowSnapshot.getFlow();
@ -2108,7 +2111,7 @@ public class ProcessGroupResource extends FlowUpdateResource<ProcessGroupImportE
final VersionedFlowState flowState = flowSnapshot.isLatest() ? VersionedFlowState.UP_TO_DATE : VersionedFlowState.STALE;
versionControlInfo.setState(flowState.name());
return flowSnapshot;
return flowSnapshotContainer;
}
@ -4170,8 +4173,9 @@ public class ProcessGroupResource extends FlowUpdateResource<ProcessGroupImportE
versionedFlowSnapshot.setSnapshotMetadata(null);
sanitizeRegistryInfo(versionedFlowSnapshot.getFlowContents());
final FlowSnapshotContainer flowSnapshotContainer = new FlowSnapshotContainer(versionedFlowSnapshot);
return initiateFlowUpdate(groupId, importEntity, true, "replace-requests",
"/nifi-api/process-groups/" + groupId + "/flow-contents", importEntity::getVersionedFlowSnapshot);
"/nifi-api/process-groups/" + groupId + "/flow-contents", () -> flowSnapshotContainer);
}
/**
@ -4286,7 +4290,8 @@ public class ProcessGroupResource extends FlowUpdateResource<ProcessGroupImportE
// if there are any Controller Services referenced that are inherited from the parent group,
// resolve those to point to the appropriate Controller Service, if we are able to.
serviceFacade.resolveInheritedControllerServices(deserializedSnapshot, groupId, NiFiUserUtils.getNiFiUser());
final FlowSnapshotContainer flowSnapshotContainer = new FlowSnapshotContainer(deserializedSnapshot);
serviceFacade.resolveInheritedControllerServices(flowSnapshotContainer, groupId, NiFiUserUtils.getNiFiUser());
// If there are any Parameter Providers referenced by Parameter Contexts, resolve these to point to the appropriate Parameter Provider, if we are able to.
serviceFacade.resolveParameterProviders(deserializedSnapshot, NiFiUserUtils.getNiFiUser());
@ -4386,7 +4391,8 @@ public class ProcessGroupResource extends FlowUpdateResource<ProcessGroupImportE
// if there are any Controller Services referenced that are inherited from the parent group,
// resolve those to point to the appropriate Controller Service, if we are able to.
serviceFacade.resolveInheritedControllerServices(versionedFlowSnapshot, groupId, NiFiUserUtils.getNiFiUser());
final FlowSnapshotContainer flowSnapshotContainer = new FlowSnapshotContainer(versionedFlowSnapshot);
serviceFacade.resolveInheritedControllerServices(flowSnapshotContainer, groupId, NiFiUserUtils.getNiFiUser());
// If there are any Parameter Providers referenced by Parameter Contexts, resolve these to point to the appropriate Parameter Provider, if we are able to.
serviceFacade.resolveParameterProviders(versionedFlowSnapshot, NiFiUserUtils.getNiFiUser());

View File

@ -33,6 +33,7 @@ import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.registry.flow.FlowRegistryBucket;
import org.apache.nifi.registry.flow.FlowSnapshotContainer;
import org.apache.nifi.registry.flow.RegisteredFlow;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata;
@ -164,7 +165,8 @@ public class VersionsResource extends FlowUpdateResource<VersionControlInformati
});
// get the versioned flow
final RegisteredFlowSnapshot versionedFlowSnapshot = serviceFacade.getVersionedFlowSnapshotByGroupId(groupId);
final FlowSnapshotContainer snapshotContainer = serviceFacade.getVersionedFlowSnapshotByGroupId(groupId);
final RegisteredFlowSnapshot versionedFlowSnapshot = snapshotContainer.getFlowSnapshot();
final VersionedProcessGroup versionedProcessGroup = versionedFlowSnapshot.getFlowContents();
final String flowName = versionedProcessGroup.getName();
@ -1129,14 +1131,15 @@ public class VersionsResource extends FlowUpdateResource<VersionControlInformati
final NiFiUser user = NiFiUserUtils.getNiFiUser();
// Step 0: Get the Versioned Flow Snapshot from the Flow Registry
final RegisteredFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation(), true);
final FlowSnapshotContainer flowSnapshotContainer = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation(), true);
final RegisteredFlowSnapshot flowSnapshot = flowSnapshotContainer.getFlowSnapshot();
// The flow in the registry may not contain the same versions of components that we have in our flow. As a result, we need to update
// the flow snapshot to contain compatible bundles.
serviceFacade.discoverCompatibleBundles(flowSnapshot.getFlowContents());
// If there are any Controller Services referenced that are inherited from the parent group, resolve those to point to the appropriate Controller Service, if we are able to.
serviceFacade.resolveInheritedControllerServices(flowSnapshot, groupId, NiFiUserUtils.getNiFiUser());
serviceFacade.resolveInheritedControllerServices(flowSnapshotContainer, groupId, NiFiUserUtils.getNiFiUser());
// If there are any Parameter Providers referenced by Parameter Contexts, resolve these to point to the appropriate Parameter Provider, if we are able to.
serviceFacade.resolveParameterProviders(flowSnapshot, NiFiUserUtils.getNiFiUser());

View File

@ -54,6 +54,7 @@ import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.claim.ContentDirection;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.ControllerServiceResolver;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
@ -190,6 +191,10 @@ public class ControllerFacade implements Authorizable {
return flowController.getControllerServiceProvider();
}
public ControllerServiceResolver getControllerServiceResolver() {
return flowController.getControllerServiceResolver();
}
public ExtensionManager getExtensionManager() {
return flowController.getExtensionManager();
}

View File

@ -17,6 +17,7 @@
package org.apache.nifi.web.api;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.FlowSnapshotContainer;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata;
import org.apache.nifi.web.NiFiServiceFacade;
@ -48,8 +49,8 @@ public class TestVersionsResource {
public void testExportFlowVersion() {
final String groupId = UUID.randomUUID().toString();
final RegisteredFlowSnapshot versionedFlowSnapshot = mock(RegisteredFlowSnapshot.class);
when(serviceFacade.getVersionedFlowSnapshotByGroupId(groupId)).thenReturn(versionedFlowSnapshot);
final FlowSnapshotContainer snapshotContainer = new FlowSnapshotContainer(versionedFlowSnapshot);
when(serviceFacade.getVersionedFlowSnapshotByGroupId(groupId)).thenReturn(snapshotContainer);
final String flowName = "flowname";
final int flowVersion = 1;