diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java index d9b6d81424..af3cf4931e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java @@ -126,6 +126,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -1202,7 +1203,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen } updateControllerService(newService, proposed, topLevelGroup); - createdExtensions.add(new CreatedExtension(newService, proposed.getProperties())); + final Map decryptedProperties = getDecryptedProperties(proposed.getProperties()); + createdExtensions.add(new CreatedExtension(newService, decryptedProperties)); return newService; } @@ -1466,6 +1468,18 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen return fullPropertyMap; } + private Map getDecryptedProperties(final Map properties) { + final Map decryptedProperties = new LinkedHashMap<>(); + + final PropertyDecryptor decryptor = syncOptions.getPropertyDecryptor(); + properties.forEach((propertyName, propertyValue) -> { + final String propertyValueDecrypted = decrypt(propertyValue, decryptor); + decryptedProperties.put(propertyName, propertyValueDecrypted); + }); + + return decryptedProperties; + } + private static String decrypt(final String value, final PropertyDecryptor decryptor) { if (isValueEncrypted(value)) { try { @@ -2388,7 +2402,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen destination.addProcessor(procNode); updateProcessor(procNode, proposed, topLevelGroup); - createdExtensions.add(new CreatedExtension(procNode, proposed.getProperties())); + final Map decryptedProperties = getDecryptedProperties(proposed.getProperties()); + createdExtensions.add(new CreatedExtension(procNode, decryptedProperties)); // Notify the processor node that the configuration (properties, e.g.) has been restored final ProcessContext processContext = context.getProcessContextFactory().apply(procNode); @@ -3468,7 +3483,9 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen final BundleCoordinate coordinate = toCoordinate(reportingTask.getBundle()); final ReportingTaskNode taskNode = context.getFlowManager().createReportingTask(reportingTask.getType(), reportingTask.getInstanceIdentifier(), coordinate, false); updateReportingTask(taskNode, reportingTask); - createdExtensions.add(new CreatedExtension(taskNode, reportingTask.getProperties())); + + final Map decryptedProperties = getDecryptedProperties(reportingTask.getProperties()); + createdExtensions.add(new CreatedExtension(taskNode, decryptedProperties)); return taskNode; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java index b663517bbf..2d8dcfc4e3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java @@ -79,7 +79,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import java.time.Duration; @@ -102,6 +101,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.stream.Collectors; +import static org.apache.nifi.flow.synchronization.StandardVersionedComponentSynchronizer.ENC_PREFIX; +import static org.apache.nifi.flow.synchronization.StandardVersionedComponentSynchronizer.ENC_SUFFIX; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -129,6 +130,12 @@ import static org.mockito.Mockito.when; public class StandardVersionedComponentSynchronizerTest { + private static final String ENCODED_TEXT = "ENCODED"; + + private static final String ENCRYPTED_PROPERTY_VALUE = "%s%s%s".formatted(ENC_PREFIX, ENCODED_TEXT, ENC_SUFFIX); + + private static final String SENSITIVE_PROPERTY_NAME = "Access Token"; + private ProcessorNode processorA; private ProcessorNode processorB; private Connection connectionAB; @@ -147,6 +154,8 @@ public class StandardVersionedComponentSynchronizerTest { private BundleCoordinate bundleCoordinate; private FlowManager flowManager; + private final ArgumentCaptor> propertiesCaptor = ArgumentCaptor.captor(); + private final Set queuesWithData = Collections.synchronizedSet(new HashSet<>()); private final Bundle bundle = new Bundle("group", "artifact", "version 1.0"); @@ -331,6 +340,68 @@ public class StandardVersionedComponentSynchronizerTest { return connection; } + @Test + public void testSynchronizeProcessorAddedMigrated() { + final ProcessGroup processGroup = mock(ProcessGroup.class); + when(processGroup.getIdentifier()).thenReturn("processGroup"); + when(processGroup.getPosition()).thenReturn(new org.apache.nifi.connectable.Position(0, 0)); + when(processGroup.getFlowFileConcurrency()).thenReturn(FlowFileConcurrency.UNBOUNDED); + when(processGroup.getFlowFileOutboundPolicy()).thenReturn(FlowFileOutboundPolicy.BATCH_OUTPUT); + when(processGroup.getExecutionEngine()).thenReturn(ExecutionEngine.STANDARD); + + final VersionedProcessGroup rootGroup = new VersionedProcessGroup(); + rootGroup.setIdentifier("rootGroup"); + + final Map versionedProperties = Collections.singletonMap(SENSITIVE_PROPERTY_NAME, ENCRYPTED_PROPERTY_VALUE); + + final VersionedProcessor versionedProcessor = createMinimalVersionedProcessor(); + versionedProcessor.setProperties(versionedProperties); + + final ProcessorNode processorNode = createMockProcessor(); + rootGroup.setProcessors(Set.of(versionedProcessor)); + + final VersionedExternalFlow externalFlow = new VersionedExternalFlow(); + externalFlow.setFlowContents(rootGroup); + + when(flowManager.createProcessor(any(), any(), any(), eq(true))).thenReturn(processorNode); + + synchronizer.synchronize(processGroup, externalFlow, synchronizationOptions); + verify(processGroup, times(0)).setParameterContext(any(ParameterContext.class)); + + assertSensitivePropertyDecrypted(processorNode); + + verify(processorNode).migrateConfiguration(propertiesCaptor.capture(), any()); + + final Map migratedProperties = propertiesCaptor.getValue(); + final String propertyValue = migratedProperties.get(SENSITIVE_PROPERTY_NAME); + assertEquals(ENCODED_TEXT, propertyValue); + } + + @Test + public void testSynchronizeProcessorSensitiveDynamicProperties() throws FlowSynchronizationException, InterruptedException, TimeoutException { + final Map versionedProperties = Collections.singletonMap(SENSITIVE_PROPERTY_NAME, ENCRYPTED_PROPERTY_VALUE); + + final VersionedProcessor versionedProcessor = createMinimalVersionedProcessor(); + versionedProcessor.setProperties(versionedProperties); + + synchronizer.synchronize(processorA, versionedProcessor, group, synchronizationOptions); + + assertSensitivePropertyDecrypted(processorA); + } + + @Test + public void testSynchronizeControllerServiceSensitiveDynamicProperties() throws FlowSynchronizationException, InterruptedException, TimeoutException { + final Map versionedProperties = Collections.singletonMap(SENSITIVE_PROPERTY_NAME, ENCRYPTED_PROPERTY_VALUE); + + final VersionedControllerService versionedControllerService = createMinimalVersionedControllerService(); + versionedControllerService.setProperties(versionedProperties); + + final ControllerServiceNode serviceNode = createMockControllerService(); + synchronizer.synchronize(serviceNode, versionedControllerService, group, synchronizationOptions); + + assertSensitivePropertyDecrypted(serviceNode); + } + @Test public void testSynchronizeStoppedProcessor() throws FlowSynchronizationException, TimeoutException, InterruptedException { final VersionedProcessor versionedProcessor = createMinimalVersionedProcessor(); @@ -533,13 +604,6 @@ public class StandardVersionedComponentSynchronizerTest { scheduledStateChangeListener.assertNumProcessorUpdates(processors.length); } - private void verifyCallbackIndicatedStartOnly(final ProcessorNode... processors) { - for (final ProcessorNode processor : processors) { - scheduledStateChangeListener.assertProcessorUpdates(new ScheduledStateUpdate<>(processor, org.apache.nifi.controller.ScheduledState.RUNNING)); - } - scheduledStateChangeListener.assertNumProcessorUpdates(processors.length); - } - @Test public void testConnectionRemoval() throws FlowSynchronizationException, TimeoutException { startProcessor(processorA); @@ -560,9 +624,7 @@ public class StandardVersionedComponentSynchronizerTest { synchronizationOptions = createQuickFailSynchronizationOptions(FlowSynchronizationOptions.ComponentStopTimeoutAction.THROW_TIMEOUT_EXCEPTION); - assertThrows(FlowSynchronizationException.class, () -> { - synchronizer.synchronize(connectionAB, null, group, synchronizationOptions); - }); + assertThrows(FlowSynchronizationException.class, () -> synchronizer.synchronize(connectionAB, null, group, synchronizationOptions)); // Ensure that the update occurred verify(connectionAB, times(0)).setName("Hello"); @@ -583,12 +645,10 @@ public class StandardVersionedComponentSynchronizerTest { // Use a background thread to synchronize the connection. final CountDownLatch completionLatch = new CountDownLatch(1); - final Thread syncThread = new Thread(() -> { - assertDoesNotThrow(() -> { - synchronizer.synchronize(connectionAB, null, group, synchronizationOptions); - completionLatch.countDown(); - }); - }); + final Thread syncThread = new Thread(() -> assertDoesNotThrow(() -> { + synchronizer.synchronize(connectionAB, null, group, synchronizationOptions); + completionLatch.countDown(); + })); syncThread.start(); // Wait up to 1/2 second to ensure that the task does not complete. @@ -670,9 +730,7 @@ public class StandardVersionedComponentSynchronizerTest { public void testRemoveOutputPortFailsIfIncomingConnection() { createMockConnection(processorA, outputPort, group); - assertThrows(FlowSynchronizationException.class, () -> { - synchronizer.synchronize(outputPort, null, group, synchronizationOptions); - }); + assertThrows(FlowSynchronizationException.class, () -> synchronizer.synchronize(outputPort, null, group, synchronizationOptions)); } @Test @@ -687,15 +745,11 @@ public class StandardVersionedComponentSynchronizerTest { queuesWithData.add(connection.getIdentifier()); // Ensure that we fail to remove it due to FlowSynchronizationException because destination of connection is not running - assertThrows(FlowSynchronizationException.class, () -> { - synchronizer.synchronize(inputPort, null, group, synchronizationOptions); - }); + assertThrows(FlowSynchronizationException.class, () -> synchronizer.synchronize(inputPort, null, group, synchronizationOptions)); // Start processor and ensure that we fail to remove it due to TimeoutException because destination of connection is now running startProcessor(processorA); - assertThrows(TimeoutException.class, () -> { - synchronizer.synchronize(inputPort, null, group, synchronizationOptions); - }); + assertThrows(TimeoutException.class, () -> synchronizer.synchronize(inputPort, null, group, synchronizationOptions)); } @Test @@ -707,9 +761,6 @@ public class StandardVersionedComponentSynchronizerTest { verify(controllerServiceNode).setName(eq(versionedService.getName())); } - public static class MapStringString extends HashMap { - } - @Test @SuppressWarnings("unchecked") public void testExternalControllerServiceReferenceRemoved() throws FlowSynchronizationException, InterruptedException, TimeoutException { @@ -826,13 +877,10 @@ public class StandardVersionedComponentSynchronizerTest { verify(controllerServiceProvider).unscheduleReferencingComponents(service); verify(controllerServiceProvider).disableControllerServicesAsync(Collections.singleton(service)); - Mockito.doAnswer(new Answer() { - @Override - public Void answer(final InvocationOnMock invocationOnMock) { - final Set services = invocationOnMock.getArgument(0); - assertTrue(services.isEmpty()); - return null; - } + Mockito.doAnswer((Answer) invocationOnMock -> { + final Set services = invocationOnMock.getArgument(0); + assertTrue(services.isEmpty()); + return null; }).when(controllerServiceProvider).enableControllerServicesAsync(Mockito.anySet()); verify(controllerServiceProvider, times(0)).scheduleReferencingComponents(Mockito.any(ControllerServiceNode.class), Mockito.anySet(), Mockito.any(ComponentScheduler.class)); @@ -937,9 +985,7 @@ public class StandardVersionedComponentSynchronizerTest { synchronizationOptions = createQuickFailSynchronizationOptions(FlowSynchronizationOptions.ComponentStopTimeoutAction.THROW_TIMEOUT_EXCEPTION); when(parameterReferenceManager.getProcessorsReferencing(existing, "abc")).thenReturn(Collections.singleton(processorA)); - assertThrows(TimeoutException.class, () -> { - synchronizer.synchronize(existing, proposed, synchronizationOptions); - }); + assertThrows(TimeoutException.class, () -> synchronizer.synchronize(existing, proposed, synchronizationOptions)); // Updates should not occur. assertEquals("xyz", existing.getParameter("abc").get().getValue()); @@ -1035,9 +1081,7 @@ public class StandardVersionedComponentSynchronizerTest { when(parameterReferenceManager.getProcessorsReferencing(existing, "abc")).thenReturn(Collections.emptySet()); when(parameterReferenceManager.getControllerServicesReferencing(existing, "abc")).thenReturn(Collections.singleton(service)); - assertThrows(TimeoutException.class, () -> { - synchronizer.synchronize(existing, proposed, synchronizationOptions); - }); + assertThrows(TimeoutException.class, () -> synchronizer.synchronize(existing, proposed, synchronizationOptions)); // Updates should not occur. assertEquals("xyz", existing.getParameter("abc").get().getValue()); @@ -1284,32 +1328,26 @@ public class StandardVersionedComponentSynchronizerTest { return versionedPort; } - private class ScheduledStateUpdate { - private T component; - private org.apache.nifi.controller.ScheduledState state; + private void assertSensitivePropertyDecrypted(final ComponentNode componentNode) { + verify(componentNode).setProperties(propertiesCaptor.capture(), eq(true), eq(Collections.emptySet())); - public ScheduledStateUpdate(T component, org.apache.nifi.controller.ScheduledState state) { - this.component = component; - this.state = state; - } + final Map appliedProperties = propertiesCaptor.getValue(); + final String appliedSensitivePropertyValue = appliedProperties.get(SENSITIVE_PROPERTY_NAME); + assertEquals(ENCODED_TEXT, appliedSensitivePropertyValue); } - private class ControllerServiceStateUpdate { - private ControllerServiceNode controllerService; - private ControllerServiceState state; + private record ScheduledStateUpdate(T component, org.apache.nifi.controller.ScheduledState state) { + } - public ControllerServiceStateUpdate(ControllerServiceNode controllerService, ControllerServiceState state) { - this.controllerService = controllerService; - this.state = state; - } + private record ControllerServiceStateUpdate(ControllerServiceNode controllerService, ControllerServiceState state) { } private class CapturingScheduledStateChangeListener implements ScheduledStateChangeListener { - private List> processorUpdates = new ArrayList<>(); - private List> portUpdates = new ArrayList<>(); - private List serviceUpdates = new ArrayList<>(); - private List> reportingTaskUpdates = new ArrayList<>(); + private final List> processorUpdates = new ArrayList<>(); + private final List> portUpdates = new ArrayList<>(); + private final List serviceUpdates = new ArrayList<>(); + private final List> reportingTaskUpdates = new ArrayList<>(); @Override public void onScheduledStateChange(final ProcessorNode processor, final ScheduledState intendedState) { @@ -1347,10 +1385,5 @@ public class StandardVersionedComponentSynchronizerTest { } } } - - void assertNumPortUpdates(int expectedNum) { - assertEquals(expectedNum, portUpdates.size(), - "Expected " + expectedNum + " port state changes"); - } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java index 0b6d3bbd72..5f9e027f27 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java @@ -232,6 +232,11 @@ public abstract class AbstractComponentNode implements ComponentNode { return false; } + /** + * Overwrite current Component properties using provided values with values decrypted when necessary by the caller + * + * @param properties Map of Property Name to Value + */ protected void overwriteProperties(final Map properties) { // Update properties. final Map updatedProperties = new HashMap<>(properties);