From 7de74ad3f09a763b1da1a5778db204727dbfc11c Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Mon, 14 Nov 2022 09:09:44 -0500 Subject: [PATCH] NIFI-10802 Apply bundle updates before properties in component synchronizer (#6654) This closes #6654 --- ...tandardVersionedComponentSynchronizer.java | 40 ++++++++++++------- .../VersionedComponentSynchronizer.java | 3 +- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java index 30f663ad4e..8edf38bd4f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java @@ -41,6 +41,7 @@ import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.LoadBalanceCompression; import org.apache.nifi.controller.queue.LoadBalanceStrategy; +import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.service.ControllerServiceState; @@ -1262,16 +1263,17 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen service.setBulletinLevel(LogLevel.WARN); } - final Set sensitiveDynamicPropertyNames = getSensitiveDynamicPropertyNames(service, proposed.getProperties(), proposed.getPropertyDescriptors().values()); - final Map properties = populatePropertiesMap(service, proposed.getProperties(), proposed.getPropertyDescriptors(), service.getProcessGroup()); - service.setProperties(properties, true, sensitiveDynamicPropertyNames); - if (!isEqual(service.getBundleCoordinate(), proposed.getBundle())) { final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle()); final List descriptors = new ArrayList<>(service.getRawPropertyValues().keySet()); final Set additionalUrls = service.getAdditionalClasspathResources(descriptors); context.getReloadComponent().reload(service, proposed.getType(), newBundleCoordinate, additionalUrls); } + + final Set sensitiveDynamicPropertyNames = getSensitiveDynamicPropertyNames(service, proposed.getProperties(), proposed.getPropertyDescriptors().values()); + final Map properties = populatePropertiesMap(service, proposed.getProperties(), proposed.getPropertyDescriptors(), service.getProcessGroup()); + service.setProperties(properties, true, sensitiveDynamicPropertyNames); + } finally { service.resumeValidationTrigger(); } @@ -2715,6 +2717,13 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen processor.setName(proposed.getName()); processor.setPenalizationPeriod(proposed.getPenaltyDuration()); + if (!isEqual(processor.getBundleCoordinate(), proposed.getBundle())) { + final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle()); + final List descriptors = new ArrayList<>(processor.getProperties().keySet()); + final Set additionalUrls = processor.getAdditionalClasspathResources(descriptors); + context.getReloadComponent().reload(processor, proposed.getType(), newBundleCoordinate, additionalUrls); + } + final Set sensitiveDynamicPropertyNames = getSensitiveDynamicPropertyNames(processor, proposed.getProperties(), proposed.getPropertyDescriptors().values()); final Map properties = populatePropertiesMap(processor, proposed.getProperties(), proposed.getPropertyDescriptors(), processor.getProcessGroup()); processor.setProperties(properties, true, sensitiveDynamicPropertyNames); @@ -2753,13 +2762,6 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen // Transition state to disabled/enabled/running context.getComponentScheduler().transitionComponentState(processor, proposed.getScheduledState()); notifyScheduledStateChange((ComponentNode) processor, syncOptions, proposed.getScheduledState()); - - if (!isEqual(processor.getBundleCoordinate(), proposed.getBundle())) { - final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle()); - final List descriptors = new ArrayList<>(processor.getProperties().keySet()); - final Set additionalUrls = processor.getAdditionalClasspathResources(descriptors); - context.getReloadComponent().reload(processor, proposed.getType(), newBundleCoordinate, additionalUrls); - } } finally { processor.resumeValidationTrigger(); } @@ -3413,7 +3415,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen @Override public void synchronize(final ReportingTaskNode reportingTask, final VersionedReportingTask proposed, final FlowSynchronizationOptions synchronizationOptions) - throws FlowSynchronizationException, TimeoutException, InterruptedException { + throws FlowSynchronizationException, TimeoutException, InterruptedException, ReportingTaskInstantiationException { if (reportingTask == null && proposed == null) { return; @@ -3442,14 +3444,15 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen } } - private ReportingTaskNode addReportingTask(final VersionedReportingTask reportingTask) { + private ReportingTaskNode addReportingTask(final VersionedReportingTask reportingTask) throws ReportingTaskInstantiationException { final BundleCoordinate coordinate = toCoordinate(reportingTask.getBundle()); final ReportingTaskNode taskNode = context.getFlowManager().createReportingTask(reportingTask.getType(), reportingTask.getInstanceIdentifier(), coordinate, false); updateReportingTask(taskNode, reportingTask); return taskNode; } - private void updateReportingTask(final ReportingTaskNode reportingTask, final VersionedReportingTask proposed) { + private void updateReportingTask(final ReportingTaskNode reportingTask, final VersionedReportingTask proposed) + throws ReportingTaskInstantiationException { LOG.debug("Updating Reporting Task {}", reportingTask); reportingTask.pauseValidationTrigger(); @@ -3458,8 +3461,15 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen reportingTask.setComments(proposed.getComments()); reportingTask.setSchedulingPeriod(proposed.getSchedulingPeriod()); reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(proposed.getSchedulingStrategy())); - reportingTask.setAnnotationData(proposed.getAnnotationData()); + + if (!isEqual(reportingTask.getBundleCoordinate(), proposed.getBundle())) { + final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle()); + final List descriptors = new ArrayList<>(reportingTask.getProperties().keySet()); + final Set additionalUrls = reportingTask.getAdditionalClasspathResources(descriptors); + context.getReloadComponent().reload(reportingTask, proposed.getType(), newBundleCoordinate, additionalUrls); + } + final Set sensitiveDynamicPropertyNames = getSensitiveDynamicPropertyNames(reportingTask, proposed.getProperties(), proposed.getPropertyDescriptors().values()); reportingTask.setProperties(proposed.getProperties(), false, sensitiveDynamicPropertyNames); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/VersionedComponentSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/VersionedComponentSynchronizer.java index 04b2491cd2..dbe7f0c6f7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/VersionedComponentSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/VersionedComponentSynchronizer.java @@ -24,6 +24,7 @@ import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.exception.ProcessorInstantiationException; import org.apache.nifi.controller.label.Label; +import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.flow.VersionedConnection; import org.apache.nifi.flow.VersionedControllerService; @@ -171,7 +172,7 @@ public interface VersionedComponentSynchronizer { * @throws TimeoutException if the reporting task is being removed and takes longer to stop than the timeout allowed by the {@link FlowSynchronizationOptions synchronization options}. */ void synchronize(ReportingTaskNode reportingTask, VersionedReportingTask proposed, FlowSynchronizationOptions synchronizationOptions) - throws FlowSynchronizationException, TimeoutException, InterruptedException; + throws FlowSynchronizationException, TimeoutException, InterruptedException, ReportingTaskInstantiationException; /**