NIFI-2909 Adding per-instance class loading capability through @RequiresInstanceClassLoading annotation

NIFI-1712 Applying per-instance class loading to HBaseClientService to allow specifying Phoenix Client JAR

-Refactoring the ClassLoading so that every processor, controller service, and reporting task gets an InstanceClassLoader with a parent of the NAR ClassLoader, and only components with @RequiresInstanceClassLoading will make a copy of the NAR ClassLoader resources, and addressing some review feedback

This closes #1156
This commit is contained in:
Bryan Bende 2016-10-10 09:27:57 -04:00 committed by Oleg Zhurakousky
parent 2f0d9a34f1
commit d1d053725b
48 changed files with 1281 additions and 377 deletions

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.annotation.behavior;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Marker annotation a component can use to indicate that the framework should create a new ClassLoader
* for each instance of the component, copying all resources from the component's NARClassLoader to a
* new ClassLoader which will only be used by a given instance of the component.
*
* This annotation is typically used when a component has one or more PropertyDescriptors which set
* dynamicallyModifiesClasspath(boolean) to true.
*
* When this annotation is used it is important to note that each added instance of the component will increase
* the overall memory footprint more than that of a component without this annotation.
*/
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface RequiresInstanceClassLoading {
}

View File

@ -79,6 +79,11 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor>
* Language
*/
private final boolean expressionLanguageSupported;
/**
* indicates whether or not this property represents resources that should be added
* to the classpath for this instance of the component
*/
private final boolean dynamicallyModifiesClasspath;
/**
* the interface of the {@link ControllerService} that this Property refers
@ -102,6 +107,7 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor>
this.required = builder.required;
this.sensitive = builder.sensitive;
this.dynamic = builder.dynamic;
this.dynamicallyModifiesClasspath = builder.dynamicallyModifiesClasspath;
this.expressionLanguageSupported = builder.expressionLanguageSupported;
this.controllerServiceDefinition = builder.controllerServiceDefinition;
this.validators = new ArrayList<>(builder.validators);
@ -232,6 +238,7 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor>
private boolean sensitive = false;
private boolean expressionLanguageSupported = false;
private boolean dynamic = false;
private boolean dynamicallyModifiesClasspath = false;
private Class<? extends ControllerService> controllerServiceDefinition;
private List<Validator> validators = new ArrayList<>();
@ -244,6 +251,7 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor>
this.required = specDescriptor.required;
this.sensitive = specDescriptor.sensitive;
this.dynamic = specDescriptor.dynamic;
this.dynamicallyModifiesClasspath = specDescriptor.dynamicallyModifiesClasspath;
this.expressionLanguageSupported = specDescriptor.expressionLanguageSupported;
this.controllerServiceDefinition = specDescriptor.getControllerServiceDefinition();
this.validators = new ArrayList<>(specDescriptor.validators);
@ -331,6 +339,22 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor>
return this;
}
/**
* Specifies that the value of this property represents one or more resources that the
* framework should add to the classpath of the given component.
*
* NOTE: If a component contains a PropertyDescriptor where dynamicallyModifiesClasspath is set to true,
* the component must also be annotated with @RequiresInstanceClassloading, otherwise the component will be
* considered invalid.
*
* @param dynamicallyModifiesClasspath whether or not this property should be used by the framework to modify the classpath
* @return the builder
*/
public Builder dynamicallyModifiesClasspath(final boolean dynamicallyModifiesClasspath) {
this.dynamicallyModifiesClasspath = dynamicallyModifiesClasspath;
return this;
}
/**
* @param values contrained set of values
* @return the builder
@ -492,6 +516,10 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor>
return expressionLanguageSupported;
}
public boolean isDynamicClasspathModifier() {
return dynamicallyModifiesClasspath;
}
public Class<? extends ControllerService> getControllerServiceDefinition() {
return controllerServiceDefinition;
}

View File

@ -591,4 +591,5 @@ public class FileUtils {
return digest.digest();
}
}

View File

@ -17,6 +17,8 @@
package org.apache.nifi.util.file.classloader;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FilenameFilter;
@ -24,23 +26,54 @@ import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.Set;
public class ClassLoaderUtils {
public static ClassLoader getCustomClassLoader(String modulePath, ClassLoader parentClassLoader, FilenameFilter filenameFilter) throws MalformedURLException {
// Split and trim the module path(s)
List<String> modules = (modulePath == null)
? null
: Arrays.stream(modulePath.split(",")).filter(StringUtils::isNotBlank).map(String::trim).collect(Collectors.toList());
static final Logger logger = LoggerFactory.getLogger(ClassLoaderUtils.class);
URL[] classpaths = getURLsForClasspath(modules, filenameFilter);
public static ClassLoader getCustomClassLoader(String modulePath, ClassLoader parentClassLoader, FilenameFilter filenameFilter) throws MalformedURLException {
URL[] classpaths = getURLsForClasspath(modulePath, filenameFilter, false);
return createModuleClassLoader(classpaths, parentClassLoader);
}
protected static URL[] getURLsForClasspath(List<String> modulePaths, FilenameFilter filenameFilter) throws MalformedURLException {
/**
*
* @param modulePath a module path to get URLs from, the module path may be a comma-separated list of paths
* @param filenameFilter a filter to apply when a module path is a directory and performs a listing, a null filter will return all matches
* @return an array of URL instances representing all of the modules resolved from processing modulePath
* @throws MalformedURLException if a module path does not exist
*/
public static URL[] getURLsForClasspath(String modulePath, FilenameFilter filenameFilter, boolean suppressExceptions) throws MalformedURLException {
return getURLsForClasspath(modulePath == null ? Collections.emptySet() : Collections.singleton(modulePath), filenameFilter, suppressExceptions);
}
/**
*
* @param modulePaths one or modules paths to get URLs from, each module path may be a comma-separated list of paths
* @param filenameFilter a filter to apply when a module path is a directory and performs a listing, a null filter will return all matches
* @param suppressExceptions if true then all modules will attempt to be resolved even if some throw an exception, if false the first exception will be thrown
* @return an array of URL instances representing all of the modules resolved from processing modulePaths
* @throws MalformedURLException if a module path does not exist
*/
public static URL[] getURLsForClasspath(Set<String> modulePaths, FilenameFilter filenameFilter, boolean suppressExceptions) throws MalformedURLException {
// use LinkedHashSet to maintain the ordering that the incoming paths are processed
Set<String> modules = new LinkedHashSet<>();
if (modulePaths != null) {
modulePaths.stream()
.flatMap(path -> Arrays.stream(path.split(",")))
.filter(StringUtils::isNotBlank)
.map(String::trim)
.forEach(m -> modules.add(m));
}
return toURLs(modules, filenameFilter, suppressExceptions);
}
protected static URL[] toURLs(Set<String> modulePaths, FilenameFilter filenameFilter, boolean suppressExceptions) throws MalformedURLException {
List<URL> additionalClasspath = new LinkedList<>();
if (modulePaths != null) {
for (String modulePathString : modulePaths) {
@ -52,23 +85,33 @@ public class ClassLoaderUtils {
isUrl = false;
}
if (!isUrl) {
File modulePath = new File(modulePathString);
try {
File modulePath = new File(modulePathString);
if (modulePath.exists()) {
if (modulePath.exists()) {
additionalClasspath.add(modulePath.toURI().toURL());
additionalClasspath.add(modulePath.toURI().toURL());
if (modulePath.isDirectory()) {
File[] files = modulePath.listFiles(filenameFilter);
if (modulePath.isDirectory()) {
File[] files = modulePath.listFiles(filenameFilter);
if (files != null) {
for (File jarFile : files) {
additionalClasspath.add(jarFile.toURI().toURL());
if (files != null) {
for (File classpathResource : files) {
if (classpathResource.isDirectory()) {
logger.warn("Recursive directories are not supported, skipping " + classpathResource.getAbsolutePath());
} else {
additionalClasspath.add(classpathResource.toURI().toURL());
}
}
}
}
} else {
throw new MalformedURLException("Path specified does not exist");
}
} catch (MalformedURLException e) {
if (!suppressExceptions) {
throw e;
}
} else {
throw new MalformedURLException("Path specified does not exist");
}
}
}

View File

@ -18,9 +18,13 @@ package org.apache.nifi.util.file.classloader;
import java.io.FilenameFilter;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashSet;
import java.util.Set;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -79,6 +83,43 @@ public class TestClassLoaderUtils {
assertNotNull(ClassLoaderUtils.getCustomClassLoader(jarFilePath, this.getClass().getClassLoader(), getJarFilenameFilter()));
}
@Test
public void testGetURLsForClasspathWithDirectory() throws MalformedURLException {
final String jarFilePath = "src/test/resources/TestClassLoaderUtils";
URL[] urls = ClassLoaderUtils.getURLsForClasspath(jarFilePath, null, false);
assertEquals(2, urls.length);
}
@Test
public void testGetURLsForClasspathWithSingleJAR() throws MalformedURLException {
final String jarFilePath = "src/test/resources/TestClassLoaderUtils/TestSuccess.jar";
URL[] urls = ClassLoaderUtils.getURLsForClasspath(jarFilePath, null, false);
assertEquals(1, urls.length);
}
@Test(expected = MalformedURLException.class)
public void testGetURLsForClasspathWithSomeNonExistentAndNoSuppression() throws MalformedURLException {
final String jarFilePath = "src/test/resources/TestClassLoaderUtils/TestSuccess.jar,src/test/resources/TestClassLoaderUtils/FakeTest.jar";
ClassLoaderUtils.getURLsForClasspath(jarFilePath, null, false);
}
@Test
public void testGetURLsForClasspathWithSomeNonExistentAndSuppression() throws MalformedURLException {
final String jarFilePath = "src/test/resources/TestClassLoaderUtils/TestSuccess.jar,src/test/resources/TestClassLoaderUtils/FakeTest.jar";
URL[] urls = ClassLoaderUtils.getURLsForClasspath(jarFilePath, null, true);
assertEquals(1, urls.length);
}
@Test
public void testGetURLsForClasspathWithSetAndSomeNonExistentAndSuppression() throws MalformedURLException {
final Set<String> modules = new HashSet<>();
modules.add("src/test/resources/TestClassLoaderUtils/TestSuccess.jar,src/test/resources/TestClassLoaderUtils/FakeTest1.jar");
modules.add("src/test/resources/TestClassLoaderUtils/FakeTest2.jar,src/test/resources/TestClassLoaderUtils/FakeTest3.jar");
URL[] urls = ClassLoaderUtils.getURLsForClasspath(modules, null, true);
assertEquals(1, urls.length);
}
protected FilenameFilter getJarFilenameFilter(){
return (dir, name) -> name != null && name.endsWith(".jar");
}

View File

@ -2269,6 +2269,58 @@ API artifacts into the same NAR is often acceptable.
[[per-instance-classloading]]
== Per-Instance ClassLoading
A component developer may wish to add additional resources to the components classpath at runtime.
For example, you may want to provide the location of a JDBC driver to a processor that interacts with a
relational database, thus allowing the processor to work with any driver rather than trying to bundle a
driver into the NAR.
This may be accomplished by declaring one or more PropertyDescriptor instances with
`dynamicallyModifiesClasspath` set to true. For example:
[source,java]
----
PropertyDescriptor EXTRA_RESOURCE = new PropertyDescriptor.Builder()
.name("Extra Resources")
.description("The path to one or more resources to add to the classpath.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.dynamicallyModifiesClasspath(true)
.build();
----
When these properties are set on a component, the framework identifies all properties where
`dynamicallyModifiesClasspath` is set to true. For each of these properties, the framework
attempts to resolve filesystem resources from the value of the property. The value may be a
comma-separated list of one or more directories or files, where any paths that do not exist are
skipped. If the resource represents a directory, the directory is listed, and all of the files
in that directory are added to the classpath individually.
Each property may impose further restrictions on the format of the value through the validators.
For example, using StandardValidators.FILE_EXISTS_VALIDATOR restricts the property to accepting a
single file. Using StandardValidators.NON_EMPTY_VALIDATOR allows any combination of comma-separated
files or directories.
Resources are added to the instance ClassLoader by adding them to an inner ClassLoader that is always
checked first. Anytime the value of these properties change, the inner ClassLoader is closed and
re-created with the new resources.
NiFi provides the `@RequiresInstanceClassLoading` annotation to further expand and isolate the libraries
available on a components classpath. You can annotate a component with `@RequiresInstanceClassLoading`
to indicate that the instance ClassLoader for the component requires a copy of all the resources in the
component's NAR ClassLoader. When `@RequiresInstanceClassLoading` is not present, the
instance ClassLoader simply has it's parent ClassLoader set to the NAR ClassLoader, rather than
copying resources.
Because @RequiresInstanceClassLoading copies resources from the NAR ClassLoader for each instance of the
component, use this capability judiciously. If ten instances of one component are created, all classes
from the component's NAR ClassLoader are loaded into memory ten times. This could eventually increase the
memory footprint significantly when enough instances of the component are created.
== How to contribute to Apache NiFi

View File

@ -19,6 +19,7 @@ package org.apache.nifi.documentation.init;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.documentation.ConfigurableComponentInitializer;
import org.apache.nifi.documentation.mock.MockConfigurationContext;
import org.apache.nifi.documentation.mock.MockControllerServiceInitializationContext;
@ -38,15 +39,15 @@ public class ControllerServiceInitializer implements ConfigurableComponentInitia
@Override
public void initialize(ConfigurableComponent component) throws InitializationException {
ControllerService controllerService = (ControllerService) component;
try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) {
controllerService.initialize(new MockControllerServiceInitializationContext());
ControllerServiceInitializationContext context = new MockControllerServiceInitializationContext();
try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), context.getIdentifier())) {
controllerService.initialize(context);
}
}
@Override
public void teardown(ConfigurableComponent component) {
try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) {
try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) {
ControllerService controllerService = (ControllerService) component;
final ComponentLog logger = new MockComponentLogger();

View File

@ -26,6 +26,7 @@ import org.apache.nifi.documentation.util.ReflectionUtils;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.ProcessorInitializationContext;
/**
* Initializes a Procesor using a MockProcessorInitializationContext
@ -37,15 +38,16 @@ public class ProcessorInitializer implements ConfigurableComponentInitializer {
@Override
public void initialize(ConfigurableComponent component) {
Processor processor = (Processor) component;
try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) {
processor.initialize(new MockProcessorInitializationContext());
ProcessorInitializationContext initializationContext = new MockProcessorInitializationContext();
try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), initializationContext.getIdentifier())) {
processor.initialize(initializationContext);
}
}
@Override
public void teardown(ConfigurableComponent component) {
Processor processor = (Processor) component;
try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) {
try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) {
final ComponentLog logger = new MockComponentLogger();
final MockProcessContext context = new MockProcessContext();

View File

@ -25,6 +25,7 @@ import org.apache.nifi.documentation.mock.MockReportingInitializationContext;
import org.apache.nifi.documentation.util.ReflectionUtils;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.reporting.ReportingTask;
/**
@ -37,15 +38,16 @@ public class ReportingTaskingInitializer implements ConfigurableComponentInitial
@Override
public void initialize(ConfigurableComponent component) throws InitializationException {
ReportingTask reportingTask = (ReportingTask) component;
try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) {
reportingTask.initialize(new MockReportingInitializationContext());
ReportingInitializationContext context = new MockReportingInitializationContext();
try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), context.getIdentifier())) {
reportingTask.initialize(context);
}
}
@Override
public void teardown(ConfigurableComponent component) {
ReportingTask reportingTask = (ReportingTask) component;
try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) {
try (NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) {
final MockConfigurationContext context = new MockConfigurationContext();
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, reportingTask, new MockComponentLogger(), context);

View File

@ -16,10 +16,27 @@
*/
package org.apache.nifi.controller;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.InstanceClassLoader;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -30,14 +47,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.nar.NarCloseable;
public abstract class AbstractConfiguredComponent implements ConfigurableComponent, ConfiguredComponent {
private final String id;
@ -48,13 +57,17 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
private final AtomicReference<String> annotationData = new AtomicReference<>();
private final String componentType;
private final String componentCanonicalClass;
private final VariableRegistry variableRegistry;
private final ComponentLog logger;
private final Lock lock = new ReentrantLock();
private final ConcurrentMap<PropertyDescriptor, String> properties = new ConcurrentHashMap<>();
public AbstractConfiguredComponent(final ConfigurableComponent component, final String id,
final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider,
final String componentType, final String componentCanonicalClass) {
final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider,
final String componentType, final String componentCanonicalClass, final VariableRegistry variableRegistry,
final ComponentLog logger) {
this.id = id;
this.component = component;
this.validationContextFactory = validationContextFactory;
@ -62,6 +75,8 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
this.name = new AtomicReference<>(component.getClass().getSimpleName());
this.componentType = componentType;
this.componentCanonicalClass = componentCanonicalClass;
this.variableRegistry = variableRegistry;
this.logger = logger;
}
@Override
@ -90,47 +105,72 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
}
@Override
public void setProperty(final String name, final String value) {
if (null == name || null == value) {
throw new IllegalArgumentException();
public void setProperties(Map<String, String> properties) {
if (properties == null) {
return;
}
lock.lock();
try {
verifyModifiable();
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) {
final PropertyDescriptor descriptor = component.getPropertyDescriptor(name);
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), id)) {
final Set<String> modulePaths = new LinkedHashSet<>();
for (final Map.Entry<String, String> entry : properties.entrySet()) {
if (entry.getKey() != null && entry.getValue() == null) {
removeProperty(entry.getKey());
} else if (entry.getKey() != null) {
setProperty(entry.getKey(), entry.getValue());
final String oldValue = properties.put(descriptor, value);
if (!value.equals(oldValue)) {
if (descriptor.getControllerServiceDefinition() != null) {
if (oldValue != null) {
final ControllerServiceNode oldNode = serviceProvider.getControllerServiceNode(oldValue);
if (oldNode != null) {
oldNode.removeReference(this);
}
// for any properties that dynamically modify the classpath, attempt to evaluate them for expression language
final PropertyDescriptor descriptor = component.getPropertyDescriptor(entry.getKey());
if (descriptor.isDynamicClasspathModifier() && !StringUtils.isEmpty(entry.getValue())) {
final StandardPropertyValue propertyValue = new StandardPropertyValue(entry.getValue(), null, variableRegistry);
modulePaths.add(propertyValue.evaluateAttributeExpressions().getValue());
}
final ControllerServiceNode newNode = serviceProvider.getControllerServiceNode(value);
if (newNode != null) {
newNode.addReference(this);
}
}
try {
component.onPropertyModified(descriptor, oldValue, value);
} catch (final Exception e) {
// nothing really to do here...
}
}
processClasspathModifiers(modulePaths);
}
} finally {
lock.unlock();
}
}
// Keep setProperty/removeProperty private so that all calls go through setProperties
private void setProperty(final String name, final String value) {
if (null == name || null == value) {
throw new IllegalArgumentException("Name or Value can not be null");
}
final PropertyDescriptor descriptor = component.getPropertyDescriptor(name);
final String oldValue = properties.put(descriptor, value);
if (!value.equals(oldValue)) {
if (descriptor.getControllerServiceDefinition() != null) {
if (oldValue != null) {
final ControllerServiceNode oldNode = serviceProvider.getControllerServiceNode(oldValue);
if (oldNode != null) {
oldNode.removeReference(this);
}
}
final ControllerServiceNode newNode = serviceProvider.getControllerServiceNode(value);
if (newNode != null) {
newNode.addReference(this);
}
}
try {
component.onPropertyModified(descriptor, oldValue, value);
} catch (final Exception e) {
// nothing really to do here...
}
}
}
/**
* Removes the property and value for the given property name if a
* descriptor and value exists for the given name. If the property is
@ -141,48 +181,74 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
* @return true if removed; false otherwise
* @throws java.lang.IllegalArgumentException if the name is null
*/
@Override
public boolean removeProperty(final String name) {
private boolean removeProperty(final String name) {
if (null == name) {
throw new IllegalArgumentException();
throw new IllegalArgumentException("Name can not be null");
}
lock.lock();
try {
verifyModifiable();
final PropertyDescriptor descriptor = component.getPropertyDescriptor(name);
String value = null;
if (!descriptor.isRequired() && (value = properties.remove(descriptor)) != null) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) {
final PropertyDescriptor descriptor = component.getPropertyDescriptor(name);
String value = null;
if (!descriptor.isRequired() && (value = properties.remove(descriptor)) != null) {
if (descriptor.getControllerServiceDefinition() != null) {
if (value != null) {
final ControllerServiceNode oldNode = serviceProvider.getControllerServiceNode(value);
if (oldNode != null) {
oldNode.removeReference(this);
}
}
if (descriptor.getControllerServiceDefinition() != null) {
if (value != null) {
final ControllerServiceNode oldNode = serviceProvider.getControllerServiceNode(value);
if (oldNode != null) {
oldNode.removeReference(this);
}
try {
component.onPropertyModified(descriptor, value, null);
} catch (final Exception e) {
// nothing really to do here...
}
return true;
}
}
} finally {
lock.unlock();
try {
component.onPropertyModified(descriptor, value, null);
} catch (final Exception e) {
logger.error(e.getMessage(), e);
}
return true;
}
return false;
}
/**
* Adds all of the modules identified by the given module paths to the InstanceClassLoader for this component.
*
* @param modulePaths a list of module paths where each entry can be a comma-separated list of multiple module paths
*/
private void processClasspathModifiers(final Set<String> modulePaths) {
try {
final URL[] urls = ClassLoaderUtils.getURLsForClasspath(modulePaths, null, true);
if (logger.isDebugEnabled()) {
logger.debug("Adding {} resources to the classpath for {}", new Object[] {urls.length, name});
for (URL url : urls) {
logger.debug(url.getFile());
}
}
final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
if (!(classLoader instanceof InstanceClassLoader)) {
// Really shouldn't happen, but if we somehow got here and don't have an InstanceClassLoader then log a warning and move on
final String classLoaderName = classLoader == null ? "null" : classLoader.getClass().getName();
if (logger.isWarnEnabled()) {
logger.warn(String.format("Unable to modify the classpath for %s, expected InstanceClassLoader, but found %s", name, classLoaderName));
}
return;
}
final InstanceClassLoader instanceClassLoader = (InstanceClassLoader) classLoader;
instanceClassLoader.setInstanceResources(urls);
} catch (MalformedURLException e) {
// Shouldn't get here since we are suppressing errors
logger.warn("Error processing classpath resources", e);
}
}
@Override
public Map<PropertyDescriptor, String> getProperties() {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) {
final List<PropertyDescriptor> supported = component.getPropertyDescriptors();
if (supported == null || supported.isEmpty()) {
return Collections.unmodifiableMap(properties);
@ -226,35 +292,35 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
@Override
public String toString() {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) {
return component.toString();
}
}
@Override
public Collection<ValidationResult> validate(final ValidationContext context) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) {
return component.validate(context);
}
}
@Override
public PropertyDescriptor getPropertyDescriptor(final String name) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) {
return component.getPropertyDescriptor(name);
}
}
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) {
component.onPropertyModified(descriptor, oldValue, newValue);
}
}
@Override
public List<PropertyDescriptor> getPropertyDescriptors() {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) {
return component.getPropertyDescriptors();
}
}
@ -286,7 +352,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
serviceIdentifiersNotToValidate, getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier());
final Collection<ValidationResult> validationResults;
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass())) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(component.getClass(), component.getIdentifier())) {
validationResults = component.validate(validationContext);
}
@ -327,4 +393,9 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
protected ValidationContextFactory getValidationContextFactory() {
return this.validationContextFactory;
}
protected VariableRegistry getVariableRegistry() {
return this.variableRegistry;
}
}

