diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java index af3cf4931e..d687deee3d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java @@ -1202,10 +1202,11 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen destination.addControllerService(newService); } - updateControllerService(newService, proposed, topLevelGroup); final Map decryptedProperties = getDecryptedProperties(proposed.getProperties()); createdExtensions.add(new CreatedExtension(newService, decryptedProperties)); + updateControllerService(newService, proposed, topLevelGroup); + return newService; } @@ -1410,7 +1411,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen final boolean sensitive = (descriptor != null && descriptor.isSensitive()) || (versionedDescriptor != null && versionedDescriptor.isSensitive()); - String value; + final String value; if (descriptor != null && referencesService && (proposedProperties.get(propertyName) != null)) { // Need to determine if the component's property descriptor for this service is already set to an id // of an existing service that is outside the current processor group, and if it is we want to leave @@ -1433,6 +1434,12 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen final String serviceVersionedComponentId = proposedProperties.get(propertyName); String instanceId = getServiceInstanceId(serviceVersionedComponentId, group); value = (instanceId == null) ? serviceVersionedComponentId : instanceId; + + // Find the same property descriptor in the component's CreatedExtension and replace it with the + // instance ID of the service + createdExtensions.stream().filter(ce -> ce.extension.equals(componentNode)).forEach(createdExtension -> { + createdExtension.propertyValues.replace(propertyName, value); + }); } else { value = existingExternalServiceId; } @@ -1460,8 +1467,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen // so we want to continue on and update the value to null. } - value = decrypt(value, syncOptions.getPropertyDecryptor()); - fullPropertyMap.put(propertyName, value); + fullPropertyMap.put(propertyName, decrypt(value, syncOptions.getPropertyDecryptor())); } } @@ -2266,13 +2272,14 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen if (port == null) { final ComponentType proposedType = proposed.getComponentType(); + final Port newPort; if (proposedType == ComponentType.INPUT_PORT) { - addInputPort(group, proposed, synchronizationOptions.getComponentIdGenerator(), proposed.getName()); + newPort = addInputPort(group, proposed, synchronizationOptions.getComponentIdGenerator(), proposed.getName()); } else { - addOutputPort(group, proposed, synchronizationOptions.getComponentIdGenerator(), proposed.getName()); + newPort = addOutputPort(group, proposed, synchronizationOptions.getComponentIdGenerator(), proposed.getName()); } - LOG.info("Successfully synchronized {} by adding it to the flow", port); + LOG.info("Successfully synchronized {} by adding it to the flow", newPort); } else if (proposed == null) { final Set stoppedDownstream = stopDownstreamComponents(port, timeout, synchronizationOptions); toRestart.addAll(stoppedDownstream); @@ -2400,11 +2407,12 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen procNode.setVersionedComponentId(proposed.getIdentifier()); destination.addProcessor(procNode); - updateProcessor(procNode, proposed, topLevelGroup); final Map decryptedProperties = getDecryptedProperties(proposed.getProperties()); createdExtensions.add(new CreatedExtension(procNode, decryptedProperties)); + updateProcessor(procNode, proposed, topLevelGroup); + // Notify the processor node that the configuration (properties, e.g.) has been restored final ProcessContext processContext = context.getProcessContextFactory().apply(procNode); procNode.onConfigurationRestored(processContext); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java index 2d8dcfc4e3..aee703bff2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java @@ -19,12 +19,15 @@ package org.apache.nifi.flow.synchronization; import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Port; import org.apache.nifi.controller.ComponentNode; import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.controller.ControllerServiceInitializationContext; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.PropertyConfiguration; import org.apache.nifi.controller.ReloadComponent; @@ -73,6 +76,7 @@ import org.apache.nifi.parameter.StandardParameterContextManager; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.registry.flow.mapping.FlowMappingOptions; +import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.scheduling.SchedulingStrategy; import org.junit.jupiter.api.BeforeEach; @@ -84,12 +88,14 @@ import org.mockito.stubbing.Answer; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -279,6 +285,8 @@ public class StandardVersionedComponentSynchronizerTest { when(service.isActive()).thenReturn(false); when(service.getProcessGroup()).thenReturn(group); when(service.getState()).thenReturn(ControllerServiceState.DISABLED); + when(service.getBulletinLevel()).thenReturn(LogLevel.WARN); + when(service.getControllerServiceImplementation()).thenReturn(new TestControllerService()); return service; } @@ -304,6 +312,16 @@ public class StandardVersionedComponentSynchronizerTest { return port; } + private ProcessGroup createMockProcessGroup() { + final ProcessGroup processGroup = mock(ProcessGroup.class); + when(processGroup.getIdentifier()).thenReturn("processGroup"); + when(processGroup.getPosition()).thenReturn(new org.apache.nifi.connectable.Position(0, 0)); + when(processGroup.getFlowFileConcurrency()).thenReturn(FlowFileConcurrency.UNBOUNDED); + when(processGroup.getFlowFileOutboundPolicy()).thenReturn(FlowFileOutboundPolicy.BATCH_OUTPUT); + when(processGroup.getExecutionEngine()).thenReturn(ExecutionEngine.STANDARD); + + return processGroup; + } private Connection createMockConnection(final Connectable source, final Connectable destination, final ProcessGroup group) { final String uuid = UUID.randomUUID().toString(); @@ -342,12 +360,7 @@ public class StandardVersionedComponentSynchronizerTest { @Test public void testSynchronizeProcessorAddedMigrated() { - final ProcessGroup processGroup = mock(ProcessGroup.class); - when(processGroup.getIdentifier()).thenReturn("processGroup"); - when(processGroup.getPosition()).thenReturn(new org.apache.nifi.connectable.Position(0, 0)); - when(processGroup.getFlowFileConcurrency()).thenReturn(FlowFileConcurrency.UNBOUNDED); - when(processGroup.getFlowFileOutboundPolicy()).thenReturn(FlowFileOutboundPolicy.BATCH_OUTPUT); - when(processGroup.getExecutionEngine()).thenReturn(ExecutionEngine.STANDARD); + final ProcessGroup processGroup = createMockProcessGroup(); final VersionedProcessGroup rootGroup = new VersionedProcessGroup(); rootGroup.setIdentifier("rootGroup"); @@ -377,6 +390,51 @@ public class StandardVersionedComponentSynchronizerTest { assertEquals(ENCODED_TEXT, propertyValue); } + @Test + public void testAddProcessorWithServiceAndMigration() { + final ProcessGroup processGroup = createMockProcessGroup(); + + final ProcessorNode processorNode = createMockProcessor(); + when(processorNode.getProcessGroup()).thenReturn(processGroup); + when(processorNode.getRawPropertyValues()).thenReturn(Collections.emptyMap()); + + final VersionedPropertyDescriptor versionedDescriptorCS = new VersionedPropertyDescriptor(); + final PropertyDescriptor descriptorCS = new PropertyDescriptor.Builder().name("cs") + .identifiesControllerService(ControllerService.class).build(); + versionedDescriptorCS.setName(descriptorCS.getName()); + versionedDescriptorCS.setIdentifiesControllerService(true); + final Map proposedDescriptors = + Map.of(versionedDescriptorCS.getName(), versionedDescriptorCS); + + ControllerServiceNode controllerServiceNode = createMockControllerService(); + when(flowManager.createControllerService(any(), any(), any(), anySet(), eq(true), eq(true), eq(null))).thenReturn(controllerServiceNode); + when(flowManager.createProcessor(any(), any(), any(), eq(true))).thenReturn(processorNode); + setReferences(controllerServiceNode, processorNode); + when(controllerServiceNode.getVersionedComponentId()).thenReturn(Optional.of("12345")); + + final VersionedProcessor versionedProcessor = createMinimalVersionedProcessor(); + final Map versionedProperties = Collections.singletonMap("cs", "12345"); + versionedProcessor.setProperties(versionedProperties); + versionedProcessor.setPropertyDescriptors(proposedDescriptors); + final VersionedControllerService versionedControllerService = createMinimalVersionedControllerService(); + + final VersionedProcessGroup versionedGroup = new VersionedProcessGroup(); + versionedGroup.setIdentifier("pg-v2"); + versionedGroup.setProcessors(Set.of(versionedProcessor)); + versionedGroup.setControllerServices(Set.of(versionedControllerService)); + + final VersionedExternalFlow externalFlow = new VersionedExternalFlow(); + externalFlow.setFlowContents(versionedGroup); + + when(processGroup.getControllerServices(false)).thenReturn(Set.of(controllerServiceNode)); + + synchronizer.synchronize(processGroup, externalFlow, synchronizationOptions); + + verify(processorNode).migrateConfiguration(propertiesCaptor.capture(), any()); + final Map migratedProperties = propertiesCaptor.getValue(); + assertEquals(controllerServiceNode.getIdentifier(), migratedProperties.get("cs")); + } + @Test public void testSynchronizeProcessorSensitiveDynamicProperties() throws FlowSynchronizationException, InterruptedException, TimeoutException { final Map versionedProperties = Collections.singletonMap(SENSITIVE_PROPERTY_NAME, ENCRYPTED_PROPERTY_VALUE); @@ -1386,4 +1444,35 @@ public class StandardVersionedComponentSynchronizerTest { } } } + + private static class TestControllerService implements ControllerService { + + @Override + public Collection validate(ValidationContext context) { + return null; + } + + @Override + public PropertyDescriptor getPropertyDescriptor(String name) { + return null; + } + + @Override + public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { + } + + @Override + public List getPropertyDescriptors() { + return null; + } + + @Override + public String getIdentifier() { + return null; + } + + @Override + public void initialize(ControllerServiceInitializationContext context) throws InitializationException { + } + } }