From fb48ae2f8853e30562d87595b52febe230a26596 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 4 May 2018 14:48:53 -0400 Subject: [PATCH] NIFI-5154: When Processor or Controller Service is added to a Process Group, remove any references from it to any other Controller Service that is not reachable from the newly assigned Process Group Fixed bug in unit test Addressed review feedback/addressed issue where if a group is moved inside another group, the descendant processors of the moved group did not have their service references updated properly. Also addressed an issue where if a service is defined in Group A, then Group B lives within Group A and has a processor that references a service at the level of Group A, we allowed user to move Group B outside of Group A (but wouldn't allow the processor to be moved out of scope by itself). This closes #2678 --- .../nifi/groups/StandardProcessGroup.java | 63 ++++++++++++++++++- ...TestStandardControllerServiceProvider.java | 12 +++- 2 files changed, 70 insertions(+), 5 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 94196d0799..551d39e8bf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -660,6 +660,10 @@ public final class StandardProcessGroup implements ProcessGroup { processGroups.put(Objects.requireNonNull(group).getIdentifier(), group); flowController.onProcessGroupAdded(group); + + group.findAllControllerServices().stream().forEach(this::updateControllerServiceReferences); + group.findAllProcessors().stream().forEach(this::updateControllerServiceReferences); + onComponentModified(); } finally { writeLock.unlock(); @@ -829,12 +833,49 @@ public final class StandardProcessGroup implements ProcessGroup { processor.getVariableRegistry().setParent(getVariableRegistry()); processors.put(processorId, processor); flowController.onProcessorAdded(processor); + updateControllerServiceReferences(processor); onComponentModified(); } finally { writeLock.unlock(); } } + /** + * Looks for any property that is configured on the given component that references a Controller Service. + * If any exists, and that Controller Service is not accessible from this Process Group, then the given + * component will be removed from the service's referencing components. + * + * @param component the component whose invalid references should be removed + */ + private void updateControllerServiceReferences(final ConfiguredComponent component) { + for (final Map.Entry entry : component.getProperties().entrySet()) { + final String serviceId = entry.getValue(); + if (serviceId == null) { + continue; + } + + final PropertyDescriptor propertyDescriptor = entry.getKey(); + final Class serviceClass = propertyDescriptor.getControllerServiceDefinition(); + + if (serviceClass != null) { + final boolean validReference = isValidServiceReference(serviceId, serviceClass); + final ControllerServiceNode serviceNode = controllerServiceProvider.getControllerServiceNode(serviceId); + if (serviceNode != null) { + if (validReference) { + serviceNode.addReference(component); + } else { + serviceNode.removeReference(component); + } + } + } + } + } + + private boolean isValidServiceReference(final String serviceId, final Class serviceClass) { + final Set validServiceIds = controllerServiceProvider.getControllerServiceIdentifiers(serviceClass, getIdentifier()); + return validServiceIds.contains(serviceId); + } + @Override public void removeProcessor(final ProcessorNode processor) { boolean removed = false; @@ -2046,6 +2087,7 @@ public final class StandardProcessGroup implements ProcessGroup { service.getVariableRegistry().setParent(getVariableRegistry()); this.controllerServices.put(service.getIdentifier(), service); LOG.info("{} added to {}", service, this); + updateControllerServiceReferences(service); onComponentModified(); } finally { writeLock.unlock(); @@ -2773,8 +2815,8 @@ public final class StandardProcessGroup implements ProcessGroup { } } - for (final String id : snippet.getProcessors().keySet()) { - final ProcessorNode processorNode = getProcessor(id); + final Set processors = findAllProcessors(snippet); + for (final ProcessorNode processorNode : processors) { for (final PropertyDescriptor descriptor : processorNode.getProperties().keySet()) { final Class serviceDefinition = descriptor.getControllerServiceDefinition(); @@ -2790,7 +2832,8 @@ public final class StandardProcessGroup implements ProcessGroup { // ensure the configured service is an allowed service if it's still a valid service if (currentControllerServiceIds.contains(serviceId) && !proposedControllerServiceIds.contains(serviceId)) { - throw new IllegalStateException("Cannot perform Move Operation because a Processor references a service that is not available in the destination Process Group"); + throw new IllegalStateException("Cannot perform Move Operation because Processor with ID " + processorNode.getIdentifier() + + " references a service that is not available in the destination Process Group"); } } } @@ -2801,6 +2844,20 @@ public final class StandardProcessGroup implements ProcessGroup { } } + private Set findAllProcessors(final Snippet snippet) { + final Set processors = new HashSet<>(); + + snippet.getProcessors().keySet().stream() + .map(this::getProcessor) + .forEach(processors::add); + + for (final String groupId : snippet.getProcessGroups().keySet()) { + processors.addAll(getProcessGroup(groupId).findAllProcessors()); + } + + return processors; + } + @Override public MutableVariableRegistry getVariableRegistry() { return variableRegistry; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java index fac04a6bd1..bfd1340111 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java @@ -55,6 +55,8 @@ import org.apache.nifi.processor.StandardValidationContextFactory; import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.registry.variable.MutableVariableRegistry; import org.apache.nifi.registry.variable.StandardComponentVariableRegistry; +import org.apache.nifi.util.MockProcessContext; +import org.apache.nifi.util.MockProcessorInitializationContext; import org.apache.nifi.util.NiFiProperties; import org.junit.Assert; import org.junit.Before; @@ -359,8 +361,14 @@ public class TestStandardControllerServiceProvider { private ProcessorNode createProcessor(final StandardProcessScheduler scheduler, final ControllerServiceProvider serviceProvider) { final ReloadComponent reloadComponent = Mockito.mock(ReloadComponent.class); - final LoggableComponent dummyProcessor = new LoggableComponent<>(new DummyProcessor(), systemBundle.getBundleDetails().getCoordinate(), null); - final ProcessorNode procNode = new StandardProcessorNode(dummyProcessor, UUID.randomUUID().toString(), + + final Processor processor = new DummyProcessor(); + final MockProcessContext context = new MockProcessContext(processor, Mockito.mock(StateManager.class), variableRegistry); + final MockProcessorInitializationContext mockInitContext = new MockProcessorInitializationContext(processor, context); + processor.initialize(mockInitContext); + + final LoggableComponent dummyProcessor = new LoggableComponent<>(processor, systemBundle.getBundleDetails().getCoordinate(), null); + final ProcessorNode procNode = new StandardProcessorNode(dummyProcessor, mockInitContext.getIdentifier(), new StandardValidationContextFactory(serviceProvider, null), scheduler, serviceProvider, niFiProperties, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent);