mirror of https://github.com/apache/nifi.git
NIFI-13659 Call migrateProperties on components changed in synchronization (#9201)
Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
5c8753fe15
commit
71face8ea7
|
@ -154,7 +154,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
|
|||
|
||||
private final VersionedFlowSynchronizationContext context;
|
||||
private final Set<String> updatedVersionedComponentIds = new HashSet<>();
|
||||
private final List<CreatedExtension> createdExtensions = new ArrayList<>();
|
||||
private final List<CreatedOrModifiedExtension> createdAndModifiedExtensions = new ArrayList<>();
|
||||
|
||||
private FlowSynchronizationOptions syncOptions;
|
||||
private final ConnectableAdditionTracker connectableAdditionTracker = new ConnectableAdditionTracker();
|
||||
|
@ -181,7 +181,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
|
|||
final FlowComparison flowComparison = flowComparator.compare();
|
||||
|
||||
updatedVersionedComponentIds.clear();
|
||||
createdExtensions.clear();
|
||||
createdAndModifiedExtensions.clear();
|
||||
setSynchronizationOptions(options);
|
||||
|
||||
for (final FlowDifference diff : flowComparison.getDifferences()) {
|
||||
|
@ -255,9 +255,9 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
|
|||
}
|
||||
});
|
||||
|
||||
for (final CreatedExtension createdExtension : createdExtensions) {
|
||||
final ComponentNode extension = createdExtension.extension();
|
||||
final Map<String, String> originalPropertyValues = createdExtension.propertyValues();
|
||||
for (final CreatedOrModifiedExtension createdOrModifiedExtension : createdAndModifiedExtensions) {
|
||||
final ComponentNode extension = createdOrModifiedExtension.extension();
|
||||
final Map<String, String> originalPropertyValues = createdOrModifiedExtension.propertyValues();
|
||||
|
||||
final ControllerServiceFactory serviceFactory = new StandardControllerServiceFactory(context.getExtensionManager(), context.getFlowManager(),
|
||||
context.getControllerServiceProvider(), extension);
|
||||
|
@ -603,6 +603,9 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
|
|||
|
||||
if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) {
|
||||
updateControllerService(service, proposedService, topLevelGroup);
|
||||
// Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state,
|
||||
// so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state
|
||||
createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service)));
|
||||
LOG.info("Updated {}", service);
|
||||
}
|
||||
}
|
||||
|
@ -1050,6 +1053,9 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
|
|||
LOG.info("Added {} to {}", added, group);
|
||||
} else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) {
|
||||
updateProcessor(processor, proposedProcessor, topLevelGroup);
|
||||
// Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state,
|
||||
// so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state
|
||||
createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(processor, getPropertyValues(processor)));
|
||||
LOG.info("Updated {}", processor);
|
||||
} else {
|
||||
processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY()));
|
||||
|
@ -1239,7 +1245,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
|
|||
}
|
||||
|
||||
final Map<String, String> decryptedProperties = getDecryptedProperties(proposed.getProperties());
|
||||
createdExtensions.add(new CreatedExtension(newService, decryptedProperties));
|
||||
createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(newService, decryptedProperties));
|
||||
|
||||
updateControllerService(newService, proposed, topLevelGroup);
|
||||
|
||||
|
@ -1473,8 +1479,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
|
|||
|
||||
// Find the same property descriptor in the component's CreatedExtension and replace it with the
|
||||
// instance ID of the service
|
||||
createdExtensions.stream().filter(ce -> ce.extension.equals(componentNode)).forEach(createdExtension -> {
|
||||
createdExtension.propertyValues.replace(propertyName, value);
|
||||
createdAndModifiedExtensions.stream().filter(ce -> ce.extension.equals(componentNode)).forEach(createdOrModifiedExtension -> {
|
||||
createdOrModifiedExtension.propertyValues.replace(propertyName, value);
|
||||
});
|
||||
} else {
|
||||
value = existingExternalServiceId;
|
||||
|
@ -2474,7 +2480,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
|
|||
destination.addProcessor(procNode);
|
||||
|
||||
final Map<String, String> decryptedProperties = getDecryptedProperties(proposed.getProperties());
|
||||
createdExtensions.add(new CreatedExtension(procNode, decryptedProperties));
|
||||
createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(procNode, decryptedProperties));
|
||||
|
||||
updateProcessor(procNode, proposed, topLevelGroup);
|
||||
|
||||
|
@ -3545,6 +3551,9 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
|
|||
LOG.info("Successfully synchronized {} by adding it to the flow", added);
|
||||
} else {
|
||||
updateReportingTask(reportingTask, proposed);
|
||||
// Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state,
|
||||
// so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state
|
||||
createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(reportingTask, getPropertyValues(reportingTask)));
|
||||
LOG.info("Successfully synchronized {} by updating it to match proposed version", reportingTask);
|
||||
}
|
||||
} finally {
|
||||
|
@ -3558,7 +3567,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
|
|||
updateReportingTask(taskNode, reportingTask);
|
||||
|
||||
final Map<String, String> decryptedProperties = getDecryptedProperties(reportingTask.getProperties());
|
||||
createdExtensions.add(new CreatedExtension(taskNode, decryptedProperties));
|
||||
createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(taskNode, decryptedProperties));
|
||||
|
||||
return taskNode;
|
||||
}
|
||||
|
@ -3803,7 +3812,17 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
|
|||
return getVersionedControllerService(group.getParent(), versionedComponentId);
|
||||
}
|
||||
|
||||
private record CreatedExtension(ComponentNode extension, Map<String, String> propertyValues) {
|
||||
private Map<String, String> getPropertyValues(final ComponentNode componentNode) {
|
||||
final Map<String, String> propertyValues = new HashMap<>();
|
||||
if (componentNode.getRawPropertyValues() != null) {
|
||||
for (final Map.Entry<PropertyDescriptor, String> entry : componentNode.getRawPropertyValues().entrySet()) {
|
||||
propertyValues.put(entry.getKey().getName(), entry.getValue());
|
||||
}
|
||||
}
|
||||
return propertyValues;
|
||||
}
|
||||
|
||||
private record CreatedOrModifiedExtension(ComponentNode extension, Map<String, String> propertyValues) {
|
||||
}
|
||||
|
||||
private record ParameterValueAndReferences(String value, List<String> assetIds) { }
|
||||
|
|
Loading…
Reference in New Issue