View File

@ -35,25 +35,7 @@ public interface ConfiguredComponent extends Authorizable {
public void setAnnotationData(String data);
/**
* Sets the property with the given name to the given value
*
* @param name the name of the property to update
* @param value the value to update the property to
*/
public void setProperty(String name, String value);
/**
* Removes the property and value for the given property name if a
* descriptor and value exists for the given name. If the property is
* optional its value might be reset to default or will be removed entirely
* if was a dynamic property.
*
* @param name the property to remove
* @return true if removed; false otherwise
* @throws java.lang.IllegalArgumentException if the name is null
*/
public boolean removeProperty(String name);
public void setProperties(Map<String, String> properties);
public Map<PropertyDescriptor, String> getProperties();
@ -75,4 +57,5 @@ public interface ConfiguredComponent extends Authorizable {
* @return the Canonical Class Name of the component
*/
String getCanonicalClassName();
}

View File

@ -28,10 +28,12 @@ import org.apache.nifi.controller.scheduling.ScheduleState;
import org.apache.nifi.controller.scheduling.SchedulingAgent;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -43,9 +45,10 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen
protected final AtomicReference<ScheduledState> scheduledState;
public ProcessorNode(final Processor processor, final String id,
final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider,
final String componentType, final String componentCanonicalClass) {
super(processor, id, validationContextFactory, serviceProvider, componentType, componentCanonicalClass);
final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider,
final String componentType, final String componentCanonicalClass, final VariableRegistry variableRegistry,
final ComponentLog logger) {
super(processor, id, validationContextFactory, serviceProvider, componentType, componentCanonicalClass, variableRegistry, logger);
this.scheduledState = new AtomicReference<>(ScheduledState.STOPPED);
}

View File

@ -731,7 +731,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private void notifyComponentsConfigurationRestored() {
for (final ProcessorNode procNode : getGroup(getRootGroupId()).findAllProcessors()) {
final Processor processor = procNode.getProcessor();
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass())) {
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, processor);
}
}
@ -739,7 +739,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
final ControllerService service = serviceNode.getControllerServiceImplementation();
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(service.getClass())) {
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(service.getClass(), service.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, service);
}
}
@ -747,7 +747,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
for (final ReportingTaskNode taskNode : getAllReportingTasks()) {
final ReportingTask task = taskNode.getReportingTask();
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(task.getClass())) {
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(task.getClass(), task.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, task);
}
}
@ -1046,21 +1046,22 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
creationSuccessful = false;
}
final ComponentLog logger = new SimpleProcessLogger(id, processor);
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider, variableRegistry);
final ProcessorNode procNode;
if (creationSuccessful) {
procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, nifiProperties);
procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, nifiProperties, variableRegistry, logger);
} else {
final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type;
final String componentType = "(Missing) " + simpleClassName;
procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, componentType, type, nifiProperties);
procNode = new StandardProcessorNode(processor, id, validationContextFactory, processScheduler, controllerServiceProvider, componentType, type, nifiProperties, variableRegistry, logger);
}
final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode));
if (firstTimeAdded) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(processor.getClass())) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor);
} catch (final Exception e) {
logRepository.removeObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID);
@ -1068,7 +1069,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
if (firstTimeAdded) {
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass())) {
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), processor.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, procNode.getProcessor());
}
}
@ -1082,14 +1083,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
try {
final ClassLoader detectedClassLoaderForType = ExtensionManager.getClassLoader(type);
final ClassLoader detectedClassLoaderForType = ExtensionManager.getClassLoader(type, identifier);
final Class<?> rawClass;
if (detectedClassLoaderForType == null) {
// try to find from the current class loader
rawClass = Class.forName(type);
} else {
// try to find from the registered classloader for that type
rawClass = Class.forName(type, true, ExtensionManager.getClassLoader(type));
rawClass = Class.forName(type, true, ExtensionManager.getClassLoader(type, identifier));
}
Thread.currentThread().setContextClassLoader(detectedClassLoaderForType);
@ -1328,7 +1329,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// invoke any methods annotated with @OnShutdown on Controller Services
for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(serviceNode.getControllerServiceImplementation().getClass())) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(serviceNode.getControllerServiceImplementation().getClass(), serviceNode.getIdentifier())) {
final ConfigurationContext configContext = new StandardConfigurationContext(serviceNode, controllerServiceProvider, null, variableRegistry);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, serviceNode.getControllerServiceImplementation(), configContext);
}
@ -1337,7 +1338,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// invoke any methods annotated with @OnShutdown on Reporting Tasks
for (final ReportingTaskNode taskNode : getAllReportingTasks()) {
final ConfigurationContext configContext = taskNode.getConfigurationContext();
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(taskNode.getReportingTask().getClass())) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(taskNode.getReportingTask().getClass(), taskNode.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, taskNode.getReportingTask(), configContext);
}
}
@ -1609,12 +1610,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
for (final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices()) {
final String serviceId = controllerServiceDTO.getId();
final ControllerServiceNode serviceNode = getControllerServiceNode(serviceId);
for (final Map.Entry<String, String> entry : controllerServiceDTO.getProperties().entrySet()) {
if (entry.getValue() != null) {
serviceNode.setProperty(entry.getKey(), entry.getValue());
}
}
serviceNode.setProperties(controllerServiceDTO.getProperties());
}
//
@ -1728,11 +1724,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
if (config.getProperties() != null) {
for (final Map.Entry<String, String> entry : config.getProperties().entrySet()) {
if (entry.getValue() != null) {
procNode.setProperty(entry.getKey(), entry.getValue());
}
}
procNode.setProperties(config.getProperties());
}
group.addProcessor(procNode);
@ -2826,7 +2818,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
boolean creationSuccessful = true;
final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
try {
final ClassLoader detectedClassLoader = ExtensionManager.getClassLoader(type);
final ClassLoader detectedClassLoader = ExtensionManager.getClassLoader(type, id);
final Class<?> rawClass;
if (detectedClassLoader == null) {
rawClass = Class.forName(type);
@ -2851,15 +2843,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}
final ComponentLog logger = new SimpleProcessLogger(id, task);
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider, variableRegistry);
final ReportingTaskNode taskNode;
if (creationSuccessful) {
taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, variableRegistry);
taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, variableRegistry, logger);
} else {
final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type;
final String componentType = "(Missing) " + simpleClassName;
taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, componentType, type, variableRegistry);
taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory, componentType, type, variableRegistry, logger);
}
taskNode.setName(task.getClass().getSimpleName());
@ -2875,7 +2868,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
throw new ReportingTaskInstantiationException("Failed to initialize reporting task of type " + type, ie);
}
try (final NarCloseable x = NarCloseable.withComponentNarLoader(taskNode.getReportingTask().getClass())) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(taskNode.getReportingTask().getClass(), taskNode.getReportingTask().getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, task);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, taskNode.getReportingTask());
} catch (final Exception e) {
@ -2929,7 +2922,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
reportingTaskNode.verifyCanDelete();
try (final NarCloseable x = NarCloseable.withComponentNarLoader(reportingTaskNode.getReportingTask().getClass())) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(reportingTaskNode.getReportingTask().getClass(), reportingTaskNode.getReportingTask().getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, reportingTaskNode.getReportingTask(), reportingTaskNode.getConfigurationContext());
}
@ -2947,6 +2940,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
reportingTasks.remove(reportingTaskNode.getIdentifier());
ExtensionManager.removeInstanceClassLoaderIfExists(reportingTaskNode.getIdentifier());
}
@Override
@ -2966,7 +2960,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
if (firstTimeAdded) {
final ControllerService service = serviceNode.getControllerServiceImplementation();
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(service.getClass())) {
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(service.getClass(), service.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, service);
}
}
@ -3085,7 +3079,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
service.verifyCanDelete();
try (final NarCloseable x = NarCloseable.withComponentNarLoader(service.getControllerServiceImplementation().getClass())) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(service.getControllerServiceImplementation().getClass(), service.getIdentifier())) {
final ConfigurationContext configurationContext = new StandardConfigurationContext(service, controllerServiceProvider, null, variableRegistry);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, service.getControllerServiceImplementation(), configurationContext);
}
@ -3106,6 +3100,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
rootControllerServices.remove(service.getIdentifier());
getStateManagerProvider().onComponentRemoved(service.getIdentifier());
ExtensionManager.removeInstanceClassLoaderIfExists(service.getIdentifier());
LOG.info("{} removed from Flow Controller", service, this);
}
@ -3451,17 +3447,17 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final PrimaryNodeState nodeState = primary ? PrimaryNodeState.ELECTED_PRIMARY_NODE : PrimaryNodeState.PRIMARY_NODE_REVOKED;
final ProcessGroup rootGroup = getGroup(getRootGroupId());
for (final ProcessorNode procNode : rootGroup.findAllProcessors()) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass())) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, procNode.getProcessor(), nodeState);
}
}
for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(serviceNode.getControllerServiceImplementation().getClass())) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(serviceNode.getControllerServiceImplementation().getClass(), serviceNode.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, serviceNode.getControllerServiceImplementation(), nodeState);
}
}
for (final ReportingTaskNode reportingTaskNode : getAllReportingTasks()) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(reportingTaskNode.getReportingTask().getClass())) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(reportingTaskNode.getReportingTask().getClass(), reportingTaskNode.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, reportingTaskNode.getReportingTask(), nodeState);
}
}

