NIFI-5154: When Processor or Controller Service is added to a Process Group, remove any references from it to any other Controller Service that is not reachable from the newly assigned Process Group

Fixed bug in unit test
Addressed review feedback/addressed issue where if a group is moved inside another group, the descendant processors of the moved group did not have their service references updated properly. Also addressed an issue where if a service is defined in Group A, then Group B lives within Group A and has a processor that references a service at the level of Group A, we allowed user to move Group B outside of Group A (but wouldn't allow the processor to be moved out of scope by itself).
This closes #2678
This commit is contained in:
Mark Payne 2018-05-04 14:48:53 -04:00 committed by Matt Gilman
parent 8acac9cba5
commit fb48ae2f88
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
2 changed files with 70 additions and 5 deletions

View File

@ -660,6 +660,10 @@ public final class StandardProcessGroup implements ProcessGroup {
processGroups.put(Objects.requireNonNull(group).getIdentifier(), group); processGroups.put(Objects.requireNonNull(group).getIdentifier(), group);
flowController.onProcessGroupAdded(group); flowController.onProcessGroupAdded(group);
group.findAllControllerServices().stream().forEach(this::updateControllerServiceReferences);
group.findAllProcessors().stream().forEach(this::updateControllerServiceReferences);
onComponentModified(); onComponentModified();
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
@ -829,12 +833,49 @@ public final class StandardProcessGroup implements ProcessGroup {
processor.getVariableRegistry().setParent(getVariableRegistry()); processor.getVariableRegistry().setParent(getVariableRegistry());
processors.put(processorId, processor); processors.put(processorId, processor);
flowController.onProcessorAdded(processor); flowController.onProcessorAdded(processor);
updateControllerServiceReferences(processor);
onComponentModified(); onComponentModified();
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }
} }
/**
* Looks for any property that is configured on the given component that references a Controller Service.
* If any exists, and that Controller Service is not accessible from this Process Group, then the given
* component will be removed from the service's referencing components.
*
* @param component the component whose invalid references should be removed
*/
private void updateControllerServiceReferences(final ConfiguredComponent component) {
for (final Map.Entry<PropertyDescriptor, String> entry : component.getProperties().entrySet()) {
final String serviceId = entry.getValue();
if (serviceId == null) {
continue;
}
final PropertyDescriptor propertyDescriptor = entry.getKey();
final Class<? extends ControllerService> serviceClass = propertyDescriptor.getControllerServiceDefinition();
if (serviceClass != null) {
final boolean validReference = isValidServiceReference(serviceId, serviceClass);
final ControllerServiceNode serviceNode = controllerServiceProvider.getControllerServiceNode(serviceId);
if (serviceNode != null) {
if (validReference) {
serviceNode.addReference(component);
} else {
serviceNode.removeReference(component);
}
}
}
}
}
private boolean isValidServiceReference(final String serviceId, final Class<? extends ControllerService> serviceClass) {
final Set<String> validServiceIds = controllerServiceProvider.getControllerServiceIdentifiers(serviceClass, getIdentifier());
return validServiceIds.contains(serviceId);
}
@Override @Override
public void removeProcessor(final ProcessorNode processor) { public void removeProcessor(final ProcessorNode processor) {
boolean removed = false; boolean removed = false;
@ -2046,6 +2087,7 @@ public final class StandardProcessGroup implements ProcessGroup {
service.getVariableRegistry().setParent(getVariableRegistry()); service.getVariableRegistry().setParent(getVariableRegistry());
this.controllerServices.put(service.getIdentifier(), service); this.controllerServices.put(service.getIdentifier(), service);
LOG.info("{} added to {}", service, this); LOG.info("{} added to {}", service, this);
updateControllerServiceReferences(service);
onComponentModified(); onComponentModified();
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
@ -2773,8 +2815,8 @@ public final class StandardProcessGroup implements ProcessGroup {
} }
} }
for (final String id : snippet.getProcessors().keySet()) { final Set<ProcessorNode> processors = findAllProcessors(snippet);
final ProcessorNode processorNode = getProcessor(id); for (final ProcessorNode processorNode : processors) {
for (final PropertyDescriptor descriptor : processorNode.getProperties().keySet()) { for (final PropertyDescriptor descriptor : processorNode.getProperties().keySet()) {
final Class<? extends ControllerService> serviceDefinition = descriptor.getControllerServiceDefinition(); final Class<? extends ControllerService> serviceDefinition = descriptor.getControllerServiceDefinition();
@ -2790,7 +2832,8 @@ public final class StandardProcessGroup implements ProcessGroup {
// ensure the configured service is an allowed service if it's still a valid service // ensure the configured service is an allowed service if it's still a valid service
if (currentControllerServiceIds.contains(serviceId) && !proposedControllerServiceIds.contains(serviceId)) { if (currentControllerServiceIds.contains(serviceId) && !proposedControllerServiceIds.contains(serviceId)) {
throw new IllegalStateException("Cannot perform Move Operation because a Processor references a service that is not available in the destination Process Group"); throw new IllegalStateException("Cannot perform Move Operation because Processor with ID " + processorNode.getIdentifier()
+ " references a service that is not available in the destination Process Group");
} }
} }
} }
@ -2801,6 +2844,20 @@ public final class StandardProcessGroup implements ProcessGroup {
} }
} }
private Set<ProcessorNode> findAllProcessors(final Snippet snippet) {
final Set<ProcessorNode> processors = new HashSet<>();
snippet.getProcessors().keySet().stream()
.map(this::getProcessor)
.forEach(processors::add);
for (final String groupId : snippet.getProcessGroups().keySet()) {
processors.addAll(getProcessGroup(groupId).findAllProcessors());
}
return processors;
}
@Override @Override
public MutableVariableRegistry getVariableRegistry() { public MutableVariableRegistry getVariableRegistry() {
return variableRegistry; return variableRegistry;

View File

@ -55,6 +55,8 @@ import org.apache.nifi.processor.StandardValidationContextFactory;
import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.variable.MutableVariableRegistry; import org.apache.nifi.registry.variable.MutableVariableRegistry;
import org.apache.nifi.registry.variable.StandardComponentVariableRegistry; import org.apache.nifi.registry.variable.StandardComponentVariableRegistry;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.MockProcessorInitializationContext;
import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.NiFiProperties;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -359,8 +361,14 @@ public class TestStandardControllerServiceProvider {
private ProcessorNode createProcessor(final StandardProcessScheduler scheduler, final ControllerServiceProvider serviceProvider) { private ProcessorNode createProcessor(final StandardProcessScheduler scheduler, final ControllerServiceProvider serviceProvider) {
final ReloadComponent reloadComponent = Mockito.mock(ReloadComponent.class); final ReloadComponent reloadComponent = Mockito.mock(ReloadComponent.class);
final LoggableComponent<Processor> dummyProcessor = new LoggableComponent<>(new DummyProcessor(), systemBundle.getBundleDetails().getCoordinate(), null);
final ProcessorNode procNode = new StandardProcessorNode(dummyProcessor, UUID.randomUUID().toString(), final Processor processor = new DummyProcessor();
final MockProcessContext context = new MockProcessContext(processor, Mockito.mock(StateManager.class), variableRegistry);
final MockProcessorInitializationContext mockInitContext = new MockProcessorInitializationContext(processor, context);
processor.initialize(mockInitContext);
final LoggableComponent<Processor> dummyProcessor = new LoggableComponent<>(processor, systemBundle.getBundleDetails().getCoordinate(), null);
final ProcessorNode procNode = new StandardProcessorNode(dummyProcessor, mockInitContext.getIdentifier(),
new StandardValidationContextFactory(serviceProvider, null), scheduler, serviceProvider, niFiProperties, new StandardValidationContextFactory(serviceProvider, null), scheduler, serviceProvider, niFiProperties,
new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent); new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent);