From 97198e35a04c12e66684d9545ff24156d16c60f6 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 8 Dec 2021 11:14:23 -0500 Subject: [PATCH] NIFI-9382: This closes #5584. Added system test that replicates issue in which a closed shared classloader causes issues when used again NIFI-9382: Fixed issue with SharedInstanceClassLoader where the classloader may get closed but then get used again. When the SharedInstanceClassLoader is closed, we will now ensure that we don't use anymore and instead create a new one. Signed-off-by: Joe Witt --- .../hadoop/AbstractHadoopProcessor.java | 2 +- .../StandardControllerServiceNode.java | 5 +- .../apache/nifi/nar/InstanceClassLoader.java | 4 -- .../nifi/nar/NarThreadContextClassLoader.java | 1 + .../nifi/nar/SharedInstanceClassLoader.java | 9 ++- .../StandardExtensionDiscoveringManager.java | 8 ++- .../system/WriteFlowFileCountToFile.java | 30 +++++++++- .../ClassloaderIsolationKeyIT.java | 55 +++++++++++++++++++ .../conf/clustered/node1/logback.xml | 41 ++++++++++++-- .../conf/clustered/node2/logback.xml | 45 +++++++++++---- .../test/resources/conf/default/logback.xml | 43 +++++++++++++-- 11 files changed, 208 insertions(+), 35 deletions(-) diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java index 222effd25f..7c574a72f8 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java @@ -205,7 +205,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor implemen @Override public String getClassloaderIsolationKey(final PropertyContext context) { - final String explicitKerberosPrincipal = context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue(); + final String explicitKerberosPrincipal = context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue(); if (explicitKerberosPrincipal != null) { return explicitKerberosPrincipal; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java index bdae2b11b0..8593201423 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java @@ -338,8 +338,9 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme @Override public void verifyCanEnable() { - if (getState() != ControllerServiceState.DISABLED) { - throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because it is not disabled"); + final ControllerServiceState state = getState(); + if (state != ControllerServiceState.DISABLED) { + throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because it is not disabled - it has a state of " + state); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/InstanceClassLoader.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/InstanceClassLoader.java index e7153d7438..b024bb523b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/InstanceClassLoader.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/InstanceClassLoader.java @@ -69,10 +69,6 @@ public class InstanceClassLoader extends AbstractNativeLibHandlingClassLoader { this.instanceType = type; this.instanceUrls = instanceUrls == null ? Collections.emptySet() : Collections.unmodifiableSet(new HashSet<>(instanceUrls)); this.additionalResourceUrls = additionalResourceUrls == null ? Collections.emptySet() : Collections.unmodifiableSet(new HashSet<>(additionalResourceUrls)); - - if (parent instanceof SharedInstanceClassLoader) { - ((SharedInstanceClassLoader) parent).incrementReferenceCount(); - } } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java index 949d207342..915afa5288 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java @@ -266,6 +266,7 @@ public class NarThreadContextClassLoader extends URLClassLoader { private static ClassLoader createClassLoader(final String implementationClassName, final String instanceId, final Bundle bundle, final ExtensionManager extensionManager) throws ClassNotFoundException { + final ClassLoader bundleClassLoader = bundle.getClassLoader(); final Class rawClass = Class.forName(implementationClassName, true, bundleClassLoader); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/SharedInstanceClassLoader.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/SharedInstanceClassLoader.java index 5aabef6ac2..5ec1327e93 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/SharedInstanceClassLoader.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/SharedInstanceClassLoader.java @@ -24,6 +24,7 @@ import java.util.Set; public class SharedInstanceClassLoader extends InstanceClassLoader { private long referenceCount = 0L; + private boolean closed = false; public SharedInstanceClassLoader(final String identifier, final String type, final Set instanceUrls, final Set additionalResourceUrls, final Set narNativeLibDirs, final ClassLoader parent) { @@ -35,11 +36,17 @@ public class SharedInstanceClassLoader extends InstanceClassLoader { referenceCount--; if (referenceCount <= 0) { + closed = true; super.close(); } } - public synchronized void incrementReferenceCount() { + public synchronized boolean incrementReferenceCount() { + if (closed) { + return false; + } + referenceCount++; + return true; } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java index baa29b6411..f8e1806abf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java @@ -88,7 +88,7 @@ public class StandardExtensionDiscoveringManager implements ExtensionDiscovering private final Map tempComponentLookup = new HashMap<>(); private final Map instanceClassloaderLookup = new ConcurrentHashMap<>(); - private final ConcurrentMap sharedBaseClassloaders = new ConcurrentHashMap<>(); + private final ConcurrentMap sharedBaseClassloaders = new ConcurrentHashMap<>(); public StandardExtensionDiscoveringManager() { this(Collections.emptyList()); @@ -407,8 +407,8 @@ public class StandardExtensionDiscoveringManager implements ExtensionDiscovering if (requiresInstanceClassLoading.cloneAncestorResources()) { // Check to see if there's already a shared ClassLoader that can be used as the parent/base classloader if (baseClassLoaderKey != null) { - final ClassLoader sharedBaseClassloader = sharedBaseClassloaders.get(baseClassLoaderKey); - if (sharedBaseClassloader != null) { + final SharedInstanceClassLoader sharedBaseClassloader = sharedBaseClassloaders.get(baseClassLoaderKey); + if (sharedBaseClassloader != null && sharedBaseClassloader.incrementReferenceCount()) { resolvedSharedClassLoader = true; ancestorClassLoader = sharedBaseClassloader; logger.debug("Creating InstanceClassLoader for type {} using shared Base ClassLoader {} for component {}", type, sharedBaseClassloader, instanceIdentifier); @@ -444,6 +444,8 @@ public class StandardExtensionDiscoveringManager implements ExtensionDiscovering // Created a shared class loader that is everything we need except for the additional URLs, as the additional URLs are instance-specific. final SharedInstanceClassLoader sharedClassLoader = new SharedInstanceClassLoader(instanceIdentifier, classType, instanceUrls, Collections.emptySet(), narNativeLibDirs, ancestorClassLoader); + sharedClassLoader.incrementReferenceCount(); + instanceClassLoader = new InstanceClassLoader(instanceIdentifier, classType, Collections.emptySet(), additionalUrls, Collections.emptySet(), sharedClassLoader); logger.debug("Creating InstanceClassLoader for type {} using newly created shared Base ClassLoader {} for component {}", type, sharedClassLoader, instanceIdentifier); diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/WriteFlowFileCountToFile.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/WriteFlowFileCountToFile.java index 7e78f0a0e0..ef6eb75016 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/WriteFlowFileCountToFile.java +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/WriteFlowFileCountToFile.java @@ -34,7 +34,7 @@ import java.io.FileOutputStream; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.Arrays; -import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -64,18 +64,31 @@ public class WriteFlowFileCountToFile extends AbstractProcessor implements Class .defaultValue("counts.txt") .build(); + static final PropertyDescriptor CLASS_TO_CREATE = new Builder() + .name("Class to Create") + .displayName("Class to Create") + .description("If specified, each iteration of #onTrigger will create an instance of this class in order to test ClassLoader behavior. If unable to create the object, the FlowFile will be " + + "routed to failure") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .build(); + private final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .build(); + private final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .autoTerminateDefault(true) + .build(); @Override protected List getSupportedPropertyDescriptors() { - return Arrays.asList(ISOLATION_KEY, FILE_TO_WRITE); + return Arrays.asList(ISOLATION_KEY, FILE_TO_WRITE, CLASS_TO_CREATE); } @Override public Set getRelationships() { - return Collections.singleton(REL_SUCCESS); + return new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)); } @Override @@ -90,6 +103,17 @@ public class WriteFlowFileCountToFile extends AbstractProcessor implements Class return; } + final String className = context.getProperty(CLASS_TO_CREATE).getValue(); + if (className != null) { + try { + Class.forName(className, true, Thread.currentThread().getContextClassLoader()); + } catch (final ClassNotFoundException e) { + getLogger().error("Failed to load class {} for {}; routing to failure", className, flowFile); + session.transfer(flowFile, REL_FAILURE); + return; + } + } + final long counterValue = counter.incrementAndGet(); final byte[] fileContents = String.valueOf(counterValue).getBytes(StandardCharsets.UTF_8); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/classloaders/ClassloaderIsolationKeyIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/classloaders/ClassloaderIsolationKeyIT.java index 0665ae85f1..4c5fb822e5 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/classloaders/ClassloaderIsolationKeyIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/classloaders/ClassloaderIsolationKeyIT.java @@ -19,6 +19,7 @@ package org.apache.nifi.tests.system.classloaders; import org.apache.nifi.tests.system.NiFiSystemIT; import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.web.api.entity.ConnectionEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; import org.junit.Test; @@ -28,8 +29,62 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.Collections; +import static org.junit.Assert.assertEquals; + public class ClassloaderIsolationKeyIT extends NiFiSystemIT { + /** + * After creating 1+ processors with the same ClassLoader Isolation Key, and then removing them, + * the SharedInstanceClassLoader will be closed. If we then create a new processor with the same + * ClassLoader Isolation Key, we need to ensure that we are then able to load classes from the ClassLoader + * that were not loaded previously. + */ + @Test + public void testRemoveAllInstancesThenCreateForSameIsolationKeyAllowsClassLoading() throws NiFiClientException, IOException, InterruptedException { + final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile"); + final ProcessorEntity counter = getClientUtil().createProcessor("WriteFlowFileCountToFile"); + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile"); + + getClientUtil().updateProcessorProperties(counter, Collections.singletonMap("File to Write", "count1.txt")); + getClientUtil().updateProcessorProperties(counter, Collections.singletonMap("Isolation Key", "abc123")); + + getClientUtil().createConnection(generate, counter, "success"); + final ConnectionEntity counterToTerminate = getClientUtil().createConnection(counter, terminate, "success"); + + getClientUtil().waitForValidProcessor(counter.getId()); + + getClientUtil().startProcessor(generate); + getClientUtil().startProcessor(counter); + + waitForQueueCount(counterToTerminate.getId(), 1); + + // Stop components, purge FlowFiles, delete all components + destroyFlow(); + + final ProcessorEntity newGenerate = getClientUtil().createProcessor("GenerateFlowFile"); + final ProcessorEntity newCounter = getClientUtil().createProcessor("WriteFlowFileCountToFile"); + final ProcessorEntity terminateSuccess = getClientUtil().createProcessor("TerminateFlowFile"); + final ProcessorEntity terminateFailure = getClientUtil().createProcessor("TerminateFlowFile"); + + final ConnectionEntity generateToCounter = getClientUtil().createConnection(newGenerate, newCounter, "success"); + final ConnectionEntity counterSuccess = getClientUtil().createConnection(newCounter, terminateSuccess, "success"); + final ConnectionEntity counterFailure = getClientUtil().createConnection(newCounter, terminateFailure, "failure"); + + getClientUtil().updateProcessorProperties(newCounter, Collections.singletonMap("Class to Create", "org.apache.nifi.processors.tests.system.CountEvents")); + getClientUtil().updateProcessorProperties(newCounter, Collections.singletonMap("File to Write", "count1.txt")); + getClientUtil().updateProcessorProperties(newCounter, Collections.singletonMap("Isolation Key", "abc123")); + + getClientUtil().waitForValidProcessor(newCounter.getId()); + + getClientUtil().startProcessor(newGenerate); + getClientUtil().startProcessor(newCounter); + + waitForQueueCount(generateToCounter.getId(), 0); + assertEquals(0, getConnectionQueueSize(counterFailure.getId())); + assertEquals(1, getConnectionQueueSize(counterSuccess.getId())); + } + + @Test public void testClassloaderChanges() throws NiFiClientException, IOException, InterruptedException { final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile"); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/logback.xml b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/logback.xml index 6c3f0bbb1e..c9bc0330f5 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/logback.xml +++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/logback.xml @@ -57,6 +57,17 @@ + + ${org.apache.nifi.bootstrap.config.log.dir}/nifi-request.log + + ${org.apache.nifi.bootstrap.config.log.dir}/nifi-request_%d.log + 30 + + + %msg%n + + + ${org.apache.nifi.bootstrap.config.log.dir}/nifi-bootstrap.log @@ -82,21 +93,22 @@ + - - + + @@ -128,6 +140,17 @@ + + + + + + + + + + + + + + + - - - - - - + + @@ -132,6 +140,17 @@ + + + + + + + + + + + + + + - + - + + + @@ -127,6 +140,17 @@ + + + + + + + + + + + + + +