NIFI-9382: Created a new ClassloaderIsolationKey mechanism by which H… (#5533)

* NIFI-9382: Created a new ClassloaderIsolationKey mechanism by which Hadoop related processors (and potentially others) can indicate that they need full classloaders to be cloned but can share with other instances in certain circumstances
- Added system tests

* NIFI-9382: Renamed interface based on review feedback

* NIFI-9382: Removed ReentrantKerberosUser.
This commit is contained in:
markap14 2021-11-22 14:55:19 -05:00 committed by GitHub
parent c033debdf3
commit 839fbf7d19
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 657 additions and 182 deletions

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.components;
import org.apache.nifi.context.PropertyContext;
/**
* <p>
* There are times when a component must be created in such a way that each instance gets its own ClassLoader hierarchy,
* rather than sharing the ClassLoader with other components (see {@link org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading @RequiresInstanceClassLoading}).
* This, however, can be extremely expensive, as all of the classes must be loaded again for each instance of the component. When thousands of these
* components are used in a single flow, the startup time can be great, and it can lead to massive amounts of RAM being required.
* </p>
*
* <p>
* For components that do require instance ClassLoading that clones ancestor resources, this interface can be optional implemented by the component.
* If the interface is implemented, the component is given the opportunity to return a distinct "key" that can be used to identify instances that may share
* the same ClassLoader.
* </p>
*/
public interface ClassloaderIsolationKeyProvider {
/**
* <p>
* Determines the key that identifies a shared ClassLoader that this component may use. Any two instances of the same component that return
* the same key may be assigned the same base ClassLoader (though it is not guaranteed that this will be the case).
* </p>
*
* <p>
* If a subsequent call to this method returns a different value, the component will be recreated with a different ClassLoader.
* </p>
*
* <p>
* Implementation Note: for components that implement this interface, this method will be called often. Therefore, performance characteristics
* of the implementation are critical. The method is expected to return the value of a configured property or derive a value to return based off
* of the values of a few properties. Accessing a remote resource, is too expensive. If the necessary computation is non-trivial, then it should be
* performed out of band and the pre-computed value simply returned by this method.
* </p>
*
* @param context the PropertyContext that can be used for determining the key
* @return a distinct key that can be used to indicate which shared ClassLoader is allowed to be used
*/
String getClassloaderIsolationKey(PropertyContext context);
}

View File

@ -16,6 +16,11 @@
*/
package org.apache.nifi.util.file.classloader;
import org.apache.nifi.util.security.MessageDigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.xml.bind.DatatypeConverter;
import java.io.File;
import java.io.FilenameFilter;
import java.net.MalformedURLException;
@ -31,11 +36,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import javax.xml.bind.DatatypeConverter;
import org.apache.nifi.util.security.MessageDigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ClassLoaderUtils {
@ -143,11 +143,12 @@ public class ClassLoaderUtils {
* @param urls URLs used for generating fingerprint string
* @return Fingerprint string from provided URLs
*/
public static String generateAdditionalUrlsFingerprint(final Set<URL> urls) {
public static String generateAdditionalUrlsFingerprint(final Set<URL> urls, final String classloaderIsolationKey) {
final StringBuilder formattedUrls = new StringBuilder();
final List<String> sortedUrls = urls.stream().map(Object::toString).sorted().collect(Collectors.toList());
sortedUrls.forEach(url -> formattedUrls.append(url).append("-").append(getLastModified(url)).append(";"));
formattedUrls.append(classloaderIsolationKey);
final byte[] formattedUrlsBinary = formattedUrls.toString().getBytes(StandardCharsets.UTF_8);
return DatatypeConverter.printHexBinary(MessageDigestUtils.getDigest(formattedUrlsBinary));

View File

@ -118,7 +118,7 @@ public class TestClassLoaderUtils {
final Set<URL> urls = new HashSet<>();
URL testUrl = Paths.get("src/test/resources/TestClassLoaderUtils/TestSuccess.jar").toUri().toURL();
urls.add(testUrl);
String testFingerprint = ClassLoaderUtils.generateAdditionalUrlsFingerprint(urls);
String testFingerprint = ClassLoaderUtils.generateAdditionalUrlsFingerprint(urls, null);
assertNotNull(testFingerprint);
}

View File

@ -22,11 +22,11 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SaslPlainServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.ClassloaderIsolationKeyProvider;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@ -56,7 +56,6 @@ import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.security.Security;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -77,7 +76,7 @@ import java.util.regex.Pattern;
* @see SecurityUtil#loginKerberos(Configuration, String, String)
*/
@RequiresInstanceClassLoading(cloneAncestorResources = true)
public abstract class AbstractHadoopProcessor extends AbstractProcessor {
public abstract class AbstractHadoopProcessor extends AbstractProcessor implements ClassloaderIsolationKeyProvider {
private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB";
private static final String DENY_LFS_ACCESS = "NIFI_HDFS_DENY_LOCAL_FILE_SYSTEM_ACCESS";
@ -204,6 +203,30 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
return properties;
}
@Override
public String getClassloaderIsolationKey(final PropertyContext context) {
final String explicitKerberosPrincipal = context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
if (explicitKerberosPrincipal != null) {
return explicitKerberosPrincipal;
}
final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
if (credentialsService != null) {
final String credentialsServicePrincipal = credentialsService.getPrincipal();
if (credentialsServicePrincipal != null) {
return credentialsServicePrincipal;
}
}
final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
if (kerberosUserService != null) {
final KerberosUser kerberosUser = kerberosUserService.createKerberosUser();
return kerberosUser.getPrincipal();
}
return null;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final String explicitPrincipal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
@ -362,28 +385,6 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
}
}
}
final KerberosUser kerberosUser = resources.getKerberosUser();
if (kerberosUser != null) {
try {
kerberosUser.logout();
} catch (final Exception e) {
getLogger().warn("Error logging out KerberosUser: {}", e.getMessage(), e);
}
}
// Clean-up the static reference to the Configuration instance
UserGroupInformation.setConfiguration(new Configuration());
// Clean-up the reference to the InstanceClassLoader that was put into Configuration
final Configuration configuration = resources.getConfiguration();
if (configuration != null) {
configuration.setClassLoader(null);
}
// Need to remove the Provider instance from the JVM's Providers class so that InstanceClassLoader can be GC'd eventually
final SaslPlainServer.SecurityProvider saslProvider = new SaslPlainServer.SecurityProvider();
Security.removeProvider(saslProvider.getName());
}
// Clear out the reference to the resources
@ -395,14 +396,14 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
statsField.setAccessible(true);
final Object statsObj = statsField.get(fileSystem);
if (statsObj != null && statsObj instanceof FileSystem.Statistics) {
if (statsObj instanceof FileSystem.Statistics) {
final FileSystem.Statistics statistics = (FileSystem.Statistics) statsObj;
final Field statsThreadField = statistics.getClass().getDeclaredField("STATS_DATA_CLEANER");
statsThreadField.setAccessible(true);
final Object statsThreadObj = statsThreadField.get(statistics);
if (statsThreadObj != null && statsThreadObj instanceof Thread) {
if (statsThreadObj instanceof Thread) {
final Thread statsThread = (Thread) statsThreadObj;
try {
statsThread.interrupt();
@ -603,7 +604,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
public static String getPathDifference(final Path root, final Path child) {
final int depthDiff = child.depth() - root.depth();
if (depthDiff <= 1) {
return "".intern();
return "";
}
String lastRoot = root.getName();
Path childsParent = child.getParent();

View File

@ -946,10 +946,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
@Override
public synchronized void reload(final Set<URL> additionalUrls) throws ProcessorInstantiationException {
if (isRunning()) {
throw new IllegalStateException("Cannot reload Processor while the Processor is running");
}
String additionalResourcesFingerprint = ClassLoaderUtils.generateAdditionalUrlsFingerprint(additionalUrls);
final String additionalResourcesFingerprint = ClassLoaderUtils.generateAdditionalUrlsFingerprint(additionalUrls, determineClasloaderIsolationKey());
setAdditionalResourcesFingerprint(additionalResourcesFingerprint);
getReloadComponent().reload(this, getCanonicalClassName(), getBundleCoordinate(), additionalUrls);
}
@ -1097,8 +1094,11 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
final Bundle bundle = extensionManager.getBundle(getBundleCoordinate());
final Set<URL> classpathUrls = getAdditionalClasspathResources(context.getProperties().keySet(), descriptor -> context.getProperty(descriptor).getValue());
final String classloaderIsolationKey = getClassLoaderIsolationKey(context);
final ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader();
try (final InstanceClassLoader detectedClassLoader = extensionManager.createInstanceClassLoader(getComponentType(), getIdentifier(), bundle, classpathUrls, false)) {
try (final InstanceClassLoader detectedClassLoader = extensionManager.createInstanceClassLoader(getComponentType(), getIdentifier(), bundle, classpathUrls, false,
classloaderIsolationKey)) {
Thread.currentThread().setContextClassLoader(detectedClassLoader);
results.addAll(verifiable.verify(context, logger, attributes));
} finally {
@ -1617,8 +1617,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
// Create a task to invoke the @OnScheduled annotation of the processor
final Callable<Void> startupTask = () -> {
final ProcessContext processContext = processContextFactory.get();
final ScheduledState currentScheduleState = scheduledState.get();
if (currentScheduleState == ScheduledState.STOPPING || currentScheduleState == ScheduledState.STOPPED || getDesiredState() == ScheduledState.STOPPED) {
LOG.debug("{} is stopped. Will not call @OnScheduled lifecycle methods or begin trigger onTrigger() method", StandardProcessorNode.this);
@ -1644,6 +1642,8 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
// Now that the task has been scheduled, set the timeout
completionTimestampRef.set(System.currentTimeMillis() + timeoutMilis);
final ProcessContext processContext = processContextFactory.get();
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), processor.getClass(), processor.getIdentifier())) {
try {
hasActiveThreads = true;

View File

@ -347,7 +347,7 @@ public abstract class AbstractFlowManager implements FlowManager {
}
public ProcessorNode createProcessor(final String type, String id, final BundleCoordinate coordinate, final boolean firstTimeAdded) {
return createProcessor(type, id, coordinate, Collections.emptySet(), firstTimeAdded, true);
return createProcessor(type, id, coordinate, Collections.emptySet(), firstTimeAdded, true, null);
}
public ReportingTaskNode createReportingTask(final String type, final BundleCoordinate bundleCoordinate) {
@ -360,7 +360,7 @@ public abstract class AbstractFlowManager implements FlowManager {
@Override
public ReportingTaskNode createReportingTask(final String type, final String id, final BundleCoordinate bundleCoordinate, final boolean firstTimeAdded) {
return createReportingTask(type, id, bundleCoordinate, Collections.emptySet(), firstTimeAdded, true);
return createReportingTask(type, id, bundleCoordinate, Collections.emptySet(), firstTimeAdded, true, null);
}
public ReportingTaskNode getReportingTaskNode(final String taskId) {

View File

@ -171,10 +171,7 @@ public abstract class AbstractReportingTaskNode extends AbstractComponentNode im
@Override
public void reload(final Set<URL> additionalUrls) throws ReportingTaskInstantiationException {
if (isRunning()) {
throw new IllegalStateException("Cannot reload Reporting Task while Reporting Task is running");
}
String additionalResourcesFingerprint = ClassLoaderUtils.generateAdditionalUrlsFingerprint(additionalUrls);
final String additionalResourcesFingerprint = ClassLoaderUtils.generateAdditionalUrlsFingerprint(additionalUrls, determineClasloaderIsolationKey());
setAdditionalResourcesFingerprint(additionalResourcesFingerprint);
getReloadComponent().reload(this, getCanonicalClassName(), getBundleCoordinate(), additionalUrls);
}
@ -363,8 +360,10 @@ public abstract class AbstractReportingTaskNode extends AbstractComponentNode im
final Bundle bundle = extensionManager.getBundle(getBundleCoordinate());
final Set<URL> classpathUrls = getAdditionalClasspathResources(context.getProperties().keySet(), descriptor -> context.getProperty(descriptor).getValue());
final String classloaderIsolationKey = getClassLoaderIsolationKey(context);
final ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader();
try (final InstanceClassLoader detectedClassLoader = extensionManager.createInstanceClassLoader(getComponentType(), getIdentifier(), bundle, classpathUrls, false)) {
try (final InstanceClassLoader detectedClassLoader = extensionManager.createInstanceClassLoader(getComponentType(), getIdentifier(), bundle, classpathUrls, false,
classloaderIsolationKey)) {
Thread.currentThread().setContextClassLoader(detectedClassLoader);
results.addAll(verifiable.verify(context, logger));
} finally {

View File

@ -210,15 +210,21 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
@Override
public void reload(final Set<URL> additionalUrls) throws ControllerServiceInstantiationException {
synchronized (this.active) {
if (isActive()) {
throw new IllegalStateException("Cannot reload Controller Service while service is active");
}
String additionalResourcesFingerprint = ClassLoaderUtils.generateAdditionalUrlsFingerprint(additionalUrls);
final String additionalResourcesFingerprint = ClassLoaderUtils.generateAdditionalUrlsFingerprint(additionalUrls, determineClasloaderIsolationKey());
setAdditionalResourcesFingerprint(additionalResourcesFingerprint);
getReloadComponent().reload(this, getCanonicalClassName(), getBundleCoordinate(), additionalUrls);
}
}
@Override
public void setProperties(final Map<String, String> properties, final boolean allowRemovalOfRequiredProperties) {
super.setProperties(properties, allowRemovalOfRequiredProperties);
// It's possible that changing the properties of this Controller Service could alter the Classloader Isolation Key of a referencing
// component so reload any referencing component as necessary.
getReferences().findRecursiveReferences(ComponentNode.class).forEach(ComponentNode::reloadAdditionalResourcesIfNecessary);
}
@Override
public ProcessGroup getProcessGroup() {
return processGroup;
@ -432,7 +438,9 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
final Set<URL> classpathUrls = getAdditionalClasspathResources(context.getProperties().keySet(), descriptor -> context.getProperty(descriptor).getValue());
final ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader();
try (final InstanceClassLoader detectedClassLoader = extensionManager.createInstanceClassLoader(getComponentType(), getIdentifier(), bundle, classpathUrls, false)) {
final String classLoaderIsolationKey = getClassLoaderIsolationKey(context);
try (final InstanceClassLoader detectedClassLoader = extensionManager.createInstanceClassLoader(getComponentType(), getIdentifier(), bundle, classpathUrls, false,
classLoaderIsolationKey)) {
Thread.currentThread().setContextClassLoader(detectedClassLoader);
results.addAll(verifiable.verify(context, logger, variables));
} finally {

View File

@ -1510,6 +1510,7 @@ public final class StandardProcessGroup implements ProcessGroup {
} else if (state == ScheduledState.RUNNING) {
return CompletableFuture.completedFuture(null);
}
processor.reloadAdditionalResourcesIfNecessary();
return scheduler.startProcessor(processor, failIfStopping);
@ -4902,7 +4903,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final boolean firstTimeAdded = true;
final Set<URL> additionalUrls = Collections.emptySet();
final ControllerServiceNode newService = flowManager.createControllerService(type, id, coordinate, additionalUrls, firstTimeAdded, true);
final ControllerServiceNode newService = flowManager.createControllerService(type, id, coordinate, additionalUrls, firstTimeAdded, true, null);
newService.setVersionedComponentId(proposed.getIdentifier());
destination.addControllerService(newService);

View File

@ -22,6 +22,7 @@ import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.attribute.expression.language.VariableImpact;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.ClassloaderIsolationKeyProvider;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.ConfigurableComponent;
@ -236,6 +237,9 @@ public abstract class AbstractComponentNode implements ComponentNode {
try {
verifyCanUpdateProperties(properties);
// Determine the Classloader Isolation Key, if applicable, so we can determine whether or not the key changes by setting properties.
final String initialIsolationKey = determineClasloaderIsolationKey();
final PropertyConfigurationMapper configurationMapper = new PropertyConfigurationMapper();
final Map<String, PropertyConfiguration> configurationMap = configurationMapper.mapRawPropertyValuesToPropertyConfiguration(this, properties);
@ -273,8 +277,12 @@ public abstract class AbstractComponentNode implements ComponentNode {
}
}
// Determine the updated Classloader Isolation Key, if applicable.
final String updatedIsolationKey = determineClasloaderIsolationKey();
final boolean classloaderIsolationKeyChanged = !Objects.equals(initialIsolationKey, updatedIsolationKey);
// if at least one property with dynamicallyModifiesClasspath(true) was set, then reload the component with the new urls
if (classpathChanged) {
if (classpathChanged || classloaderIsolationKeyChanged) {
logger.info("Updating classpath for " + this.componentType + " with the ID " + this.getIdentifier());
final Set<URL> additionalUrls = getAdditionalClasspathResources(getComponent().getPropertyDescriptors());
@ -297,6 +305,18 @@ public abstract class AbstractComponentNode implements ComponentNode {
}
}
protected String determineClasloaderIsolationKey() {
final ConfigurableComponent component = getComponent();
if (!(component instanceof ClassloaderIsolationKeyProvider)) {
return null;
}
final ValidationContext validationContext = getValidationContextFactory().newValidationContext(getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier(),
getParameterContext(), true);
return getClassLoaderIsolationKey(validationContext);
}
public void verifyCanUpdateProperties(final Map<String, String> properties) {
verifyModifiable();
@ -620,7 +640,7 @@ public abstract class AbstractComponentNode implements ComponentNode {
final Set<PropertyDescriptor> descriptors = this.getProperties().keySet();
final Set<URL> additionalUrls = this.getAdditionalClasspathResources(descriptors);
final String newFingerprint = ClassLoaderUtils.generateAdditionalUrlsFingerprint(additionalUrls);
final String newFingerprint = ClassLoaderUtils.generateAdditionalUrlsFingerprint(additionalUrls, determineClasloaderIsolationKey());
if(!StringUtils.equals(additionalResourcesFingerprint, newFingerprint)) {
setAdditionalResourcesFingerprint(newFingerprint);
try {

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.controller;
import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.AuthorizationResult;
import org.apache.nifi.authorization.AuthorizationResult.Result;
@ -27,13 +26,16 @@ import org.apache.nifi.authorization.resource.ComponentAuthorizable;
import org.apache.nifi.authorization.resource.RestrictedComponentsAuthorizableFactory;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.ClassloaderIsolationKeyProvider;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.validation.ValidationState;
import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.parameter.ParameterUpdate;
import org.apache.nifi.registry.ComponentVariableRegistry;
@ -45,7 +47,6 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
public interface ComponentNode extends ComponentAuthorizable {
@Override
public String getIdentifier();
@ -284,4 +285,19 @@ public interface ComponentNode extends ComponentAuthorizable {
}
ParameterLookup getParameterLookup();
default String getClassLoaderIsolationKey(final PropertyContext context) {
final ConfigurableComponent component = getComponent();
if (!(component instanceof ClassloaderIsolationKeyProvider)) {
return null;
}
try {
return ((ClassloaderIsolationKeyProvider) component).getClassloaderIsolationKey(context);
} catch (final Exception e) {
getLogger().error("Failed to determine ClassLoader Isolation Key for " + this + ". This could result in unexpected behavior by this processor.", e);
return null;
}
}
}

View File

@ -225,10 +225,13 @@ public interface FlowManager {
* @param firstTimeAdded whether or not this is the first time this
* Processor is added to the graph. If {@code true}, will invoke methods
* annotated with the {@link org.apache.nifi.annotation.lifecycle.OnAdded} annotation.
* @param classloaderIsolationKey a classloader key that can be used in order to specify which shared class loader can be used as the instance class loader's parent, or <code>null</code> if the
* parent class loader should be shared or if cloning ancestors is not necessary
* @return new processor node
* @throws NullPointerException if either arg is null
*/
ProcessorNode createProcessor(String type, String id, BundleCoordinate coordinate, Set<URL> additionalUrls, boolean firstTimeAdded, boolean registerLogObserver);
ProcessorNode createProcessor(String type, String id, BundleCoordinate coordinate, Set<URL> additionalUrls, boolean firstTimeAdded, boolean registerLogObserver,
String classloaderIsolationKey);
@ -299,7 +302,7 @@ public interface FlowManager {
ReportingTaskNode createReportingTask(String type, String id, BundleCoordinate bundleCoordinate, boolean firstTimeAdded);
ReportingTaskNode createReportingTask(String type, String id, BundleCoordinate bundleCoordinate, Set<URL> additionalUrls, boolean firstTimeAdded, boolean register);
ReportingTaskNode createReportingTask(String type, String id, BundleCoordinate bundleCoordinate, Set<URL> additionalUrls, boolean firstTimeAdded, boolean register, String classloaderIsolationKey);
ReportingTaskNode getReportingTaskNode(String taskId);
@ -314,7 +317,7 @@ public interface FlowManager {
ControllerServiceNode getControllerServiceNode(String id);
ControllerServiceNode createControllerService(String type, String id, BundleCoordinate bundleCoordinate, Set<URL> additionalUrls, boolean firstTimeAdded,
boolean registerLogObserver);
boolean registerLogObserver, String classloaderIsolationKey);
Set<ControllerServiceNode> getRootControllerServices();

View File

@ -84,6 +84,7 @@ public class ExtensionBuilder {
private ReloadComponent reloadComponent;
private FlowController flowController;
private StateManagerProvider stateManagerProvider;
private String classloaderIsolationKey;
public ExtensionBuilder type(final String type) {
this.type = type;
@ -163,6 +164,11 @@ public class ExtensionBuilder {
return this;
}
public ExtensionBuilder classloaderIsolationKey(final String classloaderIsolationKey) {
this.classloaderIsolationKey = classloaderIsolationKey;
return this;
}
public ProcessorNode buildProcessor() {
if (identifier == null) {
throw new IllegalStateException("Processor ID must be specified");
@ -535,7 +541,8 @@ public class ExtensionBuilder {
throw new IllegalStateException("Unable to find bundle for coordinate " + bundleCoordinate.getCoordinate());
}
final ClassLoader detectedClassLoader = extensionManager.createInstanceClassLoader(type, identifier, bundle, classpathUrls == null ? Collections.emptySet() : classpathUrls);
final ClassLoader detectedClassLoader = extensionManager.createInstanceClassLoader(type, identifier, bundle, classpathUrls == null ? Collections.emptySet() : classpathUrls, true,
classloaderIsolationKey);
final Class<?> rawClass = Class.forName(type, true, detectedClassLoader);
Thread.currentThread().setContextClassLoader(detectedClassLoader);

View File

@ -231,7 +231,7 @@ public class StandardFlowSnippet implements FlowSnippet {
for (final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices()) {
final BundleCoordinate bundleCoordinate = BundleUtils.getBundle(extensionManager, controllerServiceDTO.getType(), controllerServiceDTO.getBundle());
final ControllerServiceNode serviceNode = flowManager.createControllerService(controllerServiceDTO.getType(), controllerServiceDTO.getId(),
bundleCoordinate, Collections.emptySet(), true,true);
bundleCoordinate, Collections.emptySet(), true,true, null);
serviceNode.pauseValidationTrigger();
serviceNodes.add(serviceNode);
@ -403,6 +403,12 @@ public class StandardFlowSnippet implements FlowSnippet {
procNode.setAutoTerminatedRelationships(relationships);
}
// We need to add the processor to the ProcessGroup before calling ProcessorNode.setProperties. This will notify the FlowManager that the Processor
// has been added to the flow, which is important before calling ProcessorNode.setProperties, since #setProperties may call methods that result in looking
// up a Controller Service (such as #getClassloaderIsolationKey). The Processor must be registered with the FlowManager and its parent Process Group
// before that can happen, in order to ensure that it has access to any referenced Controller Service.
group.addProcessor(procNode);
if (config.getProperties() != null) {
procNode.setProperties(config.getProperties());
}
@ -411,8 +417,6 @@ public class StandardFlowSnippet implements FlowSnippet {
final StandardProcessContext processContext = new StandardProcessContext(procNode, flowController.getControllerServiceProvider(), flowController.getEncryptor(),
flowController.getStateManagerProvider().getStateManager(procNode.getProcessor().getIdentifier()), () -> false, flowController);
procNode.onConfigurationRestored(processContext);
group.addProcessor(procNode);
} finally {
procNode.resumeValidationTrigger();
}

View File

@ -20,8 +20,6 @@ import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.service.ControllerServiceInvocationHandler;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.StandardConfigurationContext;
@ -51,8 +49,7 @@ public class StandardReloadComponent implements ReloadComponent {
@Override
public void reload(final ProcessorNode existingNode, final String newType, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls)
throws ProcessorInstantiationException {
public void reload(final ProcessorNode existingNode, final String newType, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls) {
if (existingNode == null) {
throw new IllegalStateException("Existing ProcessorNode cannot be null");
}
@ -70,21 +67,22 @@ public class StandardReloadComponent implements ReloadComponent {
// save the instance class loader to use it for calling OnRemoved on the existing processor
final ClassLoader existingInstanceClassLoader = extensionManager.getInstanceClassLoader(id);
// create a new node with firstTimeAdded as true so lifecycle methods get fired
// attempt the creation to make sure it works before firing the OnRemoved methods below
final ProcessorNode newNode = flowController.getFlowManager().createProcessor(newType, id, bundleCoordinate, additionalUrls, true, false);
final StateManager stateManager = flowController.getStateManagerProvider().getStateManager(id);
final StandardProcessContext processContext = new StandardProcessContext(existingNode, flowController.getControllerServiceProvider(),
flowController.getEncryptor(), stateManager, () -> false, flowController);
// call OnRemoved for the existing processor using the previous instance class loader
try (final NarCloseable x = NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) {
final StateManager stateManager = flowController.getStateManagerProvider().getStateManager(id);
final StandardProcessContext processContext = new StandardProcessContext(existingNode, flowController.getControllerServiceProvider(),
flowController.getEncryptor(), stateManager, () -> false, flowController);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, existingNode.getProcessor(), processContext);
} finally {
extensionManager.closeURLClassLoader(id, existingInstanceClassLoader);
}
// create a new node with firstTimeAdded as true so lifecycle methods get fired
// attempt the creation to make sure it works before firing the OnRemoved methods below
final String classloaderIsolationKey = existingNode.getClassLoaderIsolationKey(processContext);
final ProcessorNode newNode = flowController.getFlowManager().createProcessor(newType, id, bundleCoordinate, additionalUrls, true, false, classloaderIsolationKey);
// set the new processor in the existing node
final ComponentLog componentLogger = new SimpleProcessLogger(id, newNode.getProcessor());
final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(componentLogger);
@ -98,14 +96,13 @@ public class StandardReloadComponent implements ReloadComponent {
existingNode.refreshProperties();
// Notify the processor node that the configuration (properties, e.g.) has been restored
final StandardProcessContext processContext = new StandardProcessContext(existingNode, flowController.getControllerServiceProvider(), flowController.getEncryptor(),
flowController.getStateManagerProvider().getStateManager(existingNode.getProcessor().getIdentifier()), () -> false, flowController);
existingNode.onConfigurationRestored(processContext);
logger.debug("Triggering async validation of {} due to processor reload", existingNode);
flowController.getValidationTrigger().trigger(existingNode);
}
@Override
public void reload(final ControllerServiceNode existingNode, final String newType, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls)
throws ControllerServiceInstantiationException {
@ -126,20 +123,20 @@ public class StandardReloadComponent implements ReloadComponent {
// save the instance class loader to use it for calling OnRemoved on the existing service
final ClassLoader existingInstanceClassLoader = extensionManager.getInstanceClassLoader(id);
// create a new node with firstTimeAdded as true so lifecycle methods get called
// attempt the creation to make sure it works before firing the OnRemoved methods below
final ControllerServiceNode newNode = flowController.getFlowManager().createControllerService(newType, id, bundleCoordinate, additionalUrls, true, false);
// call OnRemoved for the existing service using the previous instance class loader
final ConfigurationContext configurationContext = new StandardConfigurationContext(existingNode, flowController.getControllerServiceProvider(),
null, flowController.getVariableRegistry());
try (final NarCloseable x = NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) {
final ConfigurationContext configurationContext = new StandardConfigurationContext(existingNode, flowController.getControllerServiceProvider(),
null, flowController.getVariableRegistry());
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, existingNode.getControllerServiceImplementation(), configurationContext);
} finally {
extensionManager.closeURLClassLoader(id, existingInstanceClassLoader);
}
// create a new node with firstTimeAdded as true so lifecycle methods get called
// attempt the creation to make sure it works before firing the OnRemoved methods below
final String classloaderIsolationKey = existingNode.getClassLoaderIsolationKey(configurationContext);
final ControllerServiceNode newNode = flowController.getFlowManager().createControllerService(newType, id, bundleCoordinate, additionalUrls, true, false, classloaderIsolationKey);
// take the invocation handler that was created for new proxy and is set to look at the new node,
// and set it to look at the existing node
final ControllerServiceInvocationHandler invocationHandler = newNode.getInvocationHandler();
@ -165,8 +162,7 @@ public class StandardReloadComponent implements ReloadComponent {
}
@Override
public void reload(final ReportingTaskNode existingNode, final String newType, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls)
throws ReportingTaskInstantiationException {
public void reload(final ReportingTaskNode existingNode, final String newType, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls) {
if (existingNode == null) {
throw new IllegalStateException("Existing ReportingTaskNode cannot be null");
}
@ -184,17 +180,19 @@ public class StandardReloadComponent implements ReloadComponent {
// save the instance class loader to use it for calling OnRemoved on the existing processor
final ClassLoader existingInstanceClassLoader = extensionManager.getInstanceClassLoader(id);
// set firstTimeAdded to true so lifecycle annotations get fired, but don't register this node
// attempt the creation to make sure it works before firing the OnRemoved methods below
final ReportingTaskNode newNode = flowController.getFlowManager().createReportingTask(newType, id, bundleCoordinate, additionalUrls, true, false);
// call OnRemoved for the existing reporting task using the previous instance class loader
final ConfigurationContext configurationContext = existingNode.getConfigurationContext();
try (final NarCloseable x = NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, existingNode.getReportingTask(), existingNode.getConfigurationContext());
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, existingNode.getReportingTask(), configurationContext);
} finally {
extensionManager.closeURLClassLoader(id, existingInstanceClassLoader);
}
// set firstTimeAdded to true so lifecycle annotations get fired, but don't register this node
// attempt the creation to make sure it works before firing the OnRemoved methods below
final String classloaderIsolationKey = existingNode.getClassLoaderIsolationKey(configurationContext);
final ReportingTaskNode newNode = flowController.getFlowManager().createReportingTask(newType, id, bundleCoordinate, additionalUrls, true, false, classloaderIsolationKey);
// set the new reporting task into the existing node
final ComponentLog componentLogger = new SimpleProcessLogger(id, existingNode.getReportingTask());
final TerminationAwareLogger terminationAwareLogger = new TerminationAwareLogger(componentLogger);

View File

@ -306,8 +306,9 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana
}
}
@Override
public ProcessorNode createProcessor(final String type, String id, final BundleCoordinate coordinate, final Set<URL> additionalUrls,
final boolean firstTimeAdded, final boolean registerLogObserver) {
final boolean firstTimeAdded, final boolean registerLogObserver, final String classloaderIsolationKey) {
// make sure the first reference to LogRepository happens outside of a NarCloseable so that we use the framework's ClassLoader
final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
@ -326,6 +327,7 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana
.addClasspathUrls(additionalUrls)
.kerberosConfig(flowController.createKerberosConfig(nifiProperties))
.extensionManager(extensionManager)
.classloaderIsolationKey(classloaderIsolationKey)
.buildProcessor();
LogRepositoryFactory.getRepository(procNode.getIdentifier()).setLogger(procNode.getLogger());
@ -352,7 +354,7 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana
}
public ReportingTaskNode createReportingTask(final String type, final String id, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls,
final boolean firstTimeAdded, final boolean register) {
final boolean firstTimeAdded, final boolean register, final String classloaderIsolationKey) {
requireNonNull(type);
requireNonNull(id);
requireNonNull(bundleCoordinate);
@ -375,6 +377,7 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana
.kerberosConfig(flowController.createKerberosConfig(nifiProperties))
.flowController(flowController)
.extensionManager(extensionManager)
.classloaderIsolationKey(classloaderIsolationKey)
.buildReportingTask();
LogRepositoryFactory.getRepository(taskNode.getIdentifier()).setLogger(taskNode.getLogger());
@ -458,7 +461,7 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana
}
public ControllerServiceNode createControllerService(final String type, final String id, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls, final boolean firstTimeAdded,
final boolean registerLogObserver) {
final boolean registerLogObserver, final String classloaderIsolationKey) {
// make sure the first reference to LogRepository happens outside of a NarCloseable so that we use the framework's ClassLoader
final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
final ExtensionManager extensionManager = flowController.getExtensionManager();
@ -478,6 +481,7 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana
.kerberosConfig(flowController.createKerberosConfig(nifiProperties))
.stateManagerProvider(flowController.getStateManagerProvider())
.extensionManager(extensionManager)
.classloaderIsolationKey(classloaderIsolationKey)
.buildControllerService();
LogRepositoryFactory.getRepository(serviceNode.getIdentifier()).setLogger(serviceNode.getLogger());

View File

@ -16,21 +16,6 @@
*/
package org.apache.nifi.controller.service;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.FlowController;
@ -51,6 +36,22 @@ import org.w3c.dom.Element;
import org.xml.sax.SAXException;
import org.xml.sax.SAXParseException;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.ParserConfigurationException;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
public class ControllerServiceLoader {
private static final Logger logger = LoggerFactory.getLogger(ControllerServiceLoader.class);
@ -172,7 +173,7 @@ public class ControllerServiceLoader {
final UUID id = UUID.nameUUIDFromBytes(controllerService.getIdentifier().getBytes(StandardCharsets.UTF_8));
final ControllerServiceNode clone = flowController.getFlowManager().createControllerService(controllerService.getCanonicalClassName(), id.toString(),
controllerService.getBundleCoordinate(), Collections.emptySet(), false, true);
controllerService.getBundleCoordinate(), Collections.emptySet(), false, true, null);
clone.setName(controllerService.getName());
clone.setComments(controllerService.getComments());
@ -203,7 +204,7 @@ public class ControllerServiceLoader {
}
}
final ControllerServiceNode node = flowController.getFlowManager().createControllerService(dto.getType(), dto.getId(), coordinate, Collections.emptySet(), false, true);
final ControllerServiceNode node = flowController.getFlowManager().createControllerService(dto.getType(), dto.getId(), coordinate, Collections.emptySet(), false, true, null);
node.setName(dto.getName());
node.setComments(dto.getComments());
node.setVersionedComponentId(dto.getVersionedComponentId());

View File

@ -678,7 +678,7 @@ public class TestFlowController {
@Test
public void testCreateMissingControllerService() throws ProcessorInstantiationException {
final ControllerServiceNode serviceNode = controller.getFlowManager().createControllerService("org.apache.nifi.NonExistingControllerService", "1234-Controller-Service",
systemBundle.getBundleDetails().getCoordinate(), null, false, true);
systemBundle.getBundleDetails().getCoordinate(), null, false, true, null);
assertNotNull(serviceNode);
assertEquals("org.apache.nifi.NonExistingControllerService", serviceNode.getCanonicalClassName());
assertEquals("(Missing) NonExistingControllerService", serviceNode.getComponentType());
@ -738,7 +738,7 @@ public class TestFlowController {
ProcessGroup pg = controller.getFlowManager().createProcessGroup("my-process-group");
pg.setName("my-process-group");
ControllerServiceNode cs = controller.getFlowManager().createControllerService("org.apache.nifi.NonExistingControllerService", "my-controller-service",
systemBundle.getBundleDetails().getCoordinate(), null, false, true);
systemBundle.getBundleDetails().getCoordinate(), null, false, true, null);
pg.addControllerService(cs);
controller.getFlowManager().getRootGroup().addProcessGroup(pg);
controller.getFlowManager().getRootGroup().removeProcessGroup(pg);
@ -828,7 +828,7 @@ public class TestFlowController {
public void testReloadControllerService() {
final String id = "ServiceA" + System.currentTimeMillis();
final BundleCoordinate coordinate = systemBundle.getBundleDetails().getCoordinate();
final ControllerServiceNode controllerServiceNode = controller.getFlowManager().createControllerService(ServiceA.class.getName(), id, coordinate, null, true, true);
final ControllerServiceNode controllerServiceNode = controller.getFlowManager().createControllerService(ServiceA.class.getName(), id, coordinate, null, true, true, null);
final String originalName = controllerServiceNode.getName();
assertEquals(id, controllerServiceNode.getIdentifier());
@ -863,7 +863,7 @@ public class TestFlowController {
final String id = "ServiceA" + System.currentTimeMillis();
final BundleCoordinate coordinate = systemBundle.getBundleDetails().getCoordinate();
final ControllerServiceNode controllerServiceNode = controller.getFlowManager().createControllerService(ServiceA.class.getName(), id, coordinate, null, true, true);
final ControllerServiceNode controllerServiceNode = controller.getFlowManager().createControllerService(ServiceA.class.getName(), id, coordinate, null, true, true, null);
// the instance class loader shouldn't have any of the resources yet
URLClassLoader instanceClassLoader = extensionManager.getInstanceClassLoader(id);
@ -1125,7 +1125,7 @@ public class TestFlowController {
public void testInstantiateSnippetWhenControllerServiceMissingBundle() throws ProcessorInstantiationException {
final String id = UUID.randomUUID().toString();
final BundleCoordinate coordinate = systemBundle.getBundleDetails().getCoordinate();
final ControllerServiceNode controllerServiceNode = controller.getFlowManager().createControllerService(ServiceA.class.getName(), id, coordinate, null, true, true);
final ControllerServiceNode controllerServiceNode = controller.getFlowManager().createControllerService(ServiceA.class.getName(), id, coordinate, null, true, true, null);
// create the controller service dto
final ControllerServiceDTO csDto = new ControllerServiceDTO();
@ -1156,7 +1156,7 @@ public class TestFlowController {
public void testInstantiateSnippetWithControllerService() throws ProcessorInstantiationException {
final String id = UUID.randomUUID().toString();
final BundleCoordinate coordinate = systemBundle.getBundleDetails().getCoordinate();
final ControllerServiceNode controllerServiceNode = controller.getFlowManager().createControllerService(ServiceA.class.getName(), id, coordinate, null, true, true);
final ControllerServiceNode controllerServiceNode = controller.getFlowManager().createControllerService(ServiceA.class.getName(), id, coordinate, null, true, true, null);
// create the controller service dto
final ControllerServiceDTO csDto = new ControllerServiceDTO();

View File

@ -110,6 +110,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
@ -213,7 +214,8 @@ public class TestStandardProcessScheduler {
serviceProvider.onControllerServiceAdded(serviceNode);
return serviceNode;
}
}).when(flowManager).createControllerService(anyString(), anyString(), any(BundleCoordinate.class), AdditionalMatchers.or(anySet(), isNull()), anyBoolean(), anyBoolean());
}).when(flowManager).createControllerService(anyString(), anyString(), any(BundleCoordinate.class),
AdditionalMatchers.or(anySet(), isNull()), anyBoolean(), anyBoolean(), nullable(String.class));
}
@After
@ -257,7 +259,7 @@ public class TestStandardProcessScheduler {
final ReloadComponent reloadComponent = Mockito.mock(ReloadComponent.class);
final ControllerServiceNode service = flowManager.createControllerService(NoStartServiceImpl.class.getName(), "service",
systemBundle.getBundleDetails().getCoordinate(), null, true, true);
systemBundle.getBundleDetails().getCoordinate(), null, true, true, null);
rootGroup.addControllerService(service);
@ -340,7 +342,7 @@ public class TestStandardProcessScheduler {
final StandardProcessScheduler scheduler = createScheduler();
final ControllerServiceNode serviceNode = flowManager.createControllerService(SimpleTestService.class.getName(),
"1", systemBundle.getBundleDetails().getCoordinate(), null, false, true);
"1", systemBundle.getBundleDetails().getCoordinate(), null, false, true, null);
serviceNode.performValidation();
@ -382,7 +384,7 @@ public class TestStandardProcessScheduler {
final StandardProcessScheduler scheduler = createScheduler();
final ControllerServiceNode serviceNode = flowManager.createControllerService(SimpleTestService.class.getName(),
"1", systemBundle.getBundleDetails().getCoordinate(), null, false, true);
"1", systemBundle.getBundleDetails().getCoordinate(), null, false, true, null);
final SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation();
final ExecutorService executor = Executors.newCachedThreadPool();
@ -418,7 +420,7 @@ public class TestStandardProcessScheduler {
public void validateEnabledServiceCanOnlyBeDisabledOnce() throws Exception {
final StandardProcessScheduler scheduler = createScheduler();
final ControllerServiceNode serviceNode = flowManager.createControllerService(SimpleTestService.class.getName(),
"1", systemBundle.getBundleDetails().getCoordinate(), null, false, true);
"1", systemBundle.getBundleDetails().getCoordinate(), null, false, true, null);
assertSame(ValidationStatus.VALID, serviceNode.performValidation());
@ -455,7 +457,7 @@ public class TestStandardProcessScheduler {
final StandardProcessScheduler scheduler = createScheduler();
final ControllerServiceNode serviceNode = flowManager.createControllerService(FailingService.class.getName(),
"1", systemBundle.getBundleDetails().getCoordinate(), null, false, true);
"1", systemBundle.getBundleDetails().getCoordinate(), null, false, true, null);
serviceNode.performValidation();
final Future<?> future = scheduler.enableControllerService(serviceNode);
@ -497,7 +499,7 @@ public class TestStandardProcessScheduler {
final ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 200; i++) {
final ControllerServiceNode serviceNode = flowManager.createControllerService(RandomShortDelayEnablingService.class.getName(), "1",
systemBundle.getBundleDetails().getCoordinate(), null, false, true);
systemBundle.getBundleDetails().getCoordinate(), null, false, true, nullable(String.class));
executor.execute(new Runnable() {
@Override
@ -538,7 +540,7 @@ public class TestStandardProcessScheduler {
final StandardProcessScheduler scheduler = createScheduler();
final ControllerServiceNode serviceNode = flowManager.createControllerService(LongEnablingService.class.getName(),
"1", systemBundle.getBundleDetails().getCoordinate(), null, false, true);
"1", systemBundle.getBundleDetails().getCoordinate(), null, false, true, null);
final LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation();
ts.setLimit(Long.MAX_VALUE);

View File

@ -404,7 +404,7 @@ public class FrameworkIntegrationTest {
protected final ProcessorNode createProcessorNode(final String processorType, final ProcessGroup destination) {
final String uuid = getSimpleTypeName(processorType) + "-" + UUID.randomUUID().toString();
final BundleCoordinate bundleCoordinate = SystemBundle.SYSTEM_BUNDLE_COORDINATE;
final ProcessorNode procNode = flowController.getFlowManager().createProcessor(processorType, uuid, bundleCoordinate, Collections.emptySet(), true, true);
final ProcessorNode procNode = flowController.getFlowManager().createProcessor(processorType, uuid, bundleCoordinate, Collections.emptySet(), true, true, null);
destination.addProcessor(procNode);
return procNode;
@ -417,7 +417,7 @@ public class FrameworkIntegrationTest {
protected final ControllerServiceNode createControllerServiceNode(final String controllerServiceType) {
final String uuid = getSimpleTypeName(controllerServiceType) + "-" + UUID.randomUUID().toString();
final BundleCoordinate bundleCoordinate = SystemBundle.SYSTEM_BUNDLE_COORDINATE;
final ControllerServiceNode serviceNode = flowController.getFlowManager().createControllerService(controllerServiceType, uuid, bundleCoordinate, Collections.emptySet(), true, true);
final ControllerServiceNode serviceNode = flowController.getFlowManager().createControllerService(controllerServiceType, uuid, bundleCoordinate, Collections.emptySet(), true, true, null);
rootProcessGroup.addControllerService(serviceNode);
return serviceNode;
}

View File

@ -46,7 +46,7 @@ public interface ExtensionManager {
* @return the ClassLoader for the given instance of the given type, or null if the type is not a detected extension type
*/
default InstanceClassLoader createInstanceClassLoader(String classType, String instanceIdentifier, Bundle bundle, Set<URL> additionalUrls) {
return createInstanceClassLoader(classType, instanceIdentifier, bundle, additionalUrls, true);
return createInstanceClassLoader(classType, instanceIdentifier, bundle, additionalUrls, true, null);
}
/**
@ -57,9 +57,11 @@ public interface ExtensionManager {
* @param bundle the bundle where the classType exists
* @param additionalUrls additional URLs to add to the instance class loader
* @param registerClassLoader whether or not to register the class loader as the new classloader for the component with the given ID
* @param classloaderIsolationKey a classloader key that can be used in order to specify which shared class loader can be used as the instance class loader's parent, or <code>null</code> if the
* parent class loader should be shared or if cloning ancestors is not necessary
* @return the ClassLoader for the given instance of the given type, or null if the type is not a detected extension type
*/
InstanceClassLoader createInstanceClassLoader(String classType, String instanceIdentifier, Bundle bundle, Set<URL> additionalUrls, boolean registerClassLoader);
InstanceClassLoader createInstanceClassLoader(String classType, String instanceIdentifier, Bundle bundle, Set<URL> additionalUrls, boolean registerClassLoader, String classloaderIsolationKey);
/**
* Retrieves the InstanceClassLoader for the component with the given identifier.

View File

@ -20,6 +20,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
@ -62,15 +63,26 @@ public class InstanceClassLoader extends AbstractNativeLibHandlingClassLoader {
final Set<URL> instanceUrls,
final Set<URL> additionalResourceUrls,
final Set<File> narNativeLibDirs,
final ClassLoader parent
) {
final ClassLoader parent) {
super(combineURLs(instanceUrls, additionalResourceUrls), parent, initNativeLibDirList(narNativeLibDirs, additionalResourceUrls), identifier);
this.identifier = identifier;
this.instanceType = type;
this.instanceUrls = Collections.unmodifiableSet(
instanceUrls == null ? Collections.emptySet() : new LinkedHashSet<>(instanceUrls));
this.additionalResourceUrls = Collections.unmodifiableSet(
additionalResourceUrls == null ? Collections.emptySet() : new LinkedHashSet<>(additionalResourceUrls));
this.instanceUrls = instanceUrls == null ? Collections.emptySet() : Collections.unmodifiableSet(new HashSet<>(instanceUrls));
this.additionalResourceUrls = additionalResourceUrls == null ? Collections.emptySet() : Collections.unmodifiableSet(new HashSet<>(additionalResourceUrls));
if (parent instanceof SharedInstanceClassLoader) {
((SharedInstanceClassLoader) parent).incrementReferenceCount();
}
}
@Override
public void close() throws IOException {
super.close();
final ClassLoader parent = getParent();
if (parent instanceof SharedInstanceClassLoader) {
((SharedInstanceClassLoader) parent).close();
}
}
/**

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.nar;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.Set;
public class SharedInstanceClassLoader extends InstanceClassLoader {
private long referenceCount = 0L;
public SharedInstanceClassLoader(final String identifier, final String type, final Set<URL> instanceUrls, final Set<URL> additionalResourceUrls,
final Set<File> narNativeLibDirs, final ClassLoader parent) {
super(identifier, type, instanceUrls, additionalResourceUrls, narNativeLibDirs, parent);
}
@Override
public synchronized void close() throws IOException {
referenceCount--;
if (referenceCount <= 0) {
super.close();
}
}
public synchronized void incrementReferenceCount() {
referenceCount++;
}
}

View File

@ -23,6 +23,7 @@ import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.UserGroupProvider;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.ClassloaderIsolationKeyProvider;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.StateProvider;
@ -61,8 +62,10 @@ import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
/**
@ -85,6 +88,7 @@ public class StandardExtensionDiscoveringManager implements ExtensionDiscovering
private final Map<String, ConfigurableComponent> tempComponentLookup = new HashMap<>();
private final Map<String, InstanceClassLoader> instanceClassloaderLookup = new ConcurrentHashMap<>();
private final ConcurrentMap<BaseClassLoaderKey, ClassLoader> sharedBaseClassloaders = new ConcurrentHashMap<>();
public StandardExtensionDiscoveringManager() {
this(Collections.emptyList());
@ -358,7 +362,8 @@ public class StandardExtensionDiscoveringManager implements ExtensionDiscovering
}
@Override
public InstanceClassLoader createInstanceClassLoader(final String classType, final String instanceIdentifier, final Bundle bundle, final Set<URL> additionalUrls, final boolean register) {
public InstanceClassLoader createInstanceClassLoader(final String classType, final String instanceIdentifier, final Bundle bundle, final Set<URL> additionalUrls, final boolean register,
final String classloaderIsolationKey) {
if (StringUtils.isEmpty(classType)) {
throw new IllegalArgumentException("Class-Type is required");
}
@ -383,42 +388,80 @@ public class StandardExtensionDiscoveringManager implements ExtensionDiscovering
final ConfigurableComponent tempComponent = getTempComponent(classType, bundle.getBundleDetails().getCoordinate());
final Class<?> type = tempComponent.getClass();
final RequiresInstanceClassLoading requiresInstanceClassLoading = type.getAnnotation(RequiresInstanceClassLoading.class);
final boolean allowsSharedClassloader = tempComponent instanceof ClassloaderIsolationKeyProvider;
if (allowsSharedClassloader && classloaderIsolationKey == null) {
instanceClassLoader = new InstanceClassLoader(instanceIdentifier, classType, Collections.emptySet(), additionalUrls, bundleClassLoader);
} else {
final BaseClassLoaderKey baseClassLoaderKey = classloaderIsolationKey == null ? null : new BaseClassLoaderKey(bundle, classloaderIsolationKey);
final NarClassLoader narBundleClassLoader = (NarClassLoader) bundleClassLoader;
logger.debug("Including ClassLoader resources from {} for component {}", new Object[]{bundle.getBundleDetails(), instanceIdentifier});
final NarClassLoader narBundleClassLoader = (NarClassLoader) bundleClassLoader;
logger.debug("Including ClassLoader resources from {} for component {}", new Object[] {bundle.getBundleDetails(), instanceIdentifier});
final Set<URL> instanceUrls = new LinkedHashSet<>(Arrays.asList(narBundleClassLoader.getURLs()));
final Set<File> narNativeLibDirs = new LinkedHashSet<>();
narNativeLibDirs.add(narBundleClassLoader.getNARNativeLibDir());
final Set<URL> instanceUrls = new LinkedHashSet<>();
final Set<File> narNativeLibDirs = new LinkedHashSet<>();
ClassLoader ancestorClassLoader = narBundleClassLoader.getParent();
narNativeLibDirs.add(narBundleClassLoader.getNARNativeLibDir());
instanceUrls.addAll(Arrays.asList(narBundleClassLoader.getURLs()));
ClassLoader ancestorClassLoader = narBundleClassLoader.getParent();
if (requiresInstanceClassLoading.cloneAncestorResources()) {
final ConfigurableComponent component = getTempComponent(classType, bundle.getBundleDetails().getCoordinate());
final Set<BundleCoordinate> reachableApiBundles = findReachableApiBundles(component);
while (ancestorClassLoader instanceof NarClassLoader) {
final Bundle ancestorNarBundle = classLoaderBundleLookup.get(ancestorClassLoader);
// stop including ancestor resources when we reach one of the APIs, or when we hit the Jetty NAR
if (ancestorNarBundle == null || reachableApiBundles.contains(ancestorNarBundle.getBundleDetails().getCoordinate())
|| ancestorNarBundle.getBundleDetails().getCoordinate().getId().equals(NarClassLoaders.JETTY_NAR_ID)) {
break;
boolean resolvedSharedClassLoader = false;
final RequiresInstanceClassLoading requiresInstanceClassLoading = type.getAnnotation(RequiresInstanceClassLoading.class);
if (requiresInstanceClassLoading.cloneAncestorResources()) {
// Check to see if there's already a shared ClassLoader that can be used as the parent/base classloader
if (baseClassLoaderKey != null) {
final ClassLoader sharedBaseClassloader = sharedBaseClassloaders.get(baseClassLoaderKey);
if (sharedBaseClassloader != null) {
resolvedSharedClassLoader = true;
ancestorClassLoader = sharedBaseClassloader;
logger.debug("Creating InstanceClassLoader for type {} using shared Base ClassLoader {} for component {}", type, sharedBaseClassloader, instanceIdentifier);
}
}
final NarClassLoader ancestorNarClassLoader = (NarClassLoader) ancestorClassLoader;
// If we didn't find a shared ClassLoader to use, go ahead and clone the bundle's ClassLoader.
if (!resolvedSharedClassLoader) {
final ConfigurableComponent component = getTempComponent(classType, bundle.getBundleDetails().getCoordinate());
final Set<BundleCoordinate> reachableApiBundles = findReachableApiBundles(component);
narNativeLibDirs.add(ancestorNarClassLoader.getNARNativeLibDir());
Collections.addAll(instanceUrls, ancestorNarClassLoader.getURLs());
while (ancestorClassLoader instanceof NarClassLoader) {
final Bundle ancestorNarBundle = classLoaderBundleLookup.get(ancestorClassLoader);
ancestorClassLoader = ancestorNarClassLoader.getParent();
// stop including ancestor resources when we reach one of the APIs, or when we hit the Jetty NAR
if (ancestorNarBundle == null || reachableApiBundles.contains(ancestorNarBundle.getBundleDetails().getCoordinate())
|| ancestorNarBundle.getBundleDetails().getCoordinate().getId().equals(NarClassLoaders.JETTY_NAR_ID)) {
break;
}
final NarClassLoader ancestorNarClassLoader = (NarClassLoader) ancestorClassLoader;
narNativeLibDirs.add(ancestorNarClassLoader.getNARNativeLibDir());
Collections.addAll(instanceUrls, ancestorNarClassLoader.getURLs());
ancestorClassLoader = ancestorNarClassLoader.getParent();
}
}
}
// register our new InstanceClassLoader as the shared base classloader.
if (baseClassLoaderKey != null && !resolvedSharedClassLoader) {
// Created a shared class loader that is everything we need except for the additional URLs, as the additional URLs are instance-specific.
final SharedInstanceClassLoader sharedClassLoader = new SharedInstanceClassLoader(instanceIdentifier, classType, instanceUrls,
Collections.emptySet(), narNativeLibDirs, ancestorClassLoader);
instanceClassLoader = new InstanceClassLoader(instanceIdentifier, classType, Collections.emptySet(), additionalUrls, Collections.emptySet(), sharedClassLoader);
logger.debug("Creating InstanceClassLoader for type {} using newly created shared Base ClassLoader {} for component {}", type, sharedClassLoader, instanceIdentifier);
if (logger.isTraceEnabled()) {
for (URL url : sharedClassLoader.getURLs()) {
logger.trace("Shared Base ClassLoader URL resource: {}", new Object[] {url.toExternalForm()});
}
}
sharedBaseClassloaders.putIfAbsent(baseClassLoaderKey, sharedClassLoader);
} else {
// If we resolved a shared classloader, the shared classloader already has the instance URLs, so there's no need to provide them for the Instance ClassLoader.
// But if we did not resolve to a shared ClassLoader, it's important to pull in the instanceUrls.
final Set<URL> resolvedInstanceUrls = resolvedSharedClassLoader ? Collections.emptySet() : instanceUrls;
instanceClassLoader = new InstanceClassLoader(instanceIdentifier, classType, resolvedInstanceUrls, additionalUrls, narNativeLibDirs, ancestorClassLoader);
}
}
instanceClassLoader = new InstanceClassLoader(instanceIdentifier, classType, instanceUrls, additionalUrls, narNativeLibDirs, ancestorClassLoader);
} else {
instanceClassLoader = new InstanceClassLoader(instanceIdentifier, classType, Collections.emptySet(), additionalUrls, bundleClassLoader);
}
@ -436,6 +479,7 @@ public class StandardExtensionDiscoveringManager implements ExtensionDiscovering
return instanceClassLoader;
}
/**
* Find the bundle coordinates for any service APIs that are referenced by this component and not part of the same bundle.
*
@ -659,4 +703,37 @@ public class StandardExtensionDiscoveringManager implements ExtensionDiscovering
}
}
private static class BaseClassLoaderKey {
private final Bundle bundle;
private final String classloaderIsolationKey;
public BaseClassLoaderKey(final Bundle bundle, final String classloaderIsolationKey) {
this.bundle = Objects.requireNonNull(bundle);
this.classloaderIsolationKey = Objects.requireNonNull(classloaderIsolationKey);
}
public Bundle getBundle() {
return bundle;
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final BaseClassLoaderKey that = (BaseClassLoaderKey) o;
return bundle.equals(that.bundle) && classloaderIsolationKey.equals(that.classloaderIsolationKey);
}
@Override
public int hashCode() {
return Objects.hash(bundle, classloaderIsolationKey);
}
}
}

View File

@ -92,7 +92,7 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
final FlowManager flowManager = flowController.getFlowManager();
final BundleCoordinate bundleCoordinate = BundleUtils.getBundle(extensionManager, controllerServiceDTO.getType(), controllerServiceDTO.getBundle());
final ControllerServiceNode controllerService = flowManager.createControllerService(controllerServiceDTO.getType(), controllerServiceDTO.getId(), bundleCoordinate,
Collections.emptySet(), true, true);
Collections.emptySet(), true, true, null);
// ensure we can perform the update
verifyUpdate(controllerService, controllerServiceDTO);

View File

@ -271,7 +271,9 @@ public class PutHDFS extends AbstractHadoopProcessor {
@OnStopped
public void onStopped() {
aclCache.invalidateAll();
if (aclCache != null) { // aclCache may be null if the parent class's @OnScheduled method failed
aclCache.invalidateAll();
}
}
@Override

View File

@ -279,11 +279,12 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
return new KerberosKeytabUser(principal, keytab) {
@Override
public synchronized void login() {
if (!isLoggedIn()) {
super.login();
createKuduClient(context);
if (isLoggedIn()) {
return;
}
super.login();
createKuduClient(context);
}
};
}
@ -292,11 +293,12 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
return new KerberosPasswordUser(principal, password) {
@Override
public synchronized void login() {
if (!isLoggedIn()) {
super.login();
createKuduClient(context);
if (isLoggedIn()) {
return;
}
super.login();
createKuduClient(context);
}
};
}

View File

@ -384,7 +384,7 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
private ReportingTaskNode createReportingTask(final ReportingTaskDefinition taskDefinition) {
final BundleCoordinate bundleCoordinate = determineBundleCoordinate(taskDefinition, "Reporting Task");
final ReportingTaskNode taskNode = flowManager.createReportingTask(taskDefinition.getType(), UUID.randomUUID().toString(), bundleCoordinate, Collections.emptySet(), true, true);
final ReportingTaskNode taskNode = flowManager.createReportingTask(taskDefinition.getType(), UUID.randomUUID().toString(), bundleCoordinate, Collections.emptySet(), true, true, null);
final Map<String, String> properties = resolveProperties(taskDefinition.getPropertyValues(), taskNode.getComponent(), taskNode.getProperties().keySet());
taskNode.setProperties(properties);

View File

@ -155,7 +155,7 @@ public class StatelessFlowManager extends AbstractFlowManager implements FlowMan
@Override
public ProcessorNode createProcessor(final String type, final String id, final BundleCoordinate coordinate, final Set<URL> additionalUrls, final boolean firstTimeAdded,
final boolean registerLogObserver) {
final boolean registerLogObserver, final String classloaderIsolationKey) {
logger.debug("Creating Processor of type {} with id {}", type, id);
// make sure the first reference to LogRepository happens outside of a NarCloseable so that we use the framework's ClassLoader
@ -266,7 +266,7 @@ public class StatelessFlowManager extends AbstractFlowManager implements FlowMan
@Override
public ReportingTaskNode createReportingTask(final String type, final String id, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls, final boolean firstTimeAdded,
final boolean register) {
final boolean register, final String classloaderIsolationKey) {
if (type == null || id == null || bundleCoordinate == null) {
throw new NullPointerException("Must supply type, id, and bundle coordinate in order to create Reporting Task. Provided arguments were type=" + type + ", id=" + id
@ -333,7 +333,7 @@ public class StatelessFlowManager extends AbstractFlowManager implements FlowMan
@Override
public ControllerServiceNode createControllerService(final String type, final String id, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls,
final boolean firstTimeAdded, final boolean registerLogObserver) {
final boolean firstTimeAdded, final boolean registerLogObserver, final String classloaderIsolationKey) {
logger.debug("Creating Controller Service of type {} with id {}", type, id);
final LogRepository logRepository = LogRepositoryFactory.getRepository(id);

View File

@ -77,7 +77,7 @@ public class StatelessReloadComponent implements ReloadComponent {
// create a new node with firstTimeAdded as true so lifecycle methods get fired
// attempt the creation to make sure it works before firing the OnRemoved methods below
final ProcessorNode newNode = statelessEngine.getFlowManager().createProcessor(
newType, id, bundleCoordinate, additionalUrls, true, false);
newType, id, bundleCoordinate, additionalUrls, true, false, null);
// call OnRemoved for the existing processor using the previous instance class loader
try (final NarCloseable x = NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) {
@ -133,7 +133,7 @@ public class StatelessReloadComponent implements ReloadComponent {
// create a new node with firstTimeAdded as true so lifecycle methods get called
// attempt the creation to make sure it works before firing the OnRemoved methods below
final ControllerServiceNode newNode = statelessEngine.getFlowManager().createControllerService(newType, id, bundleCoordinate, additionalUrls, true, false);
final ControllerServiceNode newNode = statelessEngine.getFlowManager().createControllerService(newType, id, bundleCoordinate, additionalUrls, true, false, null);
// call OnRemoved for the existing service using the previous instance class loader
try (final NarCloseable x = NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) {
@ -189,7 +189,7 @@ public class StatelessReloadComponent implements ReloadComponent {
// set firstTimeAdded to true so lifecycle annotations get fired, but don't register this node
// attempt the creation to make sure it works before firing the OnRemoved methods below
final ReportingTaskNode newNode = statelessEngine.getFlowManager().createReportingTask(newType, id, bundleCoordinate, additionalUrls, true, false);
final ReportingTaskNode newNode = statelessEngine.getFlowManager().createReportingTask(newType, id, bundleCoordinate, additionalUrls, true, false, null);
// call OnRemoved for the existing reporting task using the previous instance class loader
try (final NarCloseable x = NarCloseable.withComponentNarLoader(existingInstanceClassLoader)) {

View File

@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.tests.system;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.components.ClassloaderIsolationKeyProvider;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR;
@RequiresInstanceClassLoading(cloneAncestorResources = true)
public class WriteFlowFileCountToFile extends AbstractProcessor implements ClassloaderIsolationKeyProvider {
private static final AtomicLong counter = new AtomicLong(0L);
static final PropertyDescriptor ISOLATION_KEY = new Builder()
.name("Isolation Key")
.displayName("Isolation Key")
.description("The key to use as the ClassLoader Isolation Key")
.required(true)
.addValidator(NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(NONE)
.build();
static final PropertyDescriptor FILE_TO_WRITE = new Builder()
.name("File to Write")
.displayName("File to Write")
.description("File to write the counts to")
.required(true)
.addValidator(NON_EMPTY_VALIDATOR)
.defaultValue("counts.txt")
.build();
private final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Arrays.asList(ISOLATION_KEY, FILE_TO_WRITE);
}
@Override
public Set<Relationship> getRelationships() {
return Collections.singleton(REL_SUCCESS);
}
@Override
public String getClassloaderIsolationKey(final PropertyContext context) {
return context.getProperty(ISOLATION_KEY).getValue();
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final long counterValue = counter.incrementAndGet();
final byte[] fileContents = String.valueOf(counterValue).getBytes(StandardCharsets.UTF_8);
final File file = new File(context.getProperty(FILE_TO_WRITE).getValue());
try (final OutputStream fos = new FileOutputStream(file)) {
fos.write(fileContents);
} catch (Exception e) {
throw new ProcessException(e);
}
session.transfer(flowFile, REL_SUCCESS);
}
}

View File

@ -40,4 +40,5 @@ org.apache.nifi.processors.tests.system.TransferBatch
org.apache.nifi.processors.tests.system.ThrowProcessException
org.apache.nifi.processors.tests.system.ValidateFileExists
org.apache.nifi.processors.tests.system.VerifyContents
org.apache.nifi.processors.tests.system.WriteFlowFileCountToFile
org.apache.nifi.processors.tests.system.WriteToFile

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.tests.system.classloaders;
import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Collections;
public class ClassloaderIsolationKeyIT extends NiFiSystemIT {
@Test
public void testClassloaderChanges() throws NiFiClientException, IOException, InterruptedException {
final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
final ProcessorEntity firstCounter = getClientUtil().createProcessor("WriteFlowFileCountToFile");
final ProcessorEntity secondCounter = getClientUtil().createProcessor("WriteFlowFileCountToFile");
getClientUtil().createConnection(generate, firstCounter, "success");
getClientUtil().createConnection(firstCounter, secondCounter, "success");
getClientUtil().setAutoTerminatedRelationships(secondCounter, "success");
getClientUtil().updateProcessorProperties(firstCounter, Collections.singletonMap("File to Write", "count1.txt"));
getClientUtil().updateProcessorProperties(secondCounter, Collections.singletonMap("File to Write", "count2.txt"));
getClientUtil().updateProcessorProperties(firstCounter, Collections.singletonMap("Isolation Key", "key-1"));
getClientUtil().updateProcessorProperties(secondCounter, Collections.singletonMap("Isolation Key", "key-1"));
getClientUtil().startProcessGroupComponents("root");
final File nifiHome = getNiFiInstance().getInstanceDirectory();
final File firstCountFile = new File(nifiHome, "count1.txt");
final File secondCountFile = new File(nifiHome, "count2.txt");
waitForCount(firstCountFile, 1);
waitForCount(secondCountFile, 2);
// Stop processors and change Isolation Key for the first processor. This should result in a new classloader which will result in
// the processor having a new statically defined AtomicLong. This means we'll now get a new count of 1, while count2.txt should increment again to a value of 3.
getClientUtil().stopProcessGroupComponents("root");
getClientUtil().updateProcessorProperties(firstCounter, Collections.singletonMap("Isolation Key", "key-2"));
getClientUtil().startProcessGroupComponents("root");
waitForCount(firstCountFile, 1);
waitForCount(secondCountFile, 3);
// Change key back to key-1, which should result in going back to the original statically defined AtomicLong.
getClientUtil().stopProcessGroupComponents("root");
getClientUtil().updateProcessorProperties(firstCounter, Collections.singletonMap("Isolation Key", "key-1"));
getClientUtil().startProcessGroupComponents("root");
waitForCount(firstCountFile, 4);
waitForCount(secondCountFile, 5);
// Change both processors to a new Isolation Key and restart. This should result in both processors still having the same ClassLoader (and therefore the same
// AtomicLong after restart).
getClientUtil().stopProcessGroupComponents("root");
getClientUtil().updateProcessorProperties(firstCounter, Collections.singletonMap("Isolation Key", "other"));
getClientUtil().updateProcessorProperties(secondCounter, Collections.singletonMap("Isolation Key", "other"));
getNiFiInstance().stop();
getNiFiInstance().start(true);
getClientUtil().startProcessGroupComponents("root");
waitForCount(firstCountFile, 1);
waitForCount(secondCountFile, 2);
}
private void waitForCount(final File file, final long expectedValue) throws InterruptedException {
waitFor(() -> {
try {
return getCount(file) == expectedValue;
} catch (IOException e) {
return false;
}
});
}
private long getCount(final File file) throws IOException {
final byte[] fileContents = Files.readAllBytes(file.toPath());
return Long.parseLong(new String(fileContents, StandardCharsets.UTF_8));
}
}