NIFI-12339 Fixed Property Decryption for Migrated Components (#8002)

- Updated StandardVersionedComponentSynchronizer to decrypt properties when creating extension references for subsequent migration
This commit is contained in:
David Handermann 2023-11-10 10:50:03 -06:00 committed by GitHub
parent 832d4455c1
commit dabdf94bf1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 123 additions and 68 deletions

View File

@ -126,6 +126,7 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -1202,7 +1203,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
} }
updateControllerService(newService, proposed, topLevelGroup); updateControllerService(newService, proposed, topLevelGroup);
createdExtensions.add(new CreatedExtension(newService, proposed.getProperties())); final Map<String, String> decryptedProperties = getDecryptedProperties(proposed.getProperties());
createdExtensions.add(new CreatedExtension(newService, decryptedProperties));
return newService; return newService;
} }
@ -1466,6 +1468,18 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
return fullPropertyMap; return fullPropertyMap;
} }
private Map<String, String> getDecryptedProperties(final Map<String, String> properties) {
final Map<String, String> decryptedProperties = new LinkedHashMap<>();
final PropertyDecryptor decryptor = syncOptions.getPropertyDecryptor();
properties.forEach((propertyName, propertyValue) -> {
final String propertyValueDecrypted = decrypt(propertyValue, decryptor);
decryptedProperties.put(propertyName, propertyValueDecrypted);
});
return decryptedProperties;
}
private static String decrypt(final String value, final PropertyDecryptor decryptor) { private static String decrypt(final String value, final PropertyDecryptor decryptor) {
if (isValueEncrypted(value)) { if (isValueEncrypted(value)) {
try { try {
@ -2388,7 +2402,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
destination.addProcessor(procNode); destination.addProcessor(procNode);
updateProcessor(procNode, proposed, topLevelGroup); updateProcessor(procNode, proposed, topLevelGroup);
createdExtensions.add(new CreatedExtension(procNode, proposed.getProperties())); final Map<String, String> decryptedProperties = getDecryptedProperties(proposed.getProperties());
createdExtensions.add(new CreatedExtension(procNode, decryptedProperties));
// Notify the processor node that the configuration (properties, e.g.) has been restored // Notify the processor node that the configuration (properties, e.g.) has been restored
final ProcessContext processContext = context.getProcessContextFactory().apply(procNode); final ProcessContext processContext = context.getProcessContextFactory().apply(procNode);
@ -3468,7 +3483,9 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
final BundleCoordinate coordinate = toCoordinate(reportingTask.getBundle()); final BundleCoordinate coordinate = toCoordinate(reportingTask.getBundle());
final ReportingTaskNode taskNode = context.getFlowManager().createReportingTask(reportingTask.getType(), reportingTask.getInstanceIdentifier(), coordinate, false); final ReportingTaskNode taskNode = context.getFlowManager().createReportingTask(reportingTask.getType(), reportingTask.getInstanceIdentifier(), coordinate, false);
updateReportingTask(taskNode, reportingTask); updateReportingTask(taskNode, reportingTask);
createdExtensions.add(new CreatedExtension(taskNode, reportingTask.getProperties()));
final Map<String, String> decryptedProperties = getDecryptedProperties(reportingTask.getProperties());
createdExtensions.add(new CreatedExtension(taskNode, decryptedProperties));
return taskNode; return taskNode;
} }

View File

@ -79,7 +79,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import java.time.Duration; import java.time.Duration;
@ -102,6 +101,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.apache.nifi.flow.synchronization.StandardVersionedComponentSynchronizer.ENC_PREFIX;
import static org.apache.nifi.flow.synchronization.StandardVersionedComponentSynchronizer.ENC_SUFFIX;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
@ -129,6 +130,12 @@ import static org.mockito.Mockito.when;
public class StandardVersionedComponentSynchronizerTest { public class StandardVersionedComponentSynchronizerTest {
private static final String ENCODED_TEXT = "ENCODED";
private static final String ENCRYPTED_PROPERTY_VALUE = "%s%s%s".formatted(ENC_PREFIX, ENCODED_TEXT, ENC_SUFFIX);
private static final String SENSITIVE_PROPERTY_NAME = "Access Token";
private ProcessorNode processorA; private ProcessorNode processorA;
private ProcessorNode processorB; private ProcessorNode processorB;
private Connection connectionAB; private Connection connectionAB;
@ -147,6 +154,8 @@ public class StandardVersionedComponentSynchronizerTest {
private BundleCoordinate bundleCoordinate; private BundleCoordinate bundleCoordinate;
private FlowManager flowManager; private FlowManager flowManager;
private final ArgumentCaptor<Map<String, String>> propertiesCaptor = ArgumentCaptor.captor();
private final Set<String> queuesWithData = Collections.synchronizedSet(new HashSet<>()); private final Set<String> queuesWithData = Collections.synchronizedSet(new HashSet<>());
private final Bundle bundle = new Bundle("group", "artifact", "version 1.0"); private final Bundle bundle = new Bundle("group", "artifact", "version 1.0");
@ -331,6 +340,68 @@ public class StandardVersionedComponentSynchronizerTest {
return connection; return connection;
} }
@Test
public void testSynchronizeProcessorAddedMigrated() {
final ProcessGroup processGroup = mock(ProcessGroup.class);
when(processGroup.getIdentifier()).thenReturn("processGroup");
when(processGroup.getPosition()).thenReturn(new org.apache.nifi.connectable.Position(0, 0));
when(processGroup.getFlowFileConcurrency()).thenReturn(FlowFileConcurrency.UNBOUNDED);
when(processGroup.getFlowFileOutboundPolicy()).thenReturn(FlowFileOutboundPolicy.BATCH_OUTPUT);
when(processGroup.getExecutionEngine()).thenReturn(ExecutionEngine.STANDARD);
final VersionedProcessGroup rootGroup = new VersionedProcessGroup();
rootGroup.setIdentifier("rootGroup");
final Map<String, String> versionedProperties = Collections.singletonMap(SENSITIVE_PROPERTY_NAME, ENCRYPTED_PROPERTY_VALUE);
final VersionedProcessor versionedProcessor = createMinimalVersionedProcessor();
versionedProcessor.setProperties(versionedProperties);
final ProcessorNode processorNode = createMockProcessor();
rootGroup.setProcessors(Set.of(versionedProcessor));
final VersionedExternalFlow externalFlow = new VersionedExternalFlow();
externalFlow.setFlowContents(rootGroup);
when(flowManager.createProcessor(any(), any(), any(), eq(true))).thenReturn(processorNode);
synchronizer.synchronize(processGroup, externalFlow, synchronizationOptions);
verify(processGroup, times(0)).setParameterContext(any(ParameterContext.class));
assertSensitivePropertyDecrypted(processorNode);
verify(processorNode).migrateConfiguration(propertiesCaptor.capture(), any());
final Map<String, String> migratedProperties = propertiesCaptor.getValue();
final String propertyValue = migratedProperties.get(SENSITIVE_PROPERTY_NAME);
assertEquals(ENCODED_TEXT, propertyValue);
}
@Test
public void testSynchronizeProcessorSensitiveDynamicProperties() throws FlowSynchronizationException, InterruptedException, TimeoutException {
final Map<String, String> versionedProperties = Collections.singletonMap(SENSITIVE_PROPERTY_NAME, ENCRYPTED_PROPERTY_VALUE);
final VersionedProcessor versionedProcessor = createMinimalVersionedProcessor();
versionedProcessor.setProperties(versionedProperties);
synchronizer.synchronize(processorA, versionedProcessor, group, synchronizationOptions);
assertSensitivePropertyDecrypted(processorA);
}
@Test
public void testSynchronizeControllerServiceSensitiveDynamicProperties() throws FlowSynchronizationException, InterruptedException, TimeoutException {
final Map<String, String> versionedProperties = Collections.singletonMap(SENSITIVE_PROPERTY_NAME, ENCRYPTED_PROPERTY_VALUE);
final VersionedControllerService versionedControllerService = createMinimalVersionedControllerService();
versionedControllerService.setProperties(versionedProperties);
final ControllerServiceNode serviceNode = createMockControllerService();
synchronizer.synchronize(serviceNode, versionedControllerService, group, synchronizationOptions);
assertSensitivePropertyDecrypted(serviceNode);
}
@Test @Test
public void testSynchronizeStoppedProcessor() throws FlowSynchronizationException, TimeoutException, InterruptedException { public void testSynchronizeStoppedProcessor() throws FlowSynchronizationException, TimeoutException, InterruptedException {
final VersionedProcessor versionedProcessor = createMinimalVersionedProcessor(); final VersionedProcessor versionedProcessor = createMinimalVersionedProcessor();
@ -533,13 +604,6 @@ public class StandardVersionedComponentSynchronizerTest {
scheduledStateChangeListener.assertNumProcessorUpdates(processors.length); scheduledStateChangeListener.assertNumProcessorUpdates(processors.length);
} }
private void verifyCallbackIndicatedStartOnly(final ProcessorNode... processors) {
for (final ProcessorNode processor : processors) {
scheduledStateChangeListener.assertProcessorUpdates(new ScheduledStateUpdate<>(processor, org.apache.nifi.controller.ScheduledState.RUNNING));
}
scheduledStateChangeListener.assertNumProcessorUpdates(processors.length);
}
@Test @Test
public void testConnectionRemoval() throws FlowSynchronizationException, TimeoutException { public void testConnectionRemoval() throws FlowSynchronizationException, TimeoutException {
startProcessor(processorA); startProcessor(processorA);
@ -560,9 +624,7 @@ public class StandardVersionedComponentSynchronizerTest {
synchronizationOptions = createQuickFailSynchronizationOptions(FlowSynchronizationOptions.ComponentStopTimeoutAction.THROW_TIMEOUT_EXCEPTION); synchronizationOptions = createQuickFailSynchronizationOptions(FlowSynchronizationOptions.ComponentStopTimeoutAction.THROW_TIMEOUT_EXCEPTION);
assertThrows(FlowSynchronizationException.class, () -> { assertThrows(FlowSynchronizationException.class, () -> synchronizer.synchronize(connectionAB, null, group, synchronizationOptions));
synchronizer.synchronize(connectionAB, null, group, synchronizationOptions);
});
// Ensure that the update occurred // Ensure that the update occurred
verify(connectionAB, times(0)).setName("Hello"); verify(connectionAB, times(0)).setName("Hello");
@ -583,12 +645,10 @@ public class StandardVersionedComponentSynchronizerTest {
// Use a background thread to synchronize the connection. // Use a background thread to synchronize the connection.
final CountDownLatch completionLatch = new CountDownLatch(1); final CountDownLatch completionLatch = new CountDownLatch(1);
final Thread syncThread = new Thread(() -> { final Thread syncThread = new Thread(() -> assertDoesNotThrow(() -> {
assertDoesNotThrow(() -> { synchronizer.synchronize(connectionAB, null, group, synchronizationOptions);
synchronizer.synchronize(connectionAB, null, group, synchronizationOptions); completionLatch.countDown();
completionLatch.countDown(); }));
});
});
syncThread.start(); syncThread.start();
// Wait up to 1/2 second to ensure that the task does not complete. // Wait up to 1/2 second to ensure that the task does not complete.
@ -670,9 +730,7 @@ public class StandardVersionedComponentSynchronizerTest {
public void testRemoveOutputPortFailsIfIncomingConnection() { public void testRemoveOutputPortFailsIfIncomingConnection() {
createMockConnection(processorA, outputPort, group); createMockConnection(processorA, outputPort, group);
assertThrows(FlowSynchronizationException.class, () -> { assertThrows(FlowSynchronizationException.class, () -> synchronizer.synchronize(outputPort, null, group, synchronizationOptions));
synchronizer.synchronize(outputPort, null, group, synchronizationOptions);
});
} }
@Test @Test
@ -687,15 +745,11 @@ public class StandardVersionedComponentSynchronizerTest {
queuesWithData.add(connection.getIdentifier()); queuesWithData.add(connection.getIdentifier());
// Ensure that we fail to remove it due to FlowSynchronizationException because destination of connection is not running // Ensure that we fail to remove it due to FlowSynchronizationException because destination of connection is not running
assertThrows(FlowSynchronizationException.class, () -> { assertThrows(FlowSynchronizationException.class, () -> synchronizer.synchronize(inputPort, null, group, synchronizationOptions));
synchronizer.synchronize(inputPort, null, group, synchronizationOptions);
});
// Start processor and ensure that we fail to remove it due to TimeoutException because destination of connection is now running // Start processor and ensure that we fail to remove it due to TimeoutException because destination of connection is now running
startProcessor(processorA); startProcessor(processorA);
assertThrows(TimeoutException.class, () -> { assertThrows(TimeoutException.class, () -> synchronizer.synchronize(inputPort, null, group, synchronizationOptions));
synchronizer.synchronize(inputPort, null, group, synchronizationOptions);
});
} }
@Test @Test
@ -707,9 +761,6 @@ public class StandardVersionedComponentSynchronizerTest {
verify(controllerServiceNode).setName(eq(versionedService.getName())); verify(controllerServiceNode).setName(eq(versionedService.getName()));
} }
public static class MapStringString extends HashMap<String, String> {
}
@Test @Test
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void testExternalControllerServiceReferenceRemoved() throws FlowSynchronizationException, InterruptedException, TimeoutException { public void testExternalControllerServiceReferenceRemoved() throws FlowSynchronizationException, InterruptedException, TimeoutException {
@ -826,13 +877,10 @@ public class StandardVersionedComponentSynchronizerTest {
verify(controllerServiceProvider).unscheduleReferencingComponents(service); verify(controllerServiceProvider).unscheduleReferencingComponents(service);
verify(controllerServiceProvider).disableControllerServicesAsync(Collections.singleton(service)); verify(controllerServiceProvider).disableControllerServicesAsync(Collections.singleton(service));
Mockito.doAnswer(new Answer<Void>() { Mockito.doAnswer((Answer<Void>) invocationOnMock -> {
@Override final Set<?> services = invocationOnMock.getArgument(0);
public Void answer(final InvocationOnMock invocationOnMock) { assertTrue(services.isEmpty());
final Set<?> services = invocationOnMock.getArgument(0); return null;
assertTrue(services.isEmpty());
return null;
}
}).when(controllerServiceProvider).enableControllerServicesAsync(Mockito.anySet()); }).when(controllerServiceProvider).enableControllerServicesAsync(Mockito.anySet());
verify(controllerServiceProvider, times(0)).scheduleReferencingComponents(Mockito.any(ControllerServiceNode.class), Mockito.anySet(), Mockito.any(ComponentScheduler.class)); verify(controllerServiceProvider, times(0)).scheduleReferencingComponents(Mockito.any(ControllerServiceNode.class), Mockito.anySet(), Mockito.any(ComponentScheduler.class));
@ -937,9 +985,7 @@ public class StandardVersionedComponentSynchronizerTest {
synchronizationOptions = createQuickFailSynchronizationOptions(FlowSynchronizationOptions.ComponentStopTimeoutAction.THROW_TIMEOUT_EXCEPTION); synchronizationOptions = createQuickFailSynchronizationOptions(FlowSynchronizationOptions.ComponentStopTimeoutAction.THROW_TIMEOUT_EXCEPTION);
when(parameterReferenceManager.getProcessorsReferencing(existing, "abc")).thenReturn(Collections.singleton(processorA)); when(parameterReferenceManager.getProcessorsReferencing(existing, "abc")).thenReturn(Collections.singleton(processorA));
assertThrows(TimeoutException.class, () -> { assertThrows(TimeoutException.class, () -> synchronizer.synchronize(existing, proposed, synchronizationOptions));
synchronizer.synchronize(existing, proposed, synchronizationOptions);
});
// Updates should not occur. // Updates should not occur.
assertEquals("xyz", existing.getParameter("abc").get().getValue()); assertEquals("xyz", existing.getParameter("abc").get().getValue());
@ -1035,9 +1081,7 @@ public class StandardVersionedComponentSynchronizerTest {
when(parameterReferenceManager.getProcessorsReferencing(existing, "abc")).thenReturn(Collections.emptySet()); when(parameterReferenceManager.getProcessorsReferencing(existing, "abc")).thenReturn(Collections.emptySet());
when(parameterReferenceManager.getControllerServicesReferencing(existing, "abc")).thenReturn(Collections.singleton(service)); when(parameterReferenceManager.getControllerServicesReferencing(existing, "abc")).thenReturn(Collections.singleton(service));
assertThrows(TimeoutException.class, () -> { assertThrows(TimeoutException.class, () -> synchronizer.synchronize(existing, proposed, synchronizationOptions));
synchronizer.synchronize(existing, proposed, synchronizationOptions);
});
// Updates should not occur. // Updates should not occur.
assertEquals("xyz", existing.getParameter("abc").get().getValue()); assertEquals("xyz", existing.getParameter("abc").get().getValue());
@ -1284,32 +1328,26 @@ public class StandardVersionedComponentSynchronizerTest {
return versionedPort; return versionedPort;
} }
private class ScheduledStateUpdate<T> { private void assertSensitivePropertyDecrypted(final ComponentNode componentNode) {
private T component; verify(componentNode).setProperties(propertiesCaptor.capture(), eq(true), eq(Collections.emptySet()));
private org.apache.nifi.controller.ScheduledState state;
public ScheduledStateUpdate(T component, org.apache.nifi.controller.ScheduledState state) { final Map<String, String> appliedProperties = propertiesCaptor.getValue();
this.component = component; final String appliedSensitivePropertyValue = appliedProperties.get(SENSITIVE_PROPERTY_NAME);
this.state = state; assertEquals(ENCODED_TEXT, appliedSensitivePropertyValue);
}
} }
private class ControllerServiceStateUpdate { private record ScheduledStateUpdate<T>(T component, org.apache.nifi.controller.ScheduledState state) {
private ControllerServiceNode controllerService; }
private ControllerServiceState state;
public ControllerServiceStateUpdate(ControllerServiceNode controllerService, ControllerServiceState state) { private record ControllerServiceStateUpdate(ControllerServiceNode controllerService, ControllerServiceState state) {
this.controllerService = controllerService;
this.state = state;
}
} }
private class CapturingScheduledStateChangeListener implements ScheduledStateChangeListener { private class CapturingScheduledStateChangeListener implements ScheduledStateChangeListener {
private List<ScheduledStateUpdate<ProcessorNode>> processorUpdates = new ArrayList<>(); private final List<ScheduledStateUpdate<ProcessorNode>> processorUpdates = new ArrayList<>();
private List<ScheduledStateUpdate<Port>> portUpdates = new ArrayList<>(); private final List<ScheduledStateUpdate<Port>> portUpdates = new ArrayList<>();
private List<ControllerServiceStateUpdate> serviceUpdates = new ArrayList<>(); private final List<ControllerServiceStateUpdate> serviceUpdates = new ArrayList<>();
private List<ScheduledStateUpdate<ReportingTaskNode>> reportingTaskUpdates = new ArrayList<>(); private final List<ScheduledStateUpdate<ReportingTaskNode>> reportingTaskUpdates = new ArrayList<>();
@Override @Override
public void onScheduledStateChange(final ProcessorNode processor, final ScheduledState intendedState) { public void onScheduledStateChange(final ProcessorNode processor, final ScheduledState intendedState) {
@ -1347,10 +1385,5 @@ public class StandardVersionedComponentSynchronizerTest {
} }
} }
} }
void assertNumPortUpdates(int expectedNum) {
assertEquals(expectedNum, portUpdates.size(),
"Expected " + expectedNum + " port state changes");
}
} }
} }

View File

@ -232,6 +232,11 @@ public abstract class AbstractComponentNode implements ComponentNode {
return false; return false;
} }
/**
* Overwrite current Component properties using provided values with values decrypted when necessary by the caller
*
* @param properties Map of Property Name to Value
*/
protected void overwriteProperties(final Map<String, String> properties) { protected void overwriteProperties(final Map<String, String> properties) {
// Update properties. // Update properties.
final Map<String, String> updatedProperties = new HashMap<>(properties); final Map<String, String> updatedProperties = new HashMap<>(properties);