NIFI-6420 Controller Service references not properly tracked if multiple references from same component

<h1>Changes</h1>

-  Instead of keeping a Set of referencing `ComponentNode`, keep instead a Map which stores, for each referencing component, also the name of the property for which the service is referenced
   -  This is done to keep the `addReference()` idempotent
- `StandardControllerService.referencingComponents` changed from HashSet to HashMap
    -  `Key` is the combination of `ComponentNode.hashCode()` and property name
    -  `Value` is the referencing `ComponentNode`
- `ControllerServiceNode.addReference()` now requires a tuple (referring `ComponentNode`, property name)
- `ControllerServiceNode.removeReference()` now requires a tuple (referring `ComponentNode`, property name)
- `ControllerServiceNode.getReferences()` signature is left untouched, when it's called, the values of the `referencingComponents` map are turned into a Set, meaning a referencing component will keep being returned by this method until at least one of its properties still reference the `ControllerService`
- `StandardSchedulingContext.leaseControllerService()` uses a dedicated constant
- `FrameworkIntegrationTest` has a test which create a processor with two optional controller service properties and then verifies that having them referencing and dereferencing the same controller service doesn't cause the reported bug anymore

Removed SchedulingContext from ClojureScriptEngineConfigurator

Co-Authored-By: Marco Gaido <marcogaido91@gmail.com>
Signed-off-by: Mark Payne <markap14@hotmail.com>

This closes #3600.
This commit is contained in:
Alessandro D'Armiento 2019-07-24 00:24:05 +02:00 committed by Mark Payne
parent a5bdecbd25
commit 35d79eaf0f
13 changed files with 125 additions and 271 deletions

View File

@ -1,70 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.controller.ControllerServiceLookup;
public interface SchedulingContext extends ProcessContext {
/**
* <p>
* Indicates to the framework that the Controller Service with the given
* identifier will be used by this Processor. This will prevent any Data
* Flow Manager from disabling the Controller Service while this Processor
* is still running.
* </p>
*
* <p>
* Generally, a Controller Service is accessed by indicating in a
* {@link PropertyDescriptor} that the PropertyDescriptor identifies a
* Controller Service via the
* {@link PropertyDescriptor.Builder#identifiesControllerService(Class) identifiesControllerService(Class)}
* method and then calling
* {@link ProcessContext#getProperty(PropertyDescriptor)}.{@link PropertyValue#asControllerService(Class) asControllerService(Class)}.
* In this case, it is not necessary to lease the Controller Service, as the
* Framework will handle this.
* </p>
*
* <p>
* There are, however, cases in which a Controller Service must be accessed
* in a different way, via the {@link ControllerServiceLookup} (accessed via
* {@link ProcessContext#getControllerServiceLookup()}). In this case, the
* Controller Service that is obtained from the ControllerServiceLookup can
* be disabled by a Data Flow Manager while it is still in use by a
* Processor, causing IllegalStateException to be thrown whenever the
* Processor attempts to interact with the service. This method provides a
* mechanism by which a Processor is able to indicate that the Controller
* Service with the given identifier should not be disabled while this
* Processor is running.
* </p>
*
* <p>
* For any Controller Service that is leased by calling this method, the
* lease will automatically be terminated, allowing the Controller Service
* to be disabled, whenever the Processor is stopped.
* </p>
*
* @param identifier
*
* @throws IllegalArgumentException if no Controller Service exists with the
* given identifier
*/
void leaseControllerService(String identifier);
}

View File