View File

@ -408,11 +408,15 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
.filter(e -> controllerServiceMapping.containsKey(e.getValue()))
.collect(Collectors.toSet());
final Map<String,String> controllerServiceProps = new HashMap<>();
for (Map.Entry<PropertyDescriptor, String> propEntry : propertyDescriptors) {
final PropertyDescriptor propertyDescriptor = propEntry.getKey();
final ControllerServiceNode clone = controllerServiceMapping.get(propEntry.getValue());
reportingTask.setProperty(propertyDescriptor.getName(), clone.getIdentifier());
controllerServiceProps.put(propertyDescriptor.getName(), clone.getIdentifier());
}
reportingTask.setProperties(controllerServiceProps);
}
}
}
@ -514,14 +518,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(dto.getSchedulingStrategy()));
reportingTask.setAnnotationData(dto.getAnnotationData());
for (final Map.Entry<String, String> entry : dto.getProperties().entrySet()) {
if (entry.getValue() == null) {
reportingTask.removeProperty(entry.getKey());
} else {
reportingTask.setProperty(entry.getKey(), entry.getValue());
}
}
reportingTask.setProperties(dto.getProperties());
final ComponentLog componentLog = new SimpleProcessLogger(dto.getId(), reportingTask.getReportingTask());
final ReportingInitializationContext config = new StandardReportingInitializationContext(dto.getId(), dto.getName(),
@ -922,13 +919,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
procNode.setAutoTerminatedRelationships(relationships);
}
for (final Map.Entry<String, String> entry : config.getProperties().entrySet()) {
if (entry.getValue() == null) {
procNode.removeProperty(entry.getKey());
} else {
procNode.setProperty(entry.getKey(), entry.getValue());
}
}
procNode.setProperties(config.getProperties());
final ScheduledState scheduledState = ScheduledState.valueOf(processorDTO.getState());
if (ScheduledState.RUNNING.equals(scheduledState)) {

View File

@ -54,6 +54,7 @@ import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
@ -135,19 +136,21 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
// ??????? NOT any more
public StandardProcessorNode(final Processor processor, final String uuid,
final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler,
final ControllerServiceProvider controllerServiceProvider, final NiFiProperties nifiProperties) {
final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler,
final ControllerServiceProvider controllerServiceProvider, final NiFiProperties nifiProperties,
final VariableRegistry variableRegistry, final ComponentLog logger) {
this(processor, uuid, validationContextFactory, scheduler, controllerServiceProvider,
processor.getClass().getSimpleName(), processor.getClass().getCanonicalName(), nifiProperties);
processor.getClass().getSimpleName(), processor.getClass().getCanonicalName(), nifiProperties, variableRegistry, logger);
}
public StandardProcessorNode(final Processor processor, final String uuid,
final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler,
final ControllerServiceProvider controllerServiceProvider,
final String componentType, final String componentCanonicalClass, final NiFiProperties nifiProperties) {
final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler,
final ControllerServiceProvider controllerServiceProvider,
final String componentType, final String componentCanonicalClass, final NiFiProperties nifiProperties,
final VariableRegistry variableRegistry, final ComponentLog logger) {
super(processor, uuid, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass);
super(processor, uuid, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass, variableRegistry, logger);
this.processor = processor;
identifier = new AtomicReference<>(uuid);
@ -811,7 +814,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
Relationship returnRel = specRel;
final Set<Relationship> relationships;
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(processor.getClass())) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
relationships = processor.getRelationships();
}
@ -857,7 +860,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
public Set<Relationship> getUndefinedRelationships() {
final Set<Relationship> undefined = new HashSet<>();
final Set<Relationship> relationships;
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(processor.getClass())) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
relationships = processor.getRelationships();
}
@ -913,7 +916,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
.newValidationContext(getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier());
final Collection<ValidationResult> validationResults;
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass())) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass(), processor.getIdentifier())) {
validationResults = getProcessor().validate(validationContext);
}
@ -960,7 +963,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
.newValidationContext(getProperties(), getAnnotationData(), getProcessGroup().getIdentifier(), getIdentifier());
final Collection<ValidationResult> validationResults;
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass())) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass(), processor.getIdentifier())) {
validationResults = getProcessor().validate(validationContext);
}
@ -1036,14 +1039,14 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
@Override
public Collection<Relationship> getRelationships() {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass())) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass(), processor.getIdentifier())) {
return getProcessor().getRelationships();
}
}
@Override
public String toString() {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass())) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProcessor().getClass(), processor.getIdentifier())) {
return getProcessor().toString();
}
}
@ -1060,7 +1063,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
@Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(processor.getClass())) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
processor.onTrigger(context, sessionFactory);
}
}
@ -1240,7 +1243,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
invokeTaskAsCancelableFuture(schedulingAgentCallback, new Callable<Void>() {
@Override
public Void call() throws Exception {
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass())) {
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, processContext);
return null;
}
@ -1250,7 +1253,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
if (scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) {
schedulingAgentCallback.trigger(); // callback provided by StandardProcessScheduler to essentially initiate component's onTrigger() cycle
} else { // can only happen if stopProcessor was called before service was transitioned to RUNNING state
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass())) {
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext);
}
scheduledState.set(ScheduledState.STOPPED);
@ -1325,7 +1328,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
try {
if (scheduleState.isScheduled()) {
schedulingAgent.unschedule(StandardProcessorNode.this, scheduleState);
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass())) {
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext);
}
}
@ -1334,7 +1337,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
// performing the lifecycle actions counts as 1 thread.
final boolean allThreadsComplete = scheduleState.getActiveThreadCount() == 1;
if (allThreadsComplete) {
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass())) {
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext);
}

