NIFI-12394 Fixed Service references for Migrated Configurations

When syncing to a VersionedFlow, make sure processor references to new controller services are valid after the processor migrates the configuration of its properties

This closes #8184

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mike Moser 2023-12-21 22:07:09 +00:00 committed by exceptionfactory
parent 02df7c13f8
commit 050f81f686
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
2 changed files with 111 additions and 14 deletions

View File

@ -1202,10 +1202,11 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
destination.addControllerService(newService);
}
updateControllerService(newService, proposed, topLevelGroup);
final Map<String, String> decryptedProperties = getDecryptedProperties(proposed.getProperties());
createdExtensions.add(new CreatedExtension(newService, decryptedProperties));
updateControllerService(newService, proposed, topLevelGroup);
return newService;
}
@ -1410,7 +1411,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
final boolean sensitive = (descriptor != null && descriptor.isSensitive())
|| (versionedDescriptor != null && versionedDescriptor.isSensitive());
String value;
final String value;
if (descriptor != null && referencesService && (proposedProperties.get(propertyName) != null)) {
// Need to determine if the component's property descriptor for this service is already set to an id
// of an existing service that is outside the current processor group, and if it is we want to leave
@ -1433,6 +1434,12 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
final String serviceVersionedComponentId = proposedProperties.get(propertyName);
String instanceId = getServiceInstanceId(serviceVersionedComponentId, group);
value = (instanceId == null) ? serviceVersionedComponentId : instanceId;
// Find the same property descriptor in the component's CreatedExtension and replace it with the
// instance ID of the service
createdExtensions.stream().filter(ce -> ce.extension.equals(componentNode)).forEach(createdExtension -> {
createdExtension.propertyValues.replace(propertyName, value);
});
} else {
value = existingExternalServiceId;
}
@ -1460,8 +1467,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
// so we want to continue on and update the value to null.
}
value = decrypt(value, syncOptions.getPropertyDecryptor());
fullPropertyMap.put(propertyName, value);
fullPropertyMap.put(propertyName, decrypt(value, syncOptions.getPropertyDecryptor()));
}
}
@ -2266,13 +2272,14 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
if (port == null) {
final ComponentType proposedType = proposed.getComponentType();
final Port newPort;
if (proposedType == ComponentType.INPUT_PORT) {
addInputPort(group, proposed, synchronizationOptions.getComponentIdGenerator(), proposed.getName());
newPort = addInputPort(group, proposed, synchronizationOptions.getComponentIdGenerator(), proposed.getName());
} else {
addOutputPort(group, proposed, synchronizationOptions.getComponentIdGenerator(), proposed.getName());
newPort = addOutputPort(group, proposed, synchronizationOptions.getComponentIdGenerator(), proposed.getName());
}
LOG.info("Successfully synchronized {} by adding it to the flow", port);
LOG.info("Successfully synchronized {} by adding it to the flow", newPort);
} else if (proposed == null) {
final Set<Connectable> stoppedDownstream = stopDownstreamComponents(port, timeout, synchronizationOptions);
toRestart.addAll(stoppedDownstream);
@ -2400,11 +2407,12 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
procNode.setVersionedComponentId(proposed.getIdentifier());
destination.addProcessor(procNode);
updateProcessor(procNode, proposed, topLevelGroup);
final Map<String, String> decryptedProperties = getDecryptedProperties(proposed.getProperties());
createdExtensions.add(new CreatedExtension(procNode, decryptedProperties));
updateProcessor(procNode, proposed, topLevelGroup);
// Notify the processor node that the configuration (properties, e.g.) has been restored
final ProcessContext processContext = context.getProcessContextFactory().apply(procNode);
procNode.onConfigurationRestored(processContext);

View File

@ -19,12 +19,15 @@ package org.apache.nifi.flow.synchronization;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.PropertyConfiguration;
import org.apache.nifi.controller.ReloadComponent;
@ -73,6 +76,7 @@ import org.apache.nifi.parameter.StandardParameterContextManager;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.registry.flow.mapping.FlowMappingOptions;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.junit.jupiter.api.BeforeEach;
@ -84,12 +88,14 @@ import org.mockito.stubbing.Answer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@ -279,6 +285,8 @@ public class StandardVersionedComponentSynchronizerTest {
when(service.isActive()).thenReturn(false);
when(service.getProcessGroup()).thenReturn(group);
when(service.getState()).thenReturn(ControllerServiceState.DISABLED);
when(service.getBulletinLevel()).thenReturn(LogLevel.WARN);
when(service.getControllerServiceImplementation()).thenReturn(new TestControllerService());
return service;
}
@ -304,6 +312,16 @@ public class StandardVersionedComponentSynchronizerTest {
return port;
}
private ProcessGroup createMockProcessGroup() {
final ProcessGroup processGroup = mock(ProcessGroup.class);
when(processGroup.getIdentifier()).thenReturn("processGroup");
when(processGroup.getPosition()).thenReturn(new org.apache.nifi.connectable.Position(0, 0));
when(processGroup.getFlowFileConcurrency()).thenReturn(FlowFileConcurrency.UNBOUNDED);
when(processGroup.getFlowFileOutboundPolicy()).thenReturn(FlowFileOutboundPolicy.BATCH_OUTPUT);
when(processGroup.getExecutionEngine()).thenReturn(ExecutionEngine.STANDARD);
return processGroup;
}
private Connection createMockConnection(final Connectable source, final Connectable destination, final ProcessGroup group) {
final String uuid = UUID.randomUUID().toString();
@ -342,12 +360,7 @@ public class StandardVersionedComponentSynchronizerTest {
@Test
public void testSynchronizeProcessorAddedMigrated() {
final ProcessGroup processGroup = mock(ProcessGroup.class);
when(processGroup.getIdentifier()).thenReturn("processGroup");
when(processGroup.getPosition()).thenReturn(new org.apache.nifi.connectable.Position(0, 0));
when(processGroup.getFlowFileConcurrency()).thenReturn(FlowFileConcurrency.UNBOUNDED);
when(processGroup.getFlowFileOutboundPolicy()).thenReturn(FlowFileOutboundPolicy.BATCH_OUTPUT);
when(processGroup.getExecutionEngine()).thenReturn(ExecutionEngine.STANDARD);
final ProcessGroup processGroup = createMockProcessGroup();
final VersionedProcessGroup rootGroup = new VersionedProcessGroup();
rootGroup.setIdentifier("rootGroup");
@ -377,6 +390,51 @@ public class StandardVersionedComponentSynchronizerTest {
assertEquals(ENCODED_TEXT, propertyValue);
}
@Test
public void testAddProcessorWithServiceAndMigration() {
final ProcessGroup processGroup = createMockProcessGroup();
final ProcessorNode processorNode = createMockProcessor();
when(processorNode.getProcessGroup()).thenReturn(processGroup);
when(processorNode.getRawPropertyValues()).thenReturn(Collections.emptyMap());
final VersionedPropertyDescriptor versionedDescriptorCS = new VersionedPropertyDescriptor();
final PropertyDescriptor descriptorCS = new PropertyDescriptor.Builder().name("cs")
.identifiesControllerService(ControllerService.class).build();
versionedDescriptorCS.setName(descriptorCS.getName());
versionedDescriptorCS.setIdentifiesControllerService(true);
final Map<String, VersionedPropertyDescriptor> proposedDescriptors =
Map.of(versionedDescriptorCS.getName(), versionedDescriptorCS);
ControllerServiceNode controllerServiceNode = createMockControllerService();
when(flowManager.createControllerService(any(), any(), any(), anySet(), eq(true), eq(true), eq(null))).thenReturn(controllerServiceNode);
when(flowManager.createProcessor(any(), any(), any(), eq(true))).thenReturn(processorNode);
setReferences(controllerServiceNode, processorNode);
when(controllerServiceNode.getVersionedComponentId()).thenReturn(Optional.of("12345"));
final VersionedProcessor versionedProcessor = createMinimalVersionedProcessor();
final Map<String, String> versionedProperties = Collections.singletonMap("cs", "12345");
versionedProcessor.setProperties(versionedProperties);
versionedProcessor.setPropertyDescriptors(proposedDescriptors);
final VersionedControllerService versionedControllerService = createMinimalVersionedControllerService();
final VersionedProcessGroup versionedGroup = new VersionedProcessGroup();
versionedGroup.setIdentifier("pg-v2");
versionedGroup.setProcessors(Set.of(versionedProcessor));
versionedGroup.setControllerServices(Set.of(versionedControllerService));
final VersionedExternalFlow externalFlow = new VersionedExternalFlow();
externalFlow.setFlowContents(versionedGroup);
when(processGroup.getControllerServices(false)).thenReturn(Set.of(controllerServiceNode));
synchronizer.synchronize(processGroup, externalFlow, synchronizationOptions);
verify(processorNode).migrateConfiguration(propertiesCaptor.capture(), any());
final Map<String, String> migratedProperties = propertiesCaptor.getValue();
assertEquals(controllerServiceNode.getIdentifier(), migratedProperties.get("cs"));
}
@Test
public void testSynchronizeProcessorSensitiveDynamicProperties() throws FlowSynchronizationException, InterruptedException, TimeoutException {
final Map<String, String> versionedProperties = Collections.singletonMap(SENSITIVE_PROPERTY_NAME, ENCRYPTED_PROPERTY_VALUE);
@ -1386,4 +1444,35 @@ public class StandardVersionedComponentSynchronizerTest {
}
}
}
private static class TestControllerService implements ControllerService {
@Override
public Collection<ValidationResult> validate(ValidationContext context) {
return null;
}
@Override
public PropertyDescriptor getPropertyDescriptor(String name) {
return null;
}
@Override
public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
}
@Override
public List<PropertyDescriptor> getPropertyDescriptors() {
return null;
}
@Override
public String getIdentifier() {
return null;
}
@Override
public void initialize(ControllerServiceInitializationContext context) throws InitializationException {
}
}
}