@ -28,9 +28,9 @@ import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.NodeTypeProvider;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.SchedulingContext;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.state.MockStateManager;
import org.junit.Assert;
@ -48,7 +48,7 @@ import java.util.Set;
import static java.util.Objects.requireNonNull;
public class MockProcessContext extends MockControllerServiceLookup implements SchedulingContext, ControllerServiceLookup, NodeTypeProvider {
public class MockProcessContext extends MockControllerServiceLookup implements ProcessContext, ControllerServiceLookup, NodeTypeProvider {
private final ConfigurableComponent component;
private final String componentName;
@ -423,10 +423,6 @@ public class MockProcessContext extends MockControllerServiceLookup implements S
return this;
}
@Override
public void leaseControllerService(final String identifier) {
}
@Override
public Set<Relationship> getAvailableRelationships() {
if (!(component instanceof Processor)) {

View File

@ -346,13 +346,13 @@ public abstract class AbstractComponentNode implements ComponentNode {
if (oldConfiguration != null) {
final ControllerServiceNode oldNode = serviceProvider.getControllerServiceNode(effectiveValue);
if (oldNode != null) {
oldNode.removeReference(this);
oldNode.removeReference(this, descriptor);
}
}
final ControllerServiceNode newNode = serviceProvider.getControllerServiceNode(effectiveValue);
if (newNode != null) {
newNode.addReference(this);
newNode.addReference(this, descriptor);
}
}
}
@ -404,7 +404,7 @@ public abstract class AbstractComponentNode implements ComponentNode {
if (value != null) {
final ControllerServiceNode oldNode = serviceProvider.getControllerServiceNode(value);
if (oldNode != null) {
oldNode.removeReference(this);
oldNode.removeReference(this, descriptor);
}
}
}

View File

@ -21,6 +21,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.VersionedComponent;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ControllerService;
@ -119,14 +120,16 @@ public interface ControllerServiceNode extends ComponentNode, VersionedComponent
/**
* Indicates that the given component is now referencing this Controller Service
* @param referringComponent the component referencing this service
* @param propertyDescriptor the property for which the component is referencing this controller service
*/
void addReference(ComponentNode referringComponent);
void addReference(ComponentNode referringComponent, PropertyDescriptor propertyDescriptor);
/**
* Indicates that the given component is no longer referencing this Controller Service
* @param referringComponent the component that is no longer referencing this service
* @param propertyDescriptor the property for which the component is referencing this controller service
*/
void removeReference(ComponentNode referringComponent);
void removeReference(ComponentNode referringComponent, PropertyDescriptor propertyDescriptor);
void setComments(String comment);

View File

@ -598,7 +598,7 @@ public class StandardFlowManager implements FlowManager {
if (value != null) {
final ControllerServiceNode serviceNode = flowController.getControllerServiceProvider().getControllerServiceNode(value);
if (serviceNode != null) {
serviceNode.removeReference(reportingTaskNode);
serviceNode.removeReference(reportingTaskNode, descriptor);
}
}
}
@ -654,7 +654,7 @@ public class StandardFlowManager implements FlowManager {
if (value != null) {
final ControllerServiceNode referencedNode = getRootControllerService(value);
if (referencedNode != null) {
referencedNode.removeReference(service);
referencedNode.removeReference(service, descriptor);
}
}
}

View File

@ -49,6 +49,7 @@ import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.util.CharacterFilterUtils;
import org.apache.nifi.util.ReflectionUtils;
import org.apache.nifi.util.Tuple;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -70,6 +71,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
public class StandardControllerServiceNode extends AbstractComponentNode implements ControllerServiceNode {
@ -84,7 +86,7 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
private final Set<ComponentNode> referencingComponents = new HashSet<>();
private final Set<Tuple<ComponentNode, PropertyDescriptor>> referencingComponents = new HashSet<>();
private volatile String comment;
private volatile ProcessGroup processGroup;
@ -224,17 +226,18 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
public ControllerServiceReference getReferences() {
readLock.lock();
try {
return new StandardControllerServiceReference(this, referencingComponents);
// In case a controller service is referenced multiple times by a component node, the latter is decoupled here
return new StandardControllerServiceReference(this, referencingComponents.stream().map(Tuple::getKey).collect(Collectors.toSet()));
} finally {
readLock.unlock();
}
}
@Override
public void addReference(final ComponentNode referencingComponent) {
public void addReference(final ComponentNode referencingComponent, final PropertyDescriptor propertyDescriptor) {
writeLock.lock();
try {
referencingComponents.add(referencingComponent);
referencingComponents.add(new Tuple<>(referencingComponent, propertyDescriptor));
} finally {
writeLock.unlock();
}
@ -263,10 +266,10 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
@Override
public void removeReference(final ComponentNode referencingComponent) {
public void removeReference(final ComponentNode referencingComponent, final PropertyDescriptor propertyDescriptor) {
writeLock.lock();
try {
referencingComponents.remove(referencingComponent);
referencingComponents.remove(new Tuple<>(referencingComponent, propertyDescriptor));
} finally {
writeLock.unlock();
}

View File

@ -896,9 +896,9 @@ public final class StandardProcessGroup implements ProcessGroup {
final ControllerServiceNode serviceNode = controllerServiceProvider.getControllerServiceNode(serviceId);
if (serviceNode != null) {
if (validReference) {
serviceNode.addReference(component);
serviceNode.addReference(component, propertyDescriptor);
} else {
serviceNode.removeReference(component);
serviceNode.removeReference(component, propertyDescriptor);
}
}
}
@ -939,7 +939,7 @@ public final class StandardProcessGroup implements ProcessGroup {
if (value != null) {
final ControllerServiceNode serviceNode = controllerServiceProvider.getControllerServiceNode(value);
if (serviceNode != null) {
serviceNode.removeReference(processor);
serviceNode.removeReference(processor, descriptor);
}
}
}
@ -2155,7 +2155,7 @@ public final class StandardProcessGroup implements ProcessGroup {
if (value != null) {
final ControllerServiceNode referencedNode = getControllerService(value);
if (referencedNode != null) {
referencedNode.removeReference(service);
referencedNode.removeReference(service, descriptor);
}
}
}

View File

@ -1,169 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.attribute.expression.language.Query.Range;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.ControllerServiceState;
public class StandardSchedulingContext implements SchedulingContext {
private final ProcessContext processContext;
private final ControllerServiceProvider serviceProvider;
private final ProcessorNode processorNode;
private final StateManager stateManager;
public StandardSchedulingContext(final ProcessContext processContext, final ControllerServiceProvider serviceProvider, final ProcessorNode processorNode, final StateManager stateManager) {
this.processContext = processContext;
this.serviceProvider = serviceProvider;
this.processorNode = processorNode;
this.stateManager = stateManager;
}
@Override
public void leaseControllerService(final String identifier) {
final ControllerServiceNode serviceNode = serviceProvider.getControllerServiceNode(identifier);
if (serviceNode == null) {
throw new IllegalArgumentException("Cannot lease Controller Service because no Controller Service exists with identifier " + identifier);
}
if (serviceNode.getState() != ControllerServiceState.ENABLED) {
throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getProxiedControllerService().getIdentifier() + " is not currently enabled");
}
switch (serviceNode.getValidationStatus()) {
case INVALID:
throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getProxiedControllerService().getIdentifier() + " is not currently valid");
case VALIDATING:
throw new IllegalStateException("Cannot lease Controller Service because Controller Service "
+ serviceNode.getProxiedControllerService().getIdentifier() + " is in the process of validating its configuration");
}
serviceNode.addReference(processorNode);
}
@Override
public PropertyValue getProperty(final PropertyDescriptor descriptor) {
return processContext.getProperty(descriptor);
}
@Override
public PropertyValue getProperty(final String propertyName) {
return processContext.getProperty(propertyName);
}
@Override
public PropertyValue newPropertyValue(final String rawValue) {
return processContext.newPropertyValue(rawValue);
}
@Override
public void yield() {
processContext.yield();
}
@Override
public int getMaxConcurrentTasks() {
return processContext.getMaxConcurrentTasks();
}
@Override
public String getAnnotationData() {
return processContext.getAnnotationData();
}
@Override
public Map<PropertyDescriptor, String> getProperties() {
return processContext.getProperties();
}
@Override
public Map<String, String> getAllProperties() {
final Map<String,String> propValueMap = new LinkedHashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
propValueMap.put(entry.getKey().getName(), entry.getValue());
}
return propValueMap;
}
@Override
public String encrypt(final String unencrypted) {
return processContext.encrypt(unencrypted);
}
@Override
public String decrypt(final String encrypted) {
return processContext.decrypt(encrypted);
}
@Override
public ControllerServiceLookup getControllerServiceLookup() {
return processContext.getControllerServiceLookup();
}
@Override
public Set<Relationship> getAvailableRelationships() {
return processContext.getAvailableRelationships();
}
@Override
public boolean hasIncomingConnection() {
return processContext.hasIncomingConnection();
}
@Override
public boolean hasNonLoopConnection() {
return processContext.hasNonLoopConnection();
}
@Override
public boolean hasConnection(Relationship relationship) {
return processContext.hasConnection(relationship);
}
@Override
public boolean isExpressionLanguagePresent(PropertyDescriptor property) {
if (property == null || !property.isExpressionLanguageSupported()) {
return false;
}
final List<Range> elRanges = Query.extractExpressionRanges(getProperty(property).getValue());
return (elRanges != null && !elRanges.isEmpty());
}
@Override
public StateManager getStateManager() {
return stateManager;
}
@Override
public String getName() {
return processorNode.getName();
}
}

View File

@ -19,6 +19,7 @@ package org.apache.nifi.controller.service;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.components.validation.ValidationStatus;
@ -459,7 +460,7 @@ public class TestStandardControllerServiceProvider {
final ControllerServiceNode serviceNode = createControllerService(ServiceA.class.getName(), "1", systemBundle.getBundleDetails().getCoordinate(), provider);
final ProcessorNode procNode = createProcessor(scheduler, provider);
serviceNode.addReference(procNode);
serviceNode.addReference(procNode, PropertyDescriptor.NULL_DESCRIPTOR);
// procNode.setScheduledState(ScheduledState.STOPPED);
provider.unscheduleReferencingComponents(serviceNode);

View File

@ -28,6 +28,8 @@ import org.junit.Test;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import static org.junit.Assert.assertEquals;
@ -67,4 +69,38 @@ public class ControllerServiceReferenceIT extends FrameworkIntegrationTest {
assertSame(validationStatus, ValidationStatus.VALID);
assertEquals(0, validationErrors.size());
}
@Test
public void testReferenceCounts() {
final String FIRST_PROPERTY = "Counter Service";
final String SECOND_PROPERTY = "Another Counter Service";
final ControllerServiceNode serviceNode = createControllerServiceNode(LongValidatingControllerService.class.getName());
serviceNode.setProperties(Collections.singletonMap(LongValidatingControllerService.DELAY.getName(), "250 millis"));
final ProcessorNode counter = createProcessorNode(MultipleControllerServiceReferencingProcessor.class);
final Map<String, String> properties = new HashMap<>();
// Add a reference of the service node in the first property of the processor
properties.put(FIRST_PROPERTY, serviceNode.getIdentifier());
counter.setProperties(properties);
assertEquals(1, serviceNode.getReferences().getReferencingComponents().size());
// Add another reference of the same service node in the second property of the processor
properties.put(SECOND_PROPERTY, serviceNode.getIdentifier());
counter.setProperties(properties);
assertEquals(1, serviceNode.getReferences().getReferencingComponents().size());
// Remove the reference of the service node from the first property of the processor
properties.put(FIRST_PROPERTY, null);
counter.setProperties(properties);
// The counter should still be one because the service node is still referenced by the processor in its second property
assertEquals(1, serviceNode.getReferences().getReferencingComponents().size());
// Remove also the reference of the service node from the second property of the processor
properties.put(SECOND_PROPERTY, null);
counter.setProperties(properties);
// The counter should become 0 because now the service node is not reference anymore in any processor property
assertEquals(0, serviceNode.getReferences().getReferencingComponents().size());
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.integration.cs;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.integration.FrameworkIntegrationTest;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
public class MultipleControllerServiceReferencingProcessor extends AbstractProcessor {
private final PropertyDescriptor SERVICE = new PropertyDescriptor.Builder()
.name("Counter Service")
.identifiesControllerService(Counter.class)
.required(false)
.build();
private final PropertyDescriptor ANOTHER_SERVICE = new PropertyDescriptor.Builder()
.name("Another Counter Service")
.identifiesControllerService(Counter.class)
.required(false)
.build();
@Override
public Set<Relationship> getRelationships() {
return Collections.singleton(FrameworkIntegrationTest.REL_SUCCESS);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Arrays.asList(SERVICE, ANOTHER_SERVICE);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { }
}

View File

@ -32,7 +32,7 @@ public class ClojureScriptEngineConfigurator extends AbstractModuleClassloaderCo
+ "[org.apache.nifi.flowfile FlowFile]\n"
+ "[org.apache.nifi.processor "
+ "AbstractProcessor AbstractSessionFactoryProcessor DataUnit FlowFileFilter ProcessContext Processor "
+ "ProcessorInitializationContext ProcessSession ProcessSessionFactory Relationship SchedulingContext"
+ "ProcessorInitializationContext ProcessSession ProcessSessionFactory Relationship ProcessContext"
+ "]\n"
+ "[org.apache.nifi.processor.exception FlowFileAccessException FlowFileHandlingException MissingFlowFileException ProcessException]\n"
+ "[org.apache.nifi.processor.io InputStreamCallback OutputStreamCallback StreamCallback]\n"

View File

@ -31,15 +31,15 @@ import org.apache.nifi.controller.NodeTypeProvider;
import org.apache.nifi.controller.PropertyConfiguration;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.parameter.ExpressionLanguageAgnosticParameterParser;
import org.apache.nifi.parameter.ExpressionLanguageAwareParameterParser;
import org.apache.nifi.parameter.Parameter;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterParser;
import org.apache.nifi.parameter.ParameterReference;
import org.apache.nifi.parameter.ParameterTokenList;
import org.apache.nifi.parameter.ExpressionLanguageAwareParameterParser;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.SchedulingContext;
import org.apache.nifi.registry.VariableRegistry;
import java.io.File;
@ -55,7 +55,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
public class StatelessProcessContext implements SchedulingContext, ControllerServiceInitializationContext, StatelessConnectionContext {
public class StatelessProcessContext implements ProcessContext, ControllerServiceInitializationContext, StatelessConnectionContext {
private final ConfigurableComponent component;
private final String componentName;
@ -400,10 +400,6 @@ public class StatelessProcessContext implements SchedulingContext, ControllerSer
return this.lookup;
}
@Override
public void leaseControllerService(final String identifier) {
}
@Override
public Set<Relationship> getAvailableRelationships() {
if (!(component instanceof Processor)) {