View File

@ -33,6 +33,7 @@ import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
@ -50,27 +51,26 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
private volatile String comment;
private volatile ScheduledState scheduledState = ScheduledState.STOPPED;
protected final VariableRegistry variableRegistry;
public AbstractReportingTaskNode(final ReportingTask reportingTask, final String id,
final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler,
final ValidationContextFactory validationContextFactory, final VariableRegistry variableRegistry) {
final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler,
final ValidationContextFactory validationContextFactory, final VariableRegistry variableRegistry,
final ComponentLog logger) {
this(reportingTask, id, controllerServiceProvider, processScheduler, validationContextFactory,
reportingTask.getClass().getSimpleName(), reportingTask.getClass().getCanonicalName(),variableRegistry);
reportingTask.getClass().getSimpleName(), reportingTask.getClass().getCanonicalName(),variableRegistry, logger);
}
public AbstractReportingTaskNode(final ReportingTask reportingTask, final String id,
final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler,
final ValidationContextFactory validationContextFactory,
final String componentType, final String componentCanonicalClass, VariableRegistry variableRegistry) {
final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler,
final ValidationContextFactory validationContextFactory,
final String componentType, final String componentCanonicalClass, final VariableRegistry variableRegistry,
final ComponentLog logger) {
super(reportingTask, id, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass);
super(reportingTask, id, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass, variableRegistry, logger);
this.reportingTask = reportingTask;
this.processScheduler = processScheduler;
this.serviceLookup = controllerServiceProvider;
this.variableRegistry = variableRegistry;
}
@Override
@ -115,7 +115,7 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
@Override
public ConfigurationContext getConfigurationContext() {
return new StandardConfigurationContext(this, serviceLookup, getSchedulingPeriod(), variableRegistry);
return new StandardConfigurationContext(this, serviceLookup, getSchedulingPeriod(), getVariableRegistry());
}
@Override
@ -135,17 +135,6 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
this.scheduledState = state;
}
@Override
public void setProperty(final String name, final String value) {
super.setProperty(name, value);
}
@Override
public boolean removeProperty(String name) {
return super.removeProperty(name);
}
public boolean isDisabled() {
return scheduledState == ScheduledState.DISABLED;
}

View File

@ -24,6 +24,7 @@ import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.ReportingTask;
@ -34,15 +35,15 @@ public class StandardReportingTaskNode extends AbstractReportingTaskNode impleme
public StandardReportingTaskNode(final ReportingTask reportingTask, final String id, final FlowController controller,
final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory,
final VariableRegistry variableRegistry) {
super(reportingTask, id, controller, processScheduler, validationContextFactory, variableRegistry);
final VariableRegistry variableRegistry, final ComponentLog logger) {
super(reportingTask, id, controller, processScheduler, validationContextFactory, variableRegistry, logger);
this.flowController = controller;
}
public StandardReportingTaskNode(final ReportingTask reportingTask, final String id, final FlowController controller,
final ProcessScheduler processScheduler, final ValidationContextFactory validationContextFactory,
final String componentType, final String canonicalClassName, VariableRegistry variableRegistry) {
super(reportingTask, id, controller, processScheduler, validationContextFactory, componentType, canonicalClassName,variableRegistry);
final String componentType, final String canonicalClassName, final VariableRegistry variableRegistry, final ComponentLog logger) {
super(reportingTask, id, controller, processScheduler, validationContextFactory, componentType, canonicalClassName,variableRegistry, logger);
this.flowController = controller;
}
@ -58,6 +59,6 @@ public class StandardReportingTaskNode extends AbstractReportingTaskNode impleme
@Override
public ReportingContext getReportingContext() {
return new StandardReportingContext(flowController, flowController.getBulletinRepository(), getProperties(), flowController, getReportingTask(), variableRegistry);
return new StandardReportingContext(flowController, flowController.getBulletinRepository(), getProperties(), flowController, getReportingTask(), getVariableRegistry());
}
}

View File

@ -287,7 +287,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
}
try {
try (final AutoCloseable ncl = NarCloseable.withComponentNarLoader(worker.getClass())) {
try (final AutoCloseable ncl = NarCloseable.withComponentNarLoader(worker.getClass(), worker.getIdentifier())) {
worker.onTrigger(processContext, sessionFactory);
} catch (final ProcessException pe) {
logger.error("{} failed to process session due to {}", worker, pe.toString());
@ -305,7 +305,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
}
} finally {
if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(worker.getClass())) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(worker.getClass(), worker.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, worker, processContext);
}
}
@ -328,7 +328,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
}
try {
try (final AutoCloseable ncl = NarCloseable.withComponentNarLoader(worker.getProcessor().getClass())) {
try (final AutoCloseable ncl = NarCloseable.withComponentNarLoader(worker.getProcessor().getClass(), worker.getIdentifier())) {
worker.onTrigger(processContext, sessionFactory);
} catch (final ProcessException pe) {
final ComponentLog procLog = new SimpleProcessLogger(worker.getIdentifier(), worker.getProcessor());
@ -347,7 +347,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
// if the processor is no longer scheduled to run and this is the last thread,
// invoke the OnStopped methods
if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(worker.getProcessor().getClass())) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(worker.getProcessor().getClass(), worker.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, worker.getProcessor(), processContext);
}
}

View File

@ -209,7 +209,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
return;
}
try (final NarCloseable x = NarCloseable.withComponentNarLoader(reportingTask.getClass())) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(reportingTask.getClass(), reportingTask.getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, reportingTask, taskNode.getConfigurationContext());
}
@ -262,7 +262,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
scheduleState.setScheduled(false);
try {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(reportingTask.getClass())) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(reportingTask.getClass(), reportingTask.getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, reportingTask, configurationContext);
}
} catch (final Exception e) {
@ -436,7 +436,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
if (!state.isScheduled() && state.getActiveThreadCount() == 0 && state.mustCallOnStoppedMethods()) {
final ConnectableProcessContext processContext = new ConnectableProcessContext(connectable, encryptor, getStateManager(connectable.getIdentifier()));
try (final NarCloseable x = NarCloseable.withComponentNarLoader(connectable.getClass())) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(connectable.getClass(), connectable.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, connectable, processContext);
}
}

View File

@ -166,11 +166,11 @@ public class ControllerServiceLoader {
clone.setComments(controllerService.getComments());
if (controllerService.getProperties() != null) {
Map<String,String> properties = new HashMap<>();
for (Map.Entry<PropertyDescriptor, String> propEntry : controllerService.getProperties().entrySet()) {
if (propEntry.getValue() != null) {
clone.setProperty(propEntry.getKey().getName(), propEntry.getValue());
}
properties.put(propEntry.getKey().getName(), propEntry.getValue());
}
clone.setProperties(properties);
}
return clone;
@ -188,14 +188,7 @@ public class ControllerServiceLoader {
private static void configureControllerService(final ControllerServiceNode node, final Element controllerServiceElement, final StringEncryptor encryptor) {
final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
node.setAnnotationData(dto.getAnnotationData());
for (final Map.Entry<String, String> entry : dto.getProperties().entrySet()) {
if (entry.getValue() == null) {
node.removeProperty(entry.getKey());
} else {
node.setProperty(entry.getKey(), entry.getValue());
}
}
node.setProperties(dto.getProperties());
}
}

View File

@ -32,6 +32,7 @@ import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.util.ReflectionUtils;
@ -61,7 +62,6 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
private final ControllerService proxedControllerService;
private final ControllerService implementation;
private final ControllerServiceProvider serviceProvider;
private final VariableRegistry variableRegistry;
private final AtomicReference<ControllerServiceState> stateRef = new AtomicReference<>(ControllerServiceState.DISABLED);
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
@ -76,22 +76,22 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
public StandardControllerServiceNode(final ControllerService proxiedControllerService, final ControllerService implementation, final String id,
final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider,
final VariableRegistry variableRegistry) {
final VariableRegistry variableRegistry, final ComponentLog logger) {
this(proxiedControllerService, implementation, id, validationContextFactory, serviceProvider,
implementation.getClass().getSimpleName(), implementation.getClass().getCanonicalName(), variableRegistry);
implementation.getClass().getSimpleName(), implementation.getClass().getCanonicalName(), variableRegistry, logger);
}
public StandardControllerServiceNode(final ControllerService proxiedControllerService, final ControllerService implementation, final String id,
final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider,
final String componentType, final String componentCanonicalClass, VariableRegistry variableRegistry) {
final String componentType, final String componentCanonicalClass, final VariableRegistry variableRegistry,
final ComponentLog logger) {
super(implementation, id, validationContextFactory, serviceProvider, componentType, componentCanonicalClass);
super(implementation, id, validationContextFactory, serviceProvider, componentType, componentCanonicalClass, variableRegistry, logger);
this.proxedControllerService = proxiedControllerService;
this.implementation = implementation;
this.serviceProvider = serviceProvider;
this.active = new AtomicBoolean();
this.variableRegistry = variableRegistry;
}
@ -202,16 +202,6 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
}
}
@Override
public void setProperty(final String name, final String value) {
super.setProperty(name, value);
}
@Override
public boolean removeProperty(String name) {
return super.removeProperty(name);
}
@Override
public void verifyCanDelete() {
if (getState() != ControllerServiceState.DISABLED) {
@ -340,12 +330,14 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
public void enable(final ScheduledExecutorService scheduler, final long administrativeYieldMillis) {
if (this.stateRef.compareAndSet(ControllerServiceState.DISABLED, ControllerServiceState.ENABLING)) {
this.active.set(true);
final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null, variableRegistry);
final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null, getVariableRegistry());
scheduler.execute(new Runnable() {
@Override
public void run() {
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, getControllerServiceImplementation(), configContext);
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getControllerServiceImplementation().getClass(), getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, getControllerServiceImplementation(), configContext);
}
boolean shouldEnable = false;
synchronized (active) {
shouldEnable = active.get() && stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.ENABLED);
@ -367,7 +359,9 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
if (isActive()) {
scheduler.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS);
} else {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getControllerServiceImplementation().getClass(), getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
}
stateRef.set(ControllerServiceState.DISABLED);
}
}
@ -403,7 +397,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
}
if (this.stateRef.compareAndSet(ControllerServiceState.ENABLED, ControllerServiceState.DISABLING)) {
final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null, variableRegistry);
final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null, getVariableRegistry());
scheduler.execute(new Runnable() {
@Override
public void run() {
@ -423,7 +417,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
*
*/
private void invokeDisable(ConfigurationContext configContext) {
try {
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getControllerServiceImplementation().getClass(), getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, StandardControllerServiceNode.this.getControllerServiceImplementation(), configContext);
} catch (Exception e) {
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;

View File

@ -131,7 +131,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
final ClassLoader currentContextClassLoader = Thread.currentThread().getContextClassLoader();
try {
final ClassLoader cl = ExtensionManager.getClassLoader(type);
final ClassLoader cl = ExtensionManager.getClassLoader(type, id);
final Class<?> rawClass;
try {
@ -165,7 +165,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
final boolean disabled = state != ControllerServiceState.ENABLED; // only allow method call if service state is ENABLED.
if (disabled && !validDisabledMethods.contains(method)) {
// Use nar class loader here because we are implicitly calling toString() on the original implementation.
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(originalService.getClass())) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(originalService.getClass(), originalService.getIdentifier())) {
throw new IllegalStateException("Cannot invoke method " + method + " on Controller Service " + originalService.getIdentifier()
+ " because the Controller Service is disabled");
} catch (final Throwable e) {
@ -173,7 +173,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
}
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(originalService.getClass())) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(originalService.getClass(), originalService.getIdentifier())) {
return method.invoke(originalService, args);
} catch (final InvocationTargetException e) {
// If the ControllerService throws an Exception, it'll be wrapped in an InvocationTargetException. We want
@ -194,14 +194,15 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
final ComponentLog serviceLogger = new SimpleProcessLogger(id, originalService);
originalService.initialize(new StandardControllerServiceInitializationContext(id, serviceLogger, this, getStateManager(id), nifiProperties));
final ComponentLog logger = new SimpleProcessLogger(id, originalService);
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(this, variableRegistry);
final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, originalService, id, validationContextFactory, this, variableRegistry);
final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, originalService, id, validationContextFactory, this, variableRegistry, logger);
serviceNodeHolder.set(serviceNode);
serviceNode.setName(rawClass.getSimpleName());
if (firstTimeAdded) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(originalService.getClass())) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(originalService.getClass(), originalService.getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, originalService);
} catch (final Exception e) {
throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + originalService, e);
@ -264,8 +265,10 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type;
final String componentType = "(Missing) " + simpleClassName;
final ComponentLog logger = new SimpleProcessLogger(id, proxiedService);
final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, proxiedService, id,
new StandardValidationContextFactory(this, variableRegistry), this, componentType, type, variableRegistry);
new StandardValidationContextFactory(this, variableRegistry), this, componentType, type, variableRegistry, logger);
return serviceNode;
}
@ -585,6 +588,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
group.removeControllerService(serviceNode);
ExtensionManager.removeInstanceClassLoaderIfExists(serviceNode.getIdentifier());
}
@Override

View File

@ -76,7 +76,7 @@ public class ContinuallyRunConnectableTask implements Callable<Boolean> {
if (shouldRun) {
scheduleState.incrementActiveThreadCount();
try {
try (final AutoCloseable ncl = NarCloseable.withComponentNarLoader(connectable.getClass())) {
try (final AutoCloseable ncl = NarCloseable.withComponentNarLoader(connectable.getClass(), connectable.getIdentifier())) {
connectable.onTrigger(processContext, sessionFactory);
} catch (final ProcessException pe) {
logger.error("{} failed to process session due to {}", connectable, pe.toString());
@ -93,7 +93,7 @@ public class ContinuallyRunConnectableTask implements Callable<Boolean> {
}
} finally {
if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(connectable.getClass())) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(connectable.getClass(), connectable.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, connectable, processContext);
}
}

View File

@ -130,7 +130,7 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
final long finishNanos = startNanos + batchNanos;
int invocationCount = 0;
try {
try (final AutoCloseable ncl = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass())) {
try (final AutoCloseable ncl = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getIdentifier())) {
boolean shouldRun = true;
while (shouldRun) {
procNode.onTrigger(processContext, sessionFactory);

View File

@ -37,7 +37,7 @@ public class ReportingTaskWrapper implements Runnable {
@Override
public synchronized void run() {
scheduleState.incrementActiveThreadCount();
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(taskNode.getReportingTask().getClass())) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(taskNode.getReportingTask().getClass(), taskNode.getIdentifier())) {
taskNode.getReportingTask().onTrigger(taskNode.getReportingContext());
} catch (final Throwable t) {
final ComponentLog componentLog = new SimpleProcessLogger(taskNode.getIdentifier(), taskNode.getReportingTask());
@ -50,7 +50,7 @@ public class ReportingTaskWrapper implements Runnable {
// if the reporting task is no longer scheduled to run and this is the last thread,
// invoke the OnStopped methods
if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(taskNode.getReportingTask().getClass())) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(taskNode.getReportingTask().getClass(), taskNode.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, taskNode.getReportingTask(), taskNode.getConfigurationContext());
}
}

View File

@ -53,6 +53,7 @@ import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.registry.VariableRegistry;
@ -349,7 +350,7 @@ public final class StandardProcessGroup implements ProcessGroup {
private void shutdown(final ProcessGroup procGroup) {
for (final ProcessorNode node : procGroup.getProcessors()) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(node.getProcessor().getClass())) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(node.getProcessor().getClass(), node.getIdentifier())) {
final StandardProcessContext processContext = new StandardProcessContext(node, controllerServiceProvider, encryptor, getStateManager(node.getIdentifier()), variableRegistry);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, node.getProcessor(), processContext);
}
@ -708,7 +709,7 @@ public final class StandardProcessGroup implements ProcessGroup {
conn.verifyCanDelete();
}
try (final NarCloseable x = NarCloseable.withComponentNarLoader(processor.getProcessor().getClass())) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(processor.getProcessor().getClass(), processor.getIdentifier())) {
final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, encryptor, getStateManager(processor.getIdentifier()), variableRegistry);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, processor.getProcessor(), processContext);
} catch (final Exception e) {
@ -745,6 +746,7 @@ public final class StandardProcessGroup implements ProcessGroup {
removeConnection(conn);
}
ExtensionManager.removeInstanceClassLoaderIfExists(id);
LOG.info("{} removed from flow", processor);
} finally {
writeLock.unlock();
@ -1847,7 +1849,7 @@ public final class StandardProcessGroup implements ProcessGroup {
service.verifyCanDelete();
try (final NarCloseable x = NarCloseable.withComponentNarLoader(service.getControllerServiceImplementation().getClass())) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(service.getControllerServiceImplementation().getClass(), service.getIdentifier())) {
final ConfigurationContext configurationContext = new StandardConfigurationContext(service, controllerServiceProvider, null, variableRegistry);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, service.getControllerServiceImplementation(), configurationContext);
}

View File

