mirror of https://github.com/apache/nifi.git
NIFI-9576: Introduced a BlockListClassLoader that can be used by stateless in order to isolate both the Stateless Engine and the NiFi extensions from extraneous classes that exist in the System ClassLoader
This closes #5705. Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
parent
5022a5ee07
commit
1617406041
|
@ -23,5 +23,6 @@ import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
|
|||
import java.io.IOException;
|
||||
|
||||
public interface StatelessDataflowFactory<T> {
|
||||
StatelessDataflow createDataflow(StatelessEngineConfiguration statelessEngineConfiguration, DataflowDefinition<T> dataflowDefinition) throws IOException, StatelessConfigurationException;
|
||||
StatelessDataflow createDataflow(StatelessEngineConfiguration statelessEngineConfiguration, DataflowDefinition<T> dataflowDefinition, ClassLoader extensionClassLoader)
|
||||
throws IOException, StatelessConfigurationException;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.stateless.bootstrap;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* A ClassLoader that blocks a specific set of selected classes from being loaded by its parent. This ClassLoader does not load any classes itself
|
||||
* but serves as a mechanism for preventing unwanted classes from a parent from being used.
|
||||
* </p>
|
||||
* <p>
|
||||
* Because Stateless NiFi is designed to run in an embedded environment, the classes that are available on the "System ClassLoader" (aka App ClassLoader)
|
||||
* cannot be determined - or prevented/controlled. However, if there are conflicts between what is in the System ClassLoader and an extension (such as bringing
|
||||
* in different versions of a popular JSON parsing library, for instance), this can cause the extensions not to function properly.
|
||||
* </p>
|
||||
* <p>
|
||||
* Because we cannot control what is loaded by the System ClassLoader (that's up to the embedding application), the best that we can do is to block NiFi's extensions'
|
||||
* ClassLoaders from accessing those classes. This ClassLoader allows us to do just that, blocking specific classes that have been loaded by the parent ClassLoader
|
||||
* from being accessible by child ClassLoaders.
|
||||
* </p>
|
||||
*/
|
||||
public class BlockListClassLoader extends ClassLoader {
|
||||
private final Set<String> blockList;
|
||||
|
||||
public BlockListClassLoader(final ClassLoader parent, final Set<String> blockList) {
|
||||
super(parent);
|
||||
this.blockList = blockList;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the set of all Class names that will be blocked from loading by the parent
|
||||
*/
|
||||
public Set<String> getClassesBlocked() {
|
||||
return Collections.unmodifiableSet(blockList);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Class<?> loadClass(final String name, final boolean resolve) throws ClassNotFoundException {
|
||||
if (blockList.contains(name)) {
|
||||
throw new ClassNotFoundException(name + " was blocked by BlockListClassLoader");
|
||||
}
|
||||
|
||||
return super.loadClass(name, resolve);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Class<?> findClass(final String name) throws ClassNotFoundException {
|
||||
if (blockList.contains(name)) {
|
||||
throw new ClassNotFoundException(name + " was blocked by BlockListClassLoader");
|
||||
}
|
||||
|
||||
return super.findClass(name);
|
||||
}
|
||||
}
|
|
@ -35,46 +35,55 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
import java.net.URLClassLoader;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Enumeration;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.ServiceLoader;
|
||||
import java.util.Set;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.jar.JarFile;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.zip.ZipEntry;
|
||||
|
||||
public class StatelessBootstrap {
|
||||
private static final Logger logger = LoggerFactory.getLogger(StatelessBootstrap.class);
|
||||
private static final Pattern STATELESS_NAR_PATTERN = Pattern.compile("nifi-stateless-nar-.*\\.nar-unpacked");
|
||||
private final ClassLoader statelessClassLoader;
|
||||
private final ClassLoader engineClassLoader;
|
||||
private final ClassLoader extensionClassLoader;
|
||||
private final StatelessEngineConfiguration engineConfiguration;
|
||||
|
||||
private StatelessBootstrap(final ClassLoader statelessClassLoader, final StatelessEngineConfiguration engineConfiguration) {
|
||||
this.statelessClassLoader = statelessClassLoader;
|
||||
private StatelessBootstrap(final ClassLoader engineClassLoader, final ClassLoader extensionClassLoader, final StatelessEngineConfiguration engineConfiguration) {
|
||||
this.engineClassLoader = engineClassLoader;
|
||||
this.extensionClassLoader = extensionClassLoader;
|
||||
this.engineConfiguration = engineConfiguration;
|
||||
}
|
||||
|
||||
public <T> StatelessDataflow createDataflow(final DataflowDefinition<T> dataflowDefinition)
|
||||
throws IOException, StatelessConfigurationException {
|
||||
final StatelessDataflowFactory<T> dataflowFactory = getSingleInstance(statelessClassLoader, StatelessDataflowFactory.class);
|
||||
final StatelessDataflow dataflow = dataflowFactory.createDataflow(engineConfiguration, dataflowDefinition);
|
||||
final StatelessDataflowFactory<T> dataflowFactory = getSingleInstance(engineClassLoader, StatelessDataflowFactory.class);
|
||||
final StatelessDataflow dataflow = dataflowFactory.createDataflow(engineConfiguration, dataflowDefinition, extensionClassLoader);
|
||||
return dataflow;
|
||||
}
|
||||
|
||||
public DataflowDefinition<?> parseDataflowDefinition(final File flowDefinitionFile, final List<ParameterOverride> parameterOverrides)
|
||||
throws StatelessConfigurationException, IOException {
|
||||
final DataflowDefinitionParser dataflowDefinitionParser = getSingleInstance(statelessClassLoader, DataflowDefinitionParser.class);
|
||||
final DataflowDefinitionParser dataflowDefinitionParser = getSingleInstance(engineClassLoader, DataflowDefinitionParser.class);
|
||||
final DataflowDefinition<?> dataflowDefinition = dataflowDefinitionParser.parseFlowDefinition(flowDefinitionFile, engineConfiguration, parameterOverrides);
|
||||
return dataflowDefinition;
|
||||
}
|
||||
|
||||
public DataflowDefinition<?> parseDataflowDefinition(final Map<String, String> flowDefinitionProperties, final List<ParameterOverride> parameterOverrides)
|
||||
throws StatelessConfigurationException, IOException {
|
||||
final DataflowDefinitionParser dataflowDefinitionParser = getSingleInstance(statelessClassLoader, DataflowDefinitionParser.class);
|
||||
final DataflowDefinitionParser dataflowDefinitionParser = getSingleInstance(engineClassLoader, DataflowDefinitionParser.class);
|
||||
final DataflowDefinition<?> dataflowDefinition = dataflowDefinitionParser.parseFlowDefinition(flowDefinitionProperties, engineConfiguration, parameterOverrides);
|
||||
return dataflowDefinition;
|
||||
}
|
||||
|
@ -118,21 +127,137 @@ public class StatelessBootstrap {
|
|||
throw new IOException("Could not access contents of Stateless NAR dependencies at " + statelessNarDependencies);
|
||||
}
|
||||
|
||||
final URL[] urls = new URL[statelessNarContents.length];
|
||||
for (int i=0; i < statelessNarContents.length; i++) {
|
||||
final File dependency = statelessNarContents[i];
|
||||
final List<URL> urls = new ArrayList<>();
|
||||
final List<String> filenames = new ArrayList<>();
|
||||
for (final File dependency : statelessNarContents) {
|
||||
final URL url = dependency.toURI().toURL();
|
||||
urls[i] = url;
|
||||
urls.add(url);
|
||||
filenames.add(dependency.getName());
|
||||
}
|
||||
|
||||
logger.info("Creating Stateless Bootstrap with the following URLs in the classpath: {}", Arrays.asList(urls));
|
||||
if (rootClassLoader instanceof URLClassLoader) {
|
||||
logger.info("Additionally, Root ClassLoader has the following URLs available: {}", Arrays.asList(((URLClassLoader) rootClassLoader).getURLs()));
|
||||
logger.info("Creating Stateless Bootstrap with the following files in the classpath: {}", filenames);
|
||||
|
||||
final URL[] urlArray = urls.toArray(new URL[0]);
|
||||
final BlockListClassLoader extensionClassLoader = createExtensionRootClassLoader(narDirectory, rootClassLoader);
|
||||
|
||||
final Set<String> classesBlockedExtensions = extensionClassLoader.getClassesBlocked();
|
||||
|
||||
// For the engine ClassLoader, we also want to block everything that we block for extensions except for the classes
|
||||
// that the engine needs specifically (i.e., the classes in the stateless engine nar). We do this because there are some
|
||||
// classes that may need to be shared between the bootstrap and the caller. For example, VersionedFlowSnapshot.
|
||||
final URLClassLoader engineUrlClassLoader = new URLClassLoader(urlArray, rootClassLoader);
|
||||
final Set<String> engineSpecificClassNames = new HashSet<>();
|
||||
final Set<String> engineSpecificFiles = new HashSet<>();
|
||||
findClassNamesInJars(urls, engineSpecificClassNames, engineSpecificFiles);
|
||||
|
||||
final Set<String> classesBlockedEngine = new HashSet<>(classesBlockedExtensions);
|
||||
classesBlockedEngine.removeAll(engineSpecificClassNames);
|
||||
|
||||
logger.debug("Blocking the following classes from being loaded from parent {} for Engine by Stateless ClassLoaders: {}", engineUrlClassLoader, classesBlockedEngine);
|
||||
logger.debug("Blocking the following files from being loaded from parent {} for Engine by Stateless ClassLoaders: {}", engineUrlClassLoader, engineSpecificFiles);
|
||||
|
||||
final BlockListClassLoader engineClassLoader = new BlockListClassLoader(engineUrlClassLoader, classesBlockedEngine);
|
||||
|
||||
Thread.currentThread().setContextClassLoader(engineClassLoader);
|
||||
return new StatelessBootstrap(engineClassLoader, extensionClassLoader, engineConfiguration);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a ClassLoader that is to be used as the 'root'/parent for all NiFi Extensions' ClassLoaders. The ClassLoader will inherit from its parent
|
||||
* any classes that exist in JAR files that can be found in the given NAR Directory. However, it will not allow any other classes to be loaded from the parent.
|
||||
* This approach is important because we need to ensure that the ClassLoader that is provided to extensions when run from NiFi Stateless is the same as the ClassLoader
|
||||
* that will be provided to it in traditional NiFi. Whereas in traditional NiFi, we have the ability to control the System ClassLoader, Stateless NiFi is designed to be
|
||||
* embedded, so we cannot control the System ClassLoader of the embedding application. This gives us a way to ensure that we control what is available to Extensions and
|
||||
* still provides us the ability to load the necessary classes from the System ClassLoader, which prevents ClassCastExceptions that might otherwise occur if we were to
|
||||
* load the same classes from another ClassLoader.
|
||||
*
|
||||
* @param narDirectory the NAR directory whose .jar files should be made available via the parent.
|
||||
* @param parent the parent class loader that the given BlockListClassLoader should delegate to for classes that it does not block
|
||||
* @return a BlockListClassLoader that allows only the appropriate classes to be loaded from the given parent
|
||||
*/
|
||||
private static BlockListClassLoader createExtensionRootClassLoader(final File narDirectory, final ClassLoader parent) throws IOException {
|
||||
if (!(parent instanceof URLClassLoader)) {
|
||||
return new BlockListClassLoader(parent, Collections.emptySet());
|
||||
}
|
||||
|
||||
final URLClassLoader statelessClassLoader = new URLClassLoader(urls, rootClassLoader);
|
||||
Thread.currentThread().setContextClassLoader(statelessClassLoader);
|
||||
return new StatelessBootstrap(statelessClassLoader, engineConfiguration);
|
||||
final File[] narDirectoryFiles = narDirectory.listFiles();
|
||||
if (narDirectoryFiles == null) {
|
||||
throw new IOException("Could not get a listing of the NAR directory");
|
||||
}
|
||||
|
||||
final Set<URL> urls = new HashSet<>();
|
||||
findClassLoaderUrls(parent, urls);
|
||||
|
||||
final Set<String> classesBlocked = new HashSet<>();
|
||||
final Set<String> filesBlocked = new HashSet<>();
|
||||
findClassNamesInJars(urls, classesBlocked, filesBlocked);
|
||||
|
||||
final Set<String> classesAllowed = new HashSet<>();
|
||||
final Set<String> filesAllowed = new HashSet<>();
|
||||
for (final File file : narDirectoryFiles) {
|
||||
findClassNamesInJar(file, classesAllowed);
|
||||
filesAllowed.add(file.getName());
|
||||
}
|
||||
|
||||
classesBlocked.removeAll(classesAllowed);
|
||||
filesBlocked.removeAll(filesAllowed);
|
||||
|
||||
logger.debug("Blocking the following JAR files from being loaded by Stateless Extensions ClassLoaders from parent {}: {}", parent, filesBlocked);
|
||||
logger.debug("Blocking the following classes from being loaded by Stateless Extension ClassLoaders from parent {}: {}", parent, classesBlocked);
|
||||
|
||||
final BlockListClassLoader blockingClassLoader = new BlockListClassLoader(parent, classesBlocked);
|
||||
return blockingClassLoader;
|
||||
}
|
||||
|
||||
private static void findClassNamesInJars(final Collection<URL> jarUrls, final Set<String> classesFound, final Set<String> jarFilesFound) throws IOException {
|
||||
for (final URL url : jarUrls) {
|
||||
final File file;
|
||||
try {
|
||||
file = new File(url.toURI());
|
||||
} catch (URISyntaxException e) {
|
||||
logger.warn("Could not find file for {} in classpath", url);
|
||||
continue;
|
||||
}
|
||||
|
||||
findClassNamesInJar(file, classesFound);
|
||||
jarFilesFound.add(file.getName());
|
||||
}
|
||||
}
|
||||
|
||||
private static void findClassLoaderUrls(final ClassLoader classLoader, final Set<URL> urls) {
|
||||
if (classLoader == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (classLoader instanceof URLClassLoader) {
|
||||
final URLClassLoader urlClassLoader = (URLClassLoader) classLoader;
|
||||
urls.addAll(Arrays.asList(urlClassLoader.getURLs()));
|
||||
}
|
||||
|
||||
// If the classLoader is the system class loader, we are done. We don't want to process the parent of
|
||||
// the system class loader (which would be the Launcher$ExtClassLoader that contains the JDK/JRE classes, etc)
|
||||
if (classLoader != ClassLoader.getSystemClassLoader()) {
|
||||
findClassLoaderUrls(classLoader.getParent(), urls);
|
||||
}
|
||||
}
|
||||
|
||||
private static void findClassNamesInJar(final File file, final Set<String> classNames) throws IOException {
|
||||
if (!file.getName().endsWith(".jar") || !file.isFile() || !file.exists()) {
|
||||
return;
|
||||
}
|
||||
|
||||
final JarFile jarFile = new JarFile(file);
|
||||
final Enumeration<? extends ZipEntry> enumeration = jarFile.entries();
|
||||
while (enumeration.hasMoreElements()) {
|
||||
final ZipEntry zipEntry = enumeration.nextElement();
|
||||
final String entryName = zipEntry.getName();
|
||||
|
||||
if (entryName.endsWith(".class")) {
|
||||
final int lastIndex = entryName.lastIndexOf(".class");
|
||||
final String className = entryName.substring(0, lastIndex).replace("/", ".");
|
||||
classNames.add(className);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static File locateStatelessNarWorkingDirectory(final File workingDirectory) throws IOException {
|
||||
|
|
|
@ -92,7 +92,8 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
|
|||
private static final Logger logger = LoggerFactory.getLogger(StandardStatelessDataflowFactory.class);
|
||||
|
||||
@Override
|
||||
public StatelessDataflow createDataflow(final StatelessEngineConfiguration engineConfiguration, final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition)
|
||||
public StatelessDataflow createDataflow(final StatelessEngineConfiguration engineConfiguration, final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition,
|
||||
final ClassLoader extensionRootClassLoader)
|
||||
throws IOException, StatelessConfigurationException {
|
||||
final long start = System.currentTimeMillis();
|
||||
|
||||
|
@ -119,7 +120,7 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
|
|||
|
||||
final NarClassLoaders narClassLoaders = new NarClassLoaders();
|
||||
final File extensionsWorkingDir = new File(narExpansionDirectory, "extensions");
|
||||
final ClassLoader systemClassLoader = createSystemClassLoader(engineConfiguration.getNarDirectory());
|
||||
final ClassLoader systemClassLoader = createSystemClassLoader(engineConfiguration.getNarDirectory(), extensionRootClassLoader);
|
||||
final ExtensionDiscoveringManager extensionManager = ExtensionDiscovery.discover(extensionsWorkingDir, systemClassLoader, narClassLoaders, engineConfiguration.isLogExtensionDiscovery());
|
||||
|
||||
flowFileEventRepo = new RingBufferEventRepository(5);
|
||||
|
@ -306,17 +307,16 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
|
|||
return "nexus".equalsIgnoreCase(type.trim());
|
||||
}
|
||||
|
||||
private ClassLoader createSystemClassLoader(final File narDirectory) throws StatelessConfigurationException {
|
||||
final ClassLoader systemClassLoader = StatelessDataflowFactory.class.getClassLoader();
|
||||
private ClassLoader createSystemClassLoader(final File narDirectory, final ClassLoader extensionRootClassLoader) throws StatelessConfigurationException {
|
||||
final int javaMajorVersion = getJavaMajorVersion();
|
||||
if (javaMajorVersion >= 11) {
|
||||
// If running on Java 11 or greater, add the JAXB/activation/annotation libs to the classpath.
|
||||
// TODO: Once the minimum Java version requirement of NiFi is 11, this processing should be removed.
|
||||
// JAXB/activation/annotation will be added as an actual dependency via pom.xml.
|
||||
return createJava11OrLaterSystemClassLoader(javaMajorVersion, narDirectory, systemClassLoader);
|
||||
return createJava11OrLaterSystemClassLoader(javaMajorVersion, narDirectory, extensionRootClassLoader);
|
||||
}
|
||||
|
||||
return systemClassLoader;
|
||||
return extensionRootClassLoader;
|
||||
}
|
||||
|
||||
private ClassLoader createJava11OrLaterSystemClassLoader(final int javaMajorVersion, final File narDirectory, final ClassLoader parentClassLoader) throws StatelessConfigurationException {
|
||||
|
|
|
@ -29,11 +29,9 @@
|
|||
<useTransitiveFiltering>true</useTransitiveFiltering>
|
||||
|
||||
<excludes>
|
||||
<exclude>nifi-stateless-api</exclude>
|
||||
<exclude>nifi-framework-api</exclude>
|
||||
<exclude>nifi-stateless-bootstrap</exclude>
|
||||
<exclude>jackson-core</exclude>
|
||||
<exclude>jackson-databind</exclude>
|
||||
|
||||
<!-- TODO: remove these once minimum Java version is 11 -->
|
||||
<exclude>org.glassfish.jaxb:jaxb-runtime</exclude>
|
||||
<exclude>jakarta.xml.bind:jakarta.xml.bind-api</exclude>
|
||||
|
|
Loading…
Reference in New Issue