NIFI-11047 - Issue when upgrading version of dataflow with external service

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #6849
This commit is contained in:
Paul Grey 2023-01-13 12:19:46 -05:00 committed by Matthew Burgess
parent cd3323a8a7
commit 567e95aa28
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
2 changed files with 60 additions and 1 deletions

View File

@ -1340,7 +1340,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
|| (versionedDescriptor != null && versionedDescriptor.isSensitive());
String value;
if (descriptor != null && referencesService) {
if (descriptor != null && referencesService && (proposedProperties.get(propertyName) != null)) {
// Need to determine if the component's property descriptor for this service is already set to an id
// of an existing service that is outside the current processor group, and if it is we want to leave
// the property set to that value

View File

@ -24,7 +24,9 @@ import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.PropertyConfiguration;
import org.apache.nifi.controller.ReloadComponent;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.flow.FlowManager;
@ -47,6 +49,7 @@ import org.apache.nifi.flow.VersionedParameter;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.flow.VersionedPropertyDescriptor;
import org.apache.nifi.groups.ComponentIdGenerator;
import org.apache.nifi.groups.ComponentScheduler;
import org.apache.nifi.groups.FlowSynchronizationOptions;
@ -69,6 +72,7 @@ import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@ -96,6 +100,7 @@ import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.AdditionalMatchers.or;
@ -111,6 +116,7 @@ import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -665,6 +671,59 @@ public class StandardVersionedComponentSynchronizerTest {
verify(controllerServiceNode).setName(eq(versionedService.getName()));
}
public static class MapStringString extends HashMap<String, String> {
}
@Test
public void testExternalControllerServiceReferenceRemoved() throws FlowSynchronizationException, InterruptedException, TimeoutException {
final PropertyDescriptor descriptorB = new PropertyDescriptor.Builder().name("b").build();
final PropertyDescriptor descriptorCS = new PropertyDescriptor.Builder().name("cs")
.identifiesControllerService(ControllerService.class).build();
final Map<PropertyDescriptor, String> rawPropertyValues = new HashMap<>();
rawPropertyValues.put(descriptorB, descriptorB.getName());
rawPropertyValues.put(descriptorCS, descriptorCS.getName());
final VersionedPropertyDescriptor versionedDescriptorB = new VersionedPropertyDescriptor();
final VersionedPropertyDescriptor versionedDescriptorCS = new VersionedPropertyDescriptor();
versionedDescriptorB.setName(descriptorB.getName());
versionedDescriptorCS.setName(descriptorCS.getName());
final Map<String, VersionedPropertyDescriptor> proposedDescriptors = new HashMap<>();
proposedDescriptors.put(versionedDescriptorB.getName(), versionedDescriptorB);
proposedDescriptors.put(versionedDescriptorCS.getName(), versionedDescriptorCS);
final Map<PropertyDescriptor, PropertyConfiguration> propertiesBefore = new HashMap<>();
propertiesBefore.put(descriptorB, new PropertyConfiguration("originalB", null, null, null));
propertiesBefore.put(descriptorCS, new PropertyConfiguration("originalCS", null, null, null));
final ProcessorNode processorNode = createMockProcessor();
when(processorNode.getPropertyDescriptor(eq("b"))).thenReturn(descriptorB);
when(processorNode.getPropertyDescriptor(eq("cs"))).thenReturn(descriptorCS);
when(processorNode.getProperties()).thenReturn(propertiesBefore);
when(processorNode.getRawPropertyValues()).thenReturn(rawPropertyValues);
when(processorNode.getEffectivePropertyValue(eq(descriptorB))).thenReturn("originalB");
when(processorNode.getEffectivePropertyValue(eq(descriptorCS))).thenReturn("originalCS");
final ProcessGroup processGroup = processorNode.getProcessGroup();
final ProcessGroup processGroupParent = mock(ProcessGroup.class);
final ControllerServiceNode controllerServiceNode = createMockControllerService();
when(processGroup.getParent()).thenReturn(processGroupParent);
when(processGroupParent.findControllerService(any(), eq(false), eq(true))).thenReturn(controllerServiceNode);
final Map<String, String> proposedProperties = new HashMap<>();
proposedProperties.put("b", "updateB");
final VersionedProcessor versionedProcessor = createMinimalVersionedProcessor();
versionedProcessor.setPropertyDescriptors(proposedDescriptors);
versionedProcessor.setProperties(proposedProperties);
final ArgumentCaptor<MapStringString> captorProperties = ArgumentCaptor.forClass(MapStringString.class);
synchronizer.synchronize(processorNode, versionedProcessor, group, synchronizationOptions);
verify(processorNode).setProperties(captorProperties.capture(), anyBoolean(), any());
final Map<String, String> properties = captorProperties.getValue();
assertEquals("updateB", properties.get("b"));
assertNull(properties.get("cs"));
}
@Test
public void testControllerServiceRemoved() throws FlowSynchronizationException, InterruptedException, TimeoutException {
final ControllerServiceNode service = createMockControllerService();