@ -17,16 +17,6 @@
package org.apache.nifi.controller;
import static org.junit.Assert.assertEquals;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
@ -35,26 +25,72 @@ import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.expression.ExpressionLanguageCompiler;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.InstanceClassLoader;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.processor.StandardProcessorInitializationContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.test.processors.ModifiesClasspathNoAnnotationProcessor;
import org.apache.nifi.test.processors.ModifiesClasspathProcessor;
import org.apache.nifi.util.MockPropertyValue;
import org.apache.nifi.util.MockVariableRegistry;
import org.apache.nifi.util.NiFiProperties;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TestStandardProcessorNode {
private MockVariableRegistry variableRegistry;
@Before
public void setup() {
variableRegistry = new MockVariableRegistry();
}
@Test(timeout = 10000)
public void testStart() throws InterruptedException {
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, TestStandardProcessorNode.class.getResource("/conf/nifi.properties").getFile());
final ProcessorThatThrowsExceptionOnScheduled processor = new ProcessorThatThrowsExceptionOnScheduled();
final String uuid = UUID.randomUUID().toString();
final StandardProcessorNode procNode = new StandardProcessorNode(processor, uuid, createValidationContextFactory(), null, null, NiFiProperties.createBasicNiFiProperties(null, null));
final ScheduledExecutorService taskScheduler = new FlowEngine(2, "TestStandardProcessorNode", true);
ProcessorInitializationContext initContext = new StandardProcessorInitializationContext(uuid, null, null, null, null);
processor.initialize(initContext);
final StandardProcessorNode procNode = new StandardProcessorNode(processor, uuid, createValidationContextFactory(), null, null,
NiFiProperties.createBasicNiFiProperties(null, null), VariableRegistry.EMPTY_REGISTRY, Mockito.mock(ComponentLog.class));
final ScheduledExecutorService taskScheduler = new FlowEngine(2, "TestClasspathResources", true);
final StandardProcessContext processContext = new StandardProcessContext(procNode, null, null, null, null);
final SchedulingAgentCallback schedulingAgentCallback = new SchedulingAgentCallback() {
@ -81,6 +117,220 @@ public class TestStandardProcessorNode {
assertEquals(1, processor.onStoppedCount);
}
@Test
public void testSinglePropertyDynamicallyModifiesClasspath() throws MalformedURLException {
final PropertyDescriptor classpathProp = new PropertyDescriptor.Builder().name("Classpath Resources")
.dynamicallyModifiesClasspath(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
final ModifiesClasspathProcessor processor = new ModifiesClasspathProcessor(Arrays.asList(classpathProp));
final StandardProcessorNode procNode = createProcessorNode(processor);
final Set<ClassLoader> classLoaders = new HashSet<>();
classLoaders.add(procNode.getProcessor().getClass().getClassLoader());
// Load all of the extensions in src/test/java of this project
ExtensionManager.discoverExtensions(classLoaders);
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getIdentifier())){
// Should have an InstanceClassLoader here
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
assertTrue(contextClassLoader instanceof InstanceClassLoader);
final InstanceClassLoader instanceClassLoader = (InstanceClassLoader) contextClassLoader;
// Should not have any of the test resources loaded at this point
final URL[] testResources = getTestResources();
for (URL testResource : testResources) {
if (containsResource(instanceClassLoader.getInstanceResources(), testResource)) {
fail("found resource that should not have been loaded");
}
}
// Simulate setting the properties of the processor to point to the test resources directory
final Map<String, String> properties = new HashMap<>();
properties.put(classpathProp.getName(), "src/test/resources/TestClasspathResources");
procNode.setProperties(properties);
// Should have all of the resources loaded into the InstanceClassLoader now
for (URL testResource : testResources) {
assertTrue(containsResource(instanceClassLoader.getInstanceResources(), testResource));
}
// Should pass validation
assertTrue(procNode.isValid());
} finally {
ExtensionManager.removeInstanceClassLoaderIfExists(procNode.getIdentifier());
}
}
@Test
public void testMultiplePropertiesDynamicallyModifyClasspathWithExpressionLanguage() throws MalformedURLException {
final PropertyDescriptor classpathProp1 = new PropertyDescriptor.Builder().name("Classpath Resource 1")
.dynamicallyModifiesClasspath(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
final PropertyDescriptor classpathProp2 = new PropertyDescriptor.Builder().name("Classpath Resource 2")
.dynamicallyModifiesClasspath(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
final ModifiesClasspathProcessor processor = new ModifiesClasspathProcessor(Arrays.asList(classpathProp1, classpathProp2));
final StandardProcessorNode procNode = createProcessorNode(processor);
final Set<ClassLoader> classLoaders = new HashSet<>();
classLoaders.add(procNode.getProcessor().getClass().getClassLoader());
// Load all of the extensions in src/test/java of this project
ExtensionManager.discoverExtensions(classLoaders);
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getIdentifier())){
// Should have an InstanceClassLoader here
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
assertTrue(contextClassLoader instanceof InstanceClassLoader);
final InstanceClassLoader instanceClassLoader = (InstanceClassLoader) contextClassLoader;
// Should not have any of the test resources loaded at this point
final URL[] testResources = getTestResources();
for (URL testResource : testResources) {
if (containsResource(instanceClassLoader.getInstanceResources(), testResource)) {
fail("found resource that should not have been loaded");
}
}
// Simulate setting the properties pointing to two of the resources
final Map<String, String> properties = new HashMap<>();
properties.put(classpathProp1.getName(), "src/test/resources/TestClasspathResources/resource1.txt");
properties.put(classpathProp2.getName(), "src/test/resources/TestClasspathResources/${myResource}");
variableRegistry.setVariable(new VariableDescriptor("myResource"), "resource3.txt");
procNode.setProperties(properties);
// Should have resources 1 and 3 loaded into the InstanceClassLoader now
assertTrue(containsResource(instanceClassLoader.getInstanceResources(), testResources[0]));
assertTrue(containsResource(instanceClassLoader.getInstanceResources(), testResources[2]));
assertFalse(containsResource(instanceClassLoader.getInstanceResources(), testResources[1]));
// Should pass validation
assertTrue(procNode.isValid());
} finally {
ExtensionManager.removeInstanceClassLoaderIfExists(procNode.getIdentifier());
}
}
@Test
public void testSomeNonExistentPropertiesDynamicallyModifyClasspath() throws MalformedURLException {
final PropertyDescriptor classpathProp1 = new PropertyDescriptor.Builder().name("Classpath Resource 1")
.dynamicallyModifiesClasspath(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
final PropertyDescriptor classpathProp2 = new PropertyDescriptor.Builder().name("Classpath Resource 2")
.dynamicallyModifiesClasspath(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
final ModifiesClasspathProcessor processor = new ModifiesClasspathProcessor(Arrays.asList(classpathProp1, classpathProp2));
final StandardProcessorNode procNode = createProcessorNode(processor);
final Set<ClassLoader> classLoaders = new HashSet<>();
classLoaders.add(procNode.getProcessor().getClass().getClassLoader());
// Load all of the extensions in src/test/java of this project
ExtensionManager.discoverExtensions(classLoaders);
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getIdentifier())){
// Should have an InstanceClassLoader here
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
assertTrue(contextClassLoader instanceof InstanceClassLoader);
final InstanceClassLoader instanceClassLoader = (InstanceClassLoader) contextClassLoader;
// Should not have any of the test resources loaded at this point
final URL[] testResources = getTestResources();
for (URL testResource : testResources) {
if (containsResource(instanceClassLoader.getInstanceResources(), testResource)) {
fail("found resource that should not have been loaded");
}
}
// Simulate setting the properties pointing to two of the resources
final Map<String, String> properties = new HashMap<>();
properties.put(classpathProp1.getName(), "src/test/resources/TestClasspathResources/resource1.txt");
properties.put(classpathProp2.getName(), "src/test/resources/TestClasspathResources/DoesNotExist.txt");
procNode.setProperties(properties);
// Should have resources 1 and 3 loaded into the InstanceClassLoader now
assertTrue(containsResource(instanceClassLoader.getInstanceResources(), testResources[0]));
assertFalse(containsResource(instanceClassLoader.getInstanceResources(), testResources[1]));
assertFalse(containsResource(instanceClassLoader.getInstanceResources(), testResources[2]));
// Should pass validation
assertTrue(procNode.isValid());
} finally {
ExtensionManager.removeInstanceClassLoaderIfExists(procNode.getIdentifier());
}
}
@Test
public void testPropertyModifiesClasspathWhenProcessorMissingAnnotation() throws MalformedURLException {
final ModifiesClasspathNoAnnotationProcessor processor = new ModifiesClasspathNoAnnotationProcessor();
final StandardProcessorNode procNode = createProcessorNode(processor);
final Set<ClassLoader> classLoaders = new HashSet<>();
classLoaders.add(procNode.getProcessor().getClass().getClassLoader());
// Load all of the extensions in src/test/java of this project
ExtensionManager.discoverExtensions(classLoaders);
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getIdentifier())){
// Can't validate the ClassLoader here b/c the class is missing the annotation
// Simulate setting the properties pointing to two of the resources
final Map<String, String> properties = new HashMap<>();
properties.put(ModifiesClasspathNoAnnotationProcessor.CLASSPATH_RESOURCE.getName(),
"src/test/resources/TestClasspathResources/resource1.txt");
procNode.setProperties(properties);
// Should not have loaded any of the resources
final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
assertTrue(classLoader instanceof URLClassLoader);
final URL[] testResources = getTestResources();
final URLClassLoader urlClassLoader = (URLClassLoader) classLoader;
assertFalse(containsResource(urlClassLoader.getURLs(), testResources[0]));
assertFalse(containsResource(urlClassLoader.getURLs(), testResources[1]));
assertFalse(containsResource(urlClassLoader.getURLs(), testResources[2]));
// Should pass validation
assertTrue(procNode.isValid());
} finally {
ExtensionManager.removeInstanceClassLoaderIfExists(procNode.getIdentifier());
}
}
private StandardProcessorNode createProcessorNode(Processor processor) {
final String uuid = UUID.randomUUID().toString();
final ValidationContextFactory validationContextFactory = createValidationContextFactory();
final NiFiProperties niFiProperties = NiFiProperties.createBasicNiFiProperties(null, null);
final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class);
final ComponentLog componentLog = Mockito.mock(ComponentLog.class);
ProcessorInitializationContext initContext = new StandardProcessorInitializationContext(uuid, componentLog, null, null, null);
processor.initialize(initContext);
return new StandardProcessorNode(processor, uuid, validationContextFactory, processScheduler, null,
niFiProperties, variableRegistry, componentLog);
}
private boolean containsResource(URL[] resources, URL resourceToFind) {
for (URL resource : resources) {
if (resourceToFind.getPath().equals(resource.getPath())) {
return true;
}
}
return false;
}
private URL[] getTestResources() throws MalformedURLException {
URL resource1 = new File("src/test/resources/TestClasspathResources/resource1.txt").toURI().toURL();
URL resource2 = new File("src/test/resources/TestClasspathResources/resource2.txt").toURI().toURL();
URL resource3 = new File("src/test/resources/TestClasspathResources/resource3.txt").toURI().toURL();
return new URL[] { resource1, resource2, resource3 };
}
private ValidationContextFactory createValidationContextFactory() {
return new ValidationContextFactory() {
@ -180,4 +430,5 @@ public class TestStandardProcessorNode {
onStoppedCount++;
}
}
}

View File

