NIFI-10802 Apply bundle updates before properties in component synchronizer (#6654)

This closes #6654
This commit is contained in:
Bryan Bende 2022-11-14 09:09:44 -05:00 committed by GitHub
parent 5fb8cf89f0
commit 7de74ad3f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 27 additions and 16 deletions

View File

@ -41,6 +41,7 @@ import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.LoadBalanceCompression; import org.apache.nifi.controller.queue.LoadBalanceCompression;
import org.apache.nifi.controller.queue.LoadBalanceStrategy; import org.apache.nifi.controller.queue.LoadBalanceStrategy;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.controller.service.ControllerServiceState;
@ -1262,16 +1263,17 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
service.setBulletinLevel(LogLevel.WARN); service.setBulletinLevel(LogLevel.WARN);
} }
final Set<String> sensitiveDynamicPropertyNames = getSensitiveDynamicPropertyNames(service, proposed.getProperties(), proposed.getPropertyDescriptors().values());
final Map<String, String> properties = populatePropertiesMap(service, proposed.getProperties(), proposed.getPropertyDescriptors(), service.getProcessGroup());
service.setProperties(properties, true, sensitiveDynamicPropertyNames);
if (!isEqual(service.getBundleCoordinate(), proposed.getBundle())) { if (!isEqual(service.getBundleCoordinate(), proposed.getBundle())) {
final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle()); final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle());
final List<PropertyDescriptor> descriptors = new ArrayList<>(service.getRawPropertyValues().keySet()); final List<PropertyDescriptor> descriptors = new ArrayList<>(service.getRawPropertyValues().keySet());
final Set<URL> additionalUrls = service.getAdditionalClasspathResources(descriptors); final Set<URL> additionalUrls = service.getAdditionalClasspathResources(descriptors);
context.getReloadComponent().reload(service, proposed.getType(), newBundleCoordinate, additionalUrls); context.getReloadComponent().reload(service, proposed.getType(), newBundleCoordinate, additionalUrls);
} }
final Set<String> sensitiveDynamicPropertyNames = getSensitiveDynamicPropertyNames(service, proposed.getProperties(), proposed.getPropertyDescriptors().values());
final Map<String, String> properties = populatePropertiesMap(service, proposed.getProperties(), proposed.getPropertyDescriptors(), service.getProcessGroup());
service.setProperties(properties, true, sensitiveDynamicPropertyNames);
} finally { } finally {
service.resumeValidationTrigger(); service.resumeValidationTrigger();
} }
@ -2715,6 +2717,13 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
processor.setName(proposed.getName()); processor.setName(proposed.getName());
processor.setPenalizationPeriod(proposed.getPenaltyDuration()); processor.setPenalizationPeriod(proposed.getPenaltyDuration());
if (!isEqual(processor.getBundleCoordinate(), proposed.getBundle())) {
final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle());
final List<PropertyDescriptor> descriptors = new ArrayList<>(processor.getProperties().keySet());
final Set<URL> additionalUrls = processor.getAdditionalClasspathResources(descriptors);
context.getReloadComponent().reload(processor, proposed.getType(), newBundleCoordinate, additionalUrls);
}
final Set<String> sensitiveDynamicPropertyNames = getSensitiveDynamicPropertyNames(processor, proposed.getProperties(), proposed.getPropertyDescriptors().values()); final Set<String> sensitiveDynamicPropertyNames = getSensitiveDynamicPropertyNames(processor, proposed.getProperties(), proposed.getPropertyDescriptors().values());
final Map<String, String> properties = populatePropertiesMap(processor, proposed.getProperties(), proposed.getPropertyDescriptors(), processor.getProcessGroup()); final Map<String, String> properties = populatePropertiesMap(processor, proposed.getProperties(), proposed.getPropertyDescriptors(), processor.getProcessGroup());
processor.setProperties(properties, true, sensitiveDynamicPropertyNames); processor.setProperties(properties, true, sensitiveDynamicPropertyNames);
@ -2753,13 +2762,6 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
// Transition state to disabled/enabled/running // Transition state to disabled/enabled/running
context.getComponentScheduler().transitionComponentState(processor, proposed.getScheduledState()); context.getComponentScheduler().transitionComponentState(processor, proposed.getScheduledState());
notifyScheduledStateChange((ComponentNode) processor, syncOptions, proposed.getScheduledState()); notifyScheduledStateChange((ComponentNode) processor, syncOptions, proposed.getScheduledState());
if (!isEqual(processor.getBundleCoordinate(), proposed.getBundle())) {
final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle());
final List<PropertyDescriptor> descriptors = new ArrayList<>(processor.getProperties().keySet());
final Set<URL> additionalUrls = processor.getAdditionalClasspathResources(descriptors);
context.getReloadComponent().reload(processor, proposed.getType(), newBundleCoordinate, additionalUrls);
}
} finally { } finally {
processor.resumeValidationTrigger(); processor.resumeValidationTrigger();
} }
@ -3413,7 +3415,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
@Override @Override
public void synchronize(final ReportingTaskNode reportingTask, final VersionedReportingTask proposed, final FlowSynchronizationOptions synchronizationOptions) public void synchronize(final ReportingTaskNode reportingTask, final VersionedReportingTask proposed, final FlowSynchronizationOptions synchronizationOptions)
throws FlowSynchronizationException, TimeoutException, InterruptedException { throws FlowSynchronizationException, TimeoutException, InterruptedException, ReportingTaskInstantiationException {
if (reportingTask == null && proposed == null) { if (reportingTask == null && proposed == null) {
return; return;
@ -3442,14 +3444,15 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
} }
} }
private ReportingTaskNode addReportingTask(final VersionedReportingTask reportingTask) { private ReportingTaskNode addReportingTask(final VersionedReportingTask reportingTask) throws ReportingTaskInstantiationException {
final BundleCoordinate coordinate = toCoordinate(reportingTask.getBundle()); final BundleCoordinate coordinate = toCoordinate(reportingTask.getBundle());
final ReportingTaskNode taskNode = context.getFlowManager().createReportingTask(reportingTask.getType(), reportingTask.getInstanceIdentifier(), coordinate, false); final ReportingTaskNode taskNode = context.getFlowManager().createReportingTask(reportingTask.getType(), reportingTask.getInstanceIdentifier(), coordinate, false);
updateReportingTask(taskNode, reportingTask); updateReportingTask(taskNode, reportingTask);
return taskNode; return taskNode;
} }
private void updateReportingTask(final ReportingTaskNode reportingTask, final VersionedReportingTask proposed) { private void updateReportingTask(final ReportingTaskNode reportingTask, final VersionedReportingTask proposed)
throws ReportingTaskInstantiationException {
LOG.debug("Updating Reporting Task {}", reportingTask); LOG.debug("Updating Reporting Task {}", reportingTask);
reportingTask.pauseValidationTrigger(); reportingTask.pauseValidationTrigger();
@ -3458,8 +3461,15 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
reportingTask.setComments(proposed.getComments()); reportingTask.setComments(proposed.getComments());
reportingTask.setSchedulingPeriod(proposed.getSchedulingPeriod()); reportingTask.setSchedulingPeriod(proposed.getSchedulingPeriod());
reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(proposed.getSchedulingStrategy())); reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(proposed.getSchedulingStrategy()));
reportingTask.setAnnotationData(proposed.getAnnotationData()); reportingTask.setAnnotationData(proposed.getAnnotationData());
if (!isEqual(reportingTask.getBundleCoordinate(), proposed.getBundle())) {
final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle());
final List<PropertyDescriptor> descriptors = new ArrayList<>(reportingTask.getProperties().keySet());
final Set<URL> additionalUrls = reportingTask.getAdditionalClasspathResources(descriptors);
context.getReloadComponent().reload(reportingTask, proposed.getType(), newBundleCoordinate, additionalUrls);
}
final Set<String> sensitiveDynamicPropertyNames = getSensitiveDynamicPropertyNames(reportingTask, proposed.getProperties(), proposed.getPropertyDescriptors().values()); final Set<String> sensitiveDynamicPropertyNames = getSensitiveDynamicPropertyNames(reportingTask, proposed.getProperties(), proposed.getPropertyDescriptors().values());
reportingTask.setProperties(proposed.getProperties(), false, sensitiveDynamicPropertyNames); reportingTask.setProperties(proposed.getProperties(), false, sensitiveDynamicPropertyNames);

View File

@ -24,6 +24,7 @@ import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.exception.ProcessorInstantiationException; import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.flow.VersionedConnection; import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedControllerService; import org.apache.nifi.flow.VersionedControllerService;
@ -171,7 +172,7 @@ public interface VersionedComponentSynchronizer {
* @throws TimeoutException if the reporting task is being removed and takes longer to stop than the timeout allowed by the {@link FlowSynchronizationOptions synchronization options}. * @throws TimeoutException if the reporting task is being removed and takes longer to stop than the timeout allowed by the {@link FlowSynchronizationOptions synchronization options}.
*/ */
void synchronize(ReportingTaskNode reportingTask, VersionedReportingTask proposed, FlowSynchronizationOptions synchronizationOptions) void synchronize(ReportingTaskNode reportingTask, VersionedReportingTask proposed, FlowSynchronizationOptions synchronizationOptions)
throws FlowSynchronizationException, TimeoutException, InterruptedException; throws FlowSynchronizationException, TimeoutException, InterruptedException, ReportingTaskInstantiationException;
/** /**