@ -83,10 +83,12 @@ public class TestProcessorLifecycle {
private static final Logger logger = LoggerFactory.getLogger(TestProcessorLifecycle.class);
private FlowController fc;
private Map<String,String> properties = new HashMap<>();
@Before
public void before() throws Exception {
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, TestProcessorLifecycle.class.getResource("/nifi.properties").getFile());
properties.put("P", "hello");
}
@After
@ -124,7 +126,7 @@ public class TestProcessorLifecycle {
this.setControllerRootGroup(fc, testGroup);
final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(),
UUID.randomUUID().toString());
testProcNode.setProperty("P", "hello");
testProcNode.setProperties(properties);
assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState());
assertEquals(ScheduledState.STOPPED, testProcNode.getPhysicalScheduledState());
// validates idempotency
@ -149,7 +151,7 @@ public class TestProcessorLifecycle {
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup);
final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
testProcNode.setProperty("P", "hello");
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
// sets the scenario for the processor to run
@ -175,7 +177,7 @@ public class TestProcessorLifecycle {
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup);
final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
testProcNode.setProperty("P", "hello");
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
// sets the scenario for the processor to run
@ -198,7 +200,7 @@ public class TestProcessorLifecycle {
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup);
ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
testProcNode.setProperty("P", "hello");
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
// sets the scenario for the processor to run
@ -241,7 +243,7 @@ public class TestProcessorLifecycle {
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup);
final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
testProcNode.setProperty("P", "hello");
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
// sets the scenario for the processor to run
@ -297,7 +299,7 @@ public class TestProcessorLifecycle {
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup);
ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
testProcNode.setProperty("P", "hello");
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
// sets the scenario for the processor to run
@ -333,7 +335,7 @@ public class TestProcessorLifecycle {
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup);
ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
testProcNode.setProperty("P", "hello");
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
// sets the scenario for the processor to run
@ -365,7 +367,7 @@ public class TestProcessorLifecycle {
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup);
ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
testProcNode.setProperty("P", "hello");
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
// sets the scenario for the processor to run
@ -396,7 +398,7 @@ public class TestProcessorLifecycle {
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup);
ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
testProcNode.setProperty("P", "hello");
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
// sets the scenario for the processor to run
this.blockingInterruptableOnUnschedule(testProcessor);
@ -424,7 +426,7 @@ public class TestProcessorLifecycle {
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup);
ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
testProcNode.setProperty("P", "hello");
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
// sets the scenario for the processor to run
this.blockingUninterruptableOnUnschedule(testProcessor);
@ -457,7 +459,7 @@ public class TestProcessorLifecycle {
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup);
ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
testProcNode.setProperty("P", "hello");
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
// sets the scenario for the processor to run
@ -504,8 +506,8 @@ public class TestProcessorLifecycle {
ControllerServiceNode testServiceNode = fc.createControllerService(TestService.class.getName(), "serv", true);
ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
testProcNode.setProperty("P", "hello");
testProcNode.setProperty("S", testServiceNode.getIdentifier());
properties.put("S", testServiceNode.getIdentifier());
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
testProcessor.withService = true;
@ -529,8 +531,9 @@ public class TestProcessorLifecycle {
ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
testGroup.addProcessor(testProcNode);
testProcNode.setProperty("P", "hello");
testProcNode.setProperty("S", testServiceNode.getIdentifier());
properties.put("S", testServiceNode.getIdentifier());
testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
testProcessor.withService = true;
@ -556,11 +559,11 @@ public class TestProcessorLifecycle {
this.setControllerRootGroup(fc, testGroup);
ProcessorNode testProcNodeA = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
testProcNodeA.setProperty("P", "hello");
testProcNodeA.setProperties(properties);
testGroup.addProcessor(testProcNodeA);
ProcessorNode testProcNodeB = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
testProcNodeB.setProperty("P", "hello");
testProcNodeB.setProperties(properties);
testGroup.addProcessor(testProcNodeB);
Collection<String> relationNames = new ArrayList<String>();

View File

@ -21,7 +21,9 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
@ -57,6 +59,7 @@ import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.StandardProcessorInitializationContext;
import org.apache.nifi.processor.StandardValidationContextFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.registry.VariableRegistry;
@ -95,7 +98,8 @@ public class TestStandardProcessScheduler {
reportingTask.initialize(config);
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(null, variableRegistry);
taskNode = new StandardReportingTaskNode(reportingTask, UUID.randomUUID().toString(), null, scheduler, validationContextFactory, variableRegistry);
final ComponentLog logger = Mockito.mock(ComponentLog.class);
taskNode = new StandardReportingTaskNode(reportingTask, UUID.randomUUID().toString(), null, scheduler, validationContextFactory, variableRegistry, logger);
controller = Mockito.mock(FlowController.class);
rootGroup = new MockProcessGroup();
@ -129,18 +133,24 @@ public class TestStandardProcessScheduler {
@Test(timeout = 60000)
public void testDisableControllerServiceWithProcessorTryingToStartUsingIt() throws InterruptedException {
final String uuid = UUID.randomUUID().toString();
final Processor proc = new ServiceReferencingProcessor();
proc.initialize(new StandardProcessorInitializationContext(uuid, null, null, null, null));
final StandardControllerServiceProvider serviceProvider =
new StandardControllerServiceProvider(controller, scheduler, null, Mockito.mock(StateManagerProvider.class), variableRegistry, nifiProperties);
final ControllerServiceNode service = serviceProvider.createControllerService(NoStartServiceImpl.class.getName(), "service", true);
rootGroup.addControllerService(service);
final ProcessorNode procNode = new StandardProcessorNode(proc, UUID.randomUUID().toString(),
new StandardValidationContextFactory(serviceProvider, variableRegistry), scheduler, serviceProvider, nifiProperties);
final ProcessorNode procNode = new StandardProcessorNode(proc, uuid,
new StandardValidationContextFactory(serviceProvider, variableRegistry),
scheduler, serviceProvider, nifiProperties, VariableRegistry.EMPTY_REGISTRY,
Mockito.mock(ComponentLog.class));
rootGroup.addProcessor(procNode);
procNode.setProperty(ServiceReferencingProcessor.SERVICE_DESC.getName(), service.getIdentifier());
Map<String,String> procProps = new HashMap<>();
procProps.put(ServiceReferencingProcessor.SERVICE_DESC.getName(), service.getIdentifier());
procNode.setProperties(procProps);
scheduler.enableControllerService(service);
scheduler.startProcessor(procNode);

View File

@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.controller.FlowController;
@ -44,6 +45,7 @@ import org.apache.nifi.controller.service.mock.ServiceB;
import org.apache.nifi.controller.service.mock.ServiceC;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.StandardProcessGroup;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.StandardValidationContextFactory;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.util.NiFiProperties;
@ -88,6 +90,12 @@ public class TestStandardControllerServiceProvider {
return new StandardProcessScheduler(null, null, stateManagerProvider, variableRegistry, NiFiProperties.createBasicNiFiProperties(null, null));
}
private void setProperty(ControllerServiceNode serviceNode, String propName, String propValue) {
Map<String,String> props = new LinkedHashMap<>();
props.put(propName, propValue);
serviceNode.setProperties(props);
}
@Test
public void testDisableControllerService() {
final ProcessGroup procGroup = new MockProcessGroup();
@ -118,7 +126,7 @@ public class TestStandardControllerServiceProvider {
group.addControllerService(serviceNodeA);
group.addControllerService(serviceNodeB);
serviceNodeA.setProperty(ServiceA.OTHER_SERVICE.getName(), "B");
setProperty(serviceNodeA, ServiceA.OTHER_SERVICE.getName(), "B");
try {
provider.enableControllerService(serviceNodeA);
@ -158,7 +166,7 @@ public class TestStandardControllerServiceProvider {
* https://issues.apache.org/jira/browse/NIFI-1143
*/
@Test(timeout = 60000)
public void testConcurrencyWithEnablingReferencingServicesGraph() {
public void testConcurrencyWithEnablingReferencingServicesGraph() throws InterruptedException {
final ProcessScheduler scheduler = createScheduler();
for (int i = 0; i < 10000; i++) {
testEnableReferencingServicesGraph(scheduler);
@ -195,10 +203,10 @@ public class TestStandardControllerServiceProvider {
procGroup.addControllerService(serviceNode3);
procGroup.addControllerService(serviceNode4);
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4");
serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4");
setProperty(serviceNode1, ServiceA.OTHER_SERVICE.getName(), "2");
setProperty(serviceNode2, ServiceA.OTHER_SERVICE.getName(), "4");
setProperty(serviceNode3, ServiceA.OTHER_SERVICE.getName(), "2");
setProperty(serviceNode3, ServiceA.OTHER_SERVICE_2.getName(), "4");
provider.enableControllerService(serviceNode4);
provider.enableReferencingServices(serviceNode4);
@ -227,7 +235,7 @@ public class TestStandardControllerServiceProvider {
final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false);
final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceB.class.getName(), "2", false);
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
setProperty(serviceNode1, ServiceA.OTHER_SERVICE.getName(), "2");
final Map<String, ControllerServiceNode> nodeMap = new LinkedHashMap<>();
nodeMap.put("1", serviceNode1);
@ -257,7 +265,7 @@ public class TestStandardControllerServiceProvider {
// add circular dependency on self.
nodeMap.clear();
serviceNode1.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "1");
setProperty(serviceNode1, ServiceA.OTHER_SERVICE_2.getName(), "1");
nodeMap.put("1", serviceNode1);
nodeMap.put("2", serviceNode2);
@ -284,8 +292,8 @@ public class TestStandardControllerServiceProvider {
// like that.
nodeMap.clear();
final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false);
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "3");
serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "1");
setProperty(serviceNode1, ServiceA.OTHER_SERVICE.getName(), "3");
setProperty(serviceNode3, ServiceA.OTHER_SERVICE.getName(), "1");
nodeMap.put("1", serviceNode1);
nodeMap.put("3", serviceNode3);
branches = StandardControllerServiceProvider.determineEnablingOrder(nodeMap);
@ -307,10 +315,10 @@ public class TestStandardControllerServiceProvider {
// Add multiple completely disparate branches.
nodeMap.clear();
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
setProperty(serviceNode1, ServiceA.OTHER_SERVICE.getName(), "2");
final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false);
final ControllerServiceNode serviceNode5 = provider.createControllerService(ServiceB.class.getName(), "5", false);
serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "4");
setProperty(serviceNode3, ServiceA.OTHER_SERVICE.getName(), "4");
nodeMap.put("1", serviceNode1);
nodeMap.put("2", serviceNode2);
nodeMap.put("3", serviceNode3);
@ -341,8 +349,8 @@ public class TestStandardControllerServiceProvider {
// create 2 branches both dependent on the same service
nodeMap.clear();
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
setProperty(serviceNode1, ServiceA.OTHER_SERVICE.getName(), "2");
setProperty(serviceNode3, ServiceA.OTHER_SERVICE.getName(), "2");
nodeMap.put("1", serviceNode1);
nodeMap.put("2", serviceNode2);
nodeMap.put("3", serviceNode3);
@ -367,7 +375,9 @@ public class TestStandardControllerServiceProvider {
private ProcessorNode createProcessor(final StandardProcessScheduler scheduler, final ControllerServiceProvider serviceProvider) {
final ProcessorNode procNode = new StandardProcessorNode(new DummyProcessor(), UUID.randomUUID().toString(),
new StandardValidationContextFactory(serviceProvider, null), scheduler, serviceProvider, NiFiProperties.createBasicNiFiProperties(null, null));
new StandardValidationContextFactory(serviceProvider, null), scheduler, serviceProvider,
NiFiProperties.createBasicNiFiProperties(null, null),
VariableRegistry.EMPTY_REGISTRY, Mockito.mock(ComponentLog.class));
final ProcessGroup group = new StandardProcessGroup(UUID.randomUUID().toString(), serviceProvider, scheduler, null, null, null, variableRegistry);
group.addProcessor(procNode);
@ -422,12 +432,12 @@ public class TestStandardControllerServiceProvider {
procGroup.addControllerService(E);
procGroup.addControllerService(F);
A.setProperty(ServiceA.OTHER_SERVICE.getName(), "B");
B.setProperty(ServiceA.OTHER_SERVICE.getName(), "D");
C.setProperty(ServiceA.OTHER_SERVICE.getName(), "B");
C.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "D");
E.setProperty(ServiceA.OTHER_SERVICE.getName(), "A");
E.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "F");
setProperty(A, ServiceA.OTHER_SERVICE.getName(), "B");
setProperty(B, ServiceA.OTHER_SERVICE.getName(), "D");
setProperty(C, ServiceA.OTHER_SERVICE.getName(), "B");
setProperty(C, ServiceA.OTHER_SERVICE_2.getName(), "D");
setProperty(E, ServiceA.OTHER_SERVICE.getName(), "A");
setProperty(E, ServiceA.OTHER_SERVICE_2.getName(), "F");
provider.enableControllerServices(Arrays.asList(A, B, C, D, E, F));
@ -465,12 +475,12 @@ public class TestStandardControllerServiceProvider {
procGroup.addControllerService(D);
procGroup.addControllerService(F);
A.setProperty(ServiceC.REQ_SERVICE_1.getName(), "B");
A.setProperty(ServiceC.REQ_SERVICE_2.getName(), "D");
B.setProperty(ServiceA.OTHER_SERVICE.getName(), "C");
setProperty(A, ServiceC.REQ_SERVICE_1.getName(), "B");
setProperty(A, ServiceC.REQ_SERVICE_2.getName(), "D");
setProperty(B, ServiceA.OTHER_SERVICE.getName(), "C");
F.setProperty(ServiceA.OTHER_SERVICE.getName(), "D");
D.setProperty(ServiceA.OTHER_SERVICE.getName(), "C");
setProperty(F, ServiceA.OTHER_SERVICE.getName(), "D");
setProperty(D, ServiceA.OTHER_SERVICE.getName(), "C");
provider.enableControllerServices(Arrays.asList(C, F, A, B, D));
@ -506,13 +516,13 @@ public class TestStandardControllerServiceProvider {
procGroup.addControllerService(serviceNode6);
procGroup.addControllerService(serviceNode7);
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4");
serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4");
serviceNode5.setProperty(ServiceA.OTHER_SERVICE.getName(), "6");
serviceNode7.setProperty(ServiceC.REQ_SERVICE_1.getName(), "2");
serviceNode7.setProperty(ServiceC.REQ_SERVICE_2.getName(), "3");
setProperty(serviceNode1, ServiceA.OTHER_SERVICE.getName(), "2");
setProperty(serviceNode2, ServiceA.OTHER_SERVICE.getName(), "4");
setProperty(serviceNode3, ServiceA.OTHER_SERVICE.getName(), "2");
setProperty(serviceNode3, ServiceA.OTHER_SERVICE_2.getName(), "4");
setProperty(serviceNode5, ServiceA.OTHER_SERVICE.getName(), "6");
setProperty(serviceNode7, ServiceC.REQ_SERVICE_1.getName(), "2");
setProperty(serviceNode7, ServiceC.REQ_SERVICE_2.getName(), "3");
provider.enableControllerServices(Arrays.asList(
serviceNode1, serviceNode2, serviceNode3, serviceNode4, serviceNode5, serviceNode7));

View File

@ -50,7 +50,7 @@ public class TestControllerService implements ControllerService {
@Override
public String getIdentifier() {
return null;
return "id";
}
@Override

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.test.processors;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.Collections;
import java.util.List;
/**
* A processor with a property descriptor that attempts to modify the classpath, but the processor
* does not have @RequiresInstanceClassLoading.
*/
public class ModifiesClasspathNoAnnotationProcessor extends AbstractProcessor {
public static final PropertyDescriptor CLASSPATH_RESOURCE = new PropertyDescriptor.Builder()
.name("Classpath Resource")
.dynamicallyModifiesClasspath(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Collections.singletonList(CLASSPATH_RESOURCE);
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.test.processors;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import java.util.List;
@RequiresInstanceClassLoading
public class ModifiesClasspathProcessor extends AbstractProcessor {
private List<PropertyDescriptor> properties;
public ModifiesClasspathProcessor() {
}
public ModifiesClasspathProcessor(List<PropertyDescriptor> properties) {
this.properties = properties;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
}
}

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.test.processors.ModifiesClasspathProcessor
org.apache.nifi.test.processors.ModifiesClasspathNoAnnotationProcessor

View File

@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
resource1

View File

@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
resource2

View File

@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
resource3

View File

@ -30,9 +30,12 @@
<logger name="org.apache.nifi" level="INFO"/>
<logger name="org.apache.nifi.controller.service.mock" level="ERROR"/>
<logger name="StandardProcessSession.claims" level="INFO" />
<root level="INFO">
<appender-ref ref="CONSOLE"/>
</root>
<logger name="StandardProcessSession.claims" level="DEBUG" />
</configuration>

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.nar;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.authentication.LoginIdentityProvider;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.controller.ControllerService;
@ -27,15 +28,20 @@ import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* Scans through the classpath to load all FlowFileProcessors, FlowFileComparators, and ReportingTasks using the service provider API and running through all classloaders (root, NARs).
@ -52,6 +58,9 @@ public class ExtensionManager {
private static final Map<String, ClassLoader> extensionClassloaderLookup = new HashMap<>();
private static final Set<String> requiresInstanceClassLoading = new HashSet<>();
private static final Map<String, ClassLoader> instanceClassloaderLookup = new ConcurrentHashMap<>();
static {
definitionMap.put(Processor.class, new HashSet<>());
definitionMap.put(FlowFilePrioritizer.class, new HashSet<>());
@ -126,6 +135,12 @@ public class ExtensionManager {
if (registeredClassLoader == null) {
classloaderMap.put(className, classLoader);
classes.add(type);
// keep track of which classes require a class loader per component instance
if (type.isAnnotationPresent(RequiresInstanceClassLoading.class)) {
requiresInstanceClassLoading.add(className);
}
} else {
boolean loadedFromAncestor = false;
@ -158,6 +173,73 @@ public class ExtensionManager {
return extensionClassloaderLookup.get(classType);
}
/**
* Determines the effective ClassLoader for the instance of the given type.
*
* @param classType the type of class to lookup the ClassLoader for
* @param instanceIdentifier the identifier of the specific instance of the classType to look up the ClassLoader for
* @return the ClassLoader for the given instance of the given type, or null if the type is not a detected extension type
*/
public static ClassLoader getClassLoader(final String classType, final String instanceIdentifier) {
if (StringUtils.isEmpty(classType) || StringUtils.isEmpty(instanceIdentifier)) {
throw new IllegalArgumentException("Class Type and Instance Identifier must be provided");
}
// Check if we already have a ClassLoader for this instance
ClassLoader instanceClassLoader = instanceClassloaderLookup.get(instanceIdentifier);
// If we don't then we'll create a new ClassLoader for this instance and add it to the map for future lookups
if (instanceClassLoader == null) {
final ClassLoader registeredClassLoader = getClassLoader(classType);
if (registeredClassLoader == null) {
return null;
}
// If the class is annotated with @RequiresInstanceClassLoading and the registered ClassLoader is a URLClassLoader
// then make a new InstanceClassLoader that is a full copy of the NAR Class Loader, otherwise create an empty
// InstanceClassLoader that has the NAR ClassLoader as a parent
if (requiresInstanceClassLoading.contains(classType) && (registeredClassLoader instanceof URLClassLoader)) {
final URLClassLoader registeredUrlClassLoader = (URLClassLoader) registeredClassLoader;
instanceClassLoader = new InstanceClassLoader(instanceIdentifier, registeredUrlClassLoader.getURLs(), registeredUrlClassLoader.getParent());
} else {
instanceClassLoader = new InstanceClassLoader(instanceIdentifier, new URL[0], registeredClassLoader);
}
instanceClassloaderLookup.put(instanceIdentifier, instanceClassLoader);
}
return instanceClassLoader;
}
/**
* Removes the ClassLoader for the given instance and closes it if necessary.
*
* @param instanceIdentifier the identifier of a component to remove the ClassLoader for
* @return the removed ClassLoader for the given instance, or null if not found
*/
public static ClassLoader removeInstanceClassLoaderIfExists(final String instanceIdentifier) {
ClassLoader classLoader = instanceClassloaderLookup.remove(instanceIdentifier);
if (classLoader != null && (classLoader instanceof URLClassLoader)) {
final URLClassLoader urlClassLoader = (URLClassLoader) classLoader;
try {
urlClassLoader.close();
} catch (IOException e) {
logger.warn("Unable to class URLClassLoader for " + instanceIdentifier);
}
}
return classLoader;
}
/**
* Checks if the given class type requires per-instance class loading (i.e. contains the @RequiresInstanceClassLoading annotation)
*
* @param classType the class to check
* @return true if the class is found in the set of classes requiring instance level class loading, false otherwise
*/
public static boolean requiresInstanceClassLoading(final String classType) {
return requiresInstanceClassLoading.contains(classType);
}
public static Set<Class> getExtensions(final Class<?> definition) {
final Set<Class> extensions = definitionMap.get(definition);
return (extensions == null) ? Collections.<Class>emptySet() : extensions;

View File

@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.nar;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
/**
* A ClassLoader created for an instance of a component which lets a client add resources to an intermediary ClassLoader
* that will be checked first when loading/finding classes.
*
* Typically an instance of this ClassLoader will be created by passing in the URLs and parent from a NARClassLoader in
* order to create a copy of the NARClassLoader without modifying it.
*/
public class InstanceClassLoader extends URLClassLoader {
private static final Logger logger = LoggerFactory.getLogger(InstanceClassLoader.class);
private final String identifier;
private ShimClassLoader shimClassLoader;
/**
* @param identifier the id of the component this ClassLoader was created for
* @param urls the URLs for the ClassLoader
* @param parent the parent ClassLoader
*/
public InstanceClassLoader(final String identifier, final URL[] urls, final ClassLoader parent) {
super(urls, parent);
this.identifier = identifier;
}
/**
* Initializes a new ShimClassLoader for the provided resources, closing the previous ShimClassLoader if one existed.
*
* @param urls the URLs for the ShimClassLoader
* @throws IOException if the previous ShimClassLoader existed and couldn't be closed
*/
public synchronized void setInstanceResources(final URL[] urls) {
if (shimClassLoader != null) {
try {
shimClassLoader.close();
} catch (IOException e) {
logger.warn("Unable to close URLClassLoader for " + identifier);
}
}
// don't set a parent here b/c otherwise it will create an infinite loop
shimClassLoader = new ShimClassLoader(urls, null);
}
/**
* @return the URLs for the instance resources that have been set
*/
public synchronized URL[] getInstanceResources() {
if (shimClassLoader != null) {
return shimClassLoader.getURLs();
}
return new URL[0];
}
@Override
public Class<?> loadClass(String name) throws ClassNotFoundException {
return this.loadClass(name, false);
}
@Override
protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
Class<?> c = null;
// first try the shim
if (shimClassLoader != null) {
try {
c = shimClassLoader.loadClass(name, resolve);
} catch (ClassNotFoundException cnf) {
c = null;
}
}
// if it wasn't in the shim try our self
if (c == null) {
return super.loadClass(name, resolve);
} else {
return c;
}
}
@Override
protected Class<?> findClass(String name) throws ClassNotFoundException {
Class<?> c = null;
// first try the shim
if (shimClassLoader != null) {
try {
c = shimClassLoader.findClass(name);
} catch (ClassNotFoundException cnf) {
c = null;
}
}
// if it wasn't in the shim try our self
if (c == null) {
return super.findClass(name);
} else {
return c;
}
}
/**
* Extend URLClassLoader to increase visibility of protected methods so that InstanceClassLoader can delegate.
*/
private static class ShimClassLoader extends URLClassLoader {
public ShimClassLoader(URL[] urls, ClassLoader parent) {
super(urls, parent);
}
public ShimClassLoader(URL[] urls) {
super(urls);
}
@Override
public Class<?> findClass(String name) throws ClassNotFoundException {
return super.findClass(name);
}
@Override
public Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
return super.loadClass(name, resolve);
}
}
}

View File

@ -35,18 +35,20 @@ public class NarCloseable implements Closeable {
}
/**
* Sets the current thread context class loader to the specific appropriate
* Nar class loader for the given configurable component. Restores to the
* previous classloader once complete. If the given class is not assignable
* from ConfigurableComponent then the NarThreadContextClassLoader is used.
* Sets the current thread context class loader to the specific appropriate class loader for the given
* component. If the component requires per-instance class loading then the class loader will be the
* specific class loader for instance with the given identifier, otherwise the class loader will be
* the NARClassLoader.
*
* @param componentClass componentClass
* @return NarCloseable with current thread context classloader jailed to
* the nar of the component
* @param componentClass the component class
* @param componentIdentifier the identifier of the component
* @return NarCloseable with the current thread context classloader jailed to the Nar
* or instance class loader of the component
*/
public static NarCloseable withComponentNarLoader(final Class componentClass) {
public static NarCloseable withComponentNarLoader(final Class componentClass, final String componentIdentifier) {
final ClassLoader current = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(componentClass.getClassLoader());
final ClassLoader instanceClassLoader = ExtensionManager.getClassLoader(componentClass.getName(), componentIdentifier);
Thread.currentThread().setContextClassLoader(instanceClassLoader);
return new NarCloseable(current);
}

View File

@ -1647,7 +1647,7 @@ public class ControllerFacade implements Authorizable {
final SearchContext context = new StandardSearchContext(searchStr, procNode, flowController, variableRegistry);
// search the processor using the appropriate thread context classloader
try (final NarCloseable x = NarCloseable.withComponentNarLoader(processor.getClass())) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
final Collection<SearchResult> searchResults = searchable.search(context);
if (CollectionUtils.isNotEmpty(searchResults)) {
for (final SearchResult searchResult : searchResults) {

View File

@ -275,15 +275,7 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
controllerService.setComments(comments);
}
if (isNotNull(properties)) {
for (final Map.Entry<String, String> entry : properties.entrySet()) {
final String propName = entry.getKey();
final String propVal = entry.getValue();
if (isNotNull(propName) && propVal == null) {
controllerService.removeProperty(propName);
} else if (isNotNull(propName)) {
controllerService.setProperty(propName, propVal);
}
}
controllerService.setProperties(properties);
}
}

View File

@ -161,15 +161,7 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
processor.setLossTolerant(config.isLossTolerant());
}
if (isNotNull(configProperties)) {
for (final Map.Entry<String, String> entry : configProperties.entrySet()) {
final String propName = entry.getKey();
final String propVal = entry.getValue();
if (isNotNull(propName) && propVal == null) {
processor.removeProperty(propName);
} else if (isNotNull(propName)) {
processor.setProperty(propName, propVal);
}
}
processor.setProperties(configProperties);
}
if (isNotNull(undefinedRelationshipsToTerminate)) {

View File

@ -285,15 +285,7 @@ public class StandardReportingTaskDAO extends ComponentDAO implements ReportingT
reportingTask.setComments(comments);
}
if (isNotNull(properties)) {
for (final Map.Entry<String, String> entry : properties.entrySet()) {
final String propName = entry.getKey();
final String propVal = entry.getValue();
if (isNotNull(propName) && propVal == null) {
reportingTask.removeProperty(propName);
} else if (isNotNull(propName)) {
reportingTask.setProperty(propName, propVal);
}
}
reportingTask.setProperties(properties);
}
}

View File

@ -62,7 +62,10 @@ public class MonitorMemoryTest {
@Test(expected = IllegalStateException.class)
public void validatevalidationKicksInOnWrongPoolNames() throws Exception {
ReportingTaskNode reportingTask = fc.createReportingTask(MonitorMemory.class.getName());
reportingTask.setProperty(MonitorMemory.MEMORY_POOL_PROPERTY.getName(), "foo");
Map<String,String> props = new HashMap<>();
props.put(MonitorMemory.MEMORY_POOL_PROPERTY.getName(), "foo");
reportingTask.setProperties(props);
ProcessScheduler ps = fc.getProcessScheduler();
ps.schedule(reportingTask);
}
@ -91,9 +94,12 @@ public class MonitorMemoryTest {
CapturingLogger capturingLogger = this.wrapAndReturnCapturingLogger();
ReportingTaskNode reportingTask = fc.createReportingTask(MonitorMemory.class.getName());
reportingTask.setSchedulingPeriod("1 sec");
reportingTask.setProperty(MonitorMemory.MEMORY_POOL_PROPERTY.getName(), "PS Old Gen");
reportingTask.setProperty(MonitorMemory.REPORTING_INTERVAL.getName(), "100 millis");
reportingTask.setProperty(MonitorMemory.THRESHOLD_PROPERTY.getName(), threshold);
Map<String,String> props = new HashMap<>();
props.put(MonitorMemory.MEMORY_POOL_PROPERTY.getName(), "PS Old Gen");
props.put(MonitorMemory.REPORTING_INTERVAL.getName(), "100 millis");
props.put(MonitorMemory.THRESHOLD_PROPERTY.getName(), threshold);
reportingTask.setProperties(props);
ProcessScheduler ps = fc.getProcessScheduler();
ps.schedule(reportingTask);

View File

@ -67,6 +67,14 @@ public interface HBaseClientService extends ControllerService {
.defaultValue("1")
.build();
PropertyDescriptor PHOENIX_CLIENT_JAR_LOCATION = new PropertyDescriptor.Builder()
.name("Phoenix Client JAR Location")
.description("The full path to the Phoenix client JAR. Required if Phoenix is installed on top of HBase.")
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.expressionLanguageSupported(true)
.dynamicallyModifiesClasspath(true)
.build();
/**
* Puts a batch of mutations to the given table.
*

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.hbase;
import java.io.File;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@ -36,6 +35,7 @@ import org.apache.hadoop.hbase.filter.ParseFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
@ -57,6 +57,7 @@ import org.apache.nifi.hbase.scan.ResultHandler;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
@ -68,6 +69,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
@RequiresInstanceClassLoading
@Tags({ "hbase", "client"})
@CapabilityDescription("Implementation of HBaseClientService for HBase 1.1.2. This service can be configured by providing " +
"a comma-separated list of configuration files, or by specifying values for the other properties. If configuration files " +
@ -109,6 +111,7 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
props.add(ZOOKEEPER_CLIENT_PORT);
props.add(ZOOKEEPER_ZNODE_PARENT);
props.add(HBASE_CLIENT_RETRIES);
props.add(PHOENIX_CLIENT_JAR_LOCATION);
this.properties = Collections.unmodifiableList(props);
}