mirror of https://github.com/apache/nifi.git
NIFI-4424 Added functionality to allow NiFi to run in "embedded" mode for eventual integration test access.
This closes #2251. Signed-off-by: Andy LoPresto <alopresto@apache.org>
This commit is contained in:
parent
05e9e6eaa5
commit
3ebfcd5ae5
|
@ -100,7 +100,26 @@ public final class NarClassLoaders {
|
||||||
* @throws IllegalStateException already initialized with a given pair of
|
* @throws IllegalStateException already initialized with a given pair of
|
||||||
* directories cannot reinitialize or use a different pair of directories.
|
* directories cannot reinitialize or use a different pair of directories.
|
||||||
*/
|
*/
|
||||||
public void init(final File frameworkWorkingDir, final File extensionsWorkingDir) throws IOException, ClassNotFoundException {
|
public void init(File frameworkWorkingDir, File extensionsWorkingDir) throws IOException, ClassNotFoundException {
|
||||||
|
init(ClassLoader.getSystemClassLoader(), frameworkWorkingDir, extensionsWorkingDir);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initializes and loads the NarClassLoaders. This method must be called
|
||||||
|
* before the rest of the methods to access the classloaders are called and
|
||||||
|
* it can be safely called any number of times provided the same framework
|
||||||
|
* and extension working dirs are used.
|
||||||
|
*
|
||||||
|
* @param rootClassloader the root classloader to use for booting Jetty
|
||||||
|
* @param frameworkWorkingDir where to find framework artifacts
|
||||||
|
* @param extensionsWorkingDir where to find extension artifacts
|
||||||
|
* @throws java.io.IOException if any issue occurs while exploding nar working directories.
|
||||||
|
* @throws java.lang.ClassNotFoundException if unable to load class definition
|
||||||
|
* @throws IllegalStateException already initialized with a given pair of
|
||||||
|
* directories cannot reinitialize or use a different pair of directories.
|
||||||
|
*/
|
||||||
|
public void init(final ClassLoader rootClassloader,
|
||||||
|
final File frameworkWorkingDir, final File extensionsWorkingDir) throws IOException, ClassNotFoundException {
|
||||||
if (frameworkWorkingDir == null || extensionsWorkingDir == null) {
|
if (frameworkWorkingDir == null || extensionsWorkingDir == null) {
|
||||||
throw new NullPointerException("cannot have empty arguments");
|
throw new NullPointerException("cannot have empty arguments");
|
||||||
}
|
}
|
||||||
|
@ -109,7 +128,7 @@ public final class NarClassLoaders {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
ic = initContext;
|
ic = initContext;
|
||||||
if (ic == null) {
|
if (ic == null) {
|
||||||
initContext = ic = load(frameworkWorkingDir, extensionsWorkingDir);
|
initContext = ic = load(rootClassloader, frameworkWorkingDir, extensionsWorkingDir);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -123,9 +142,9 @@ public final class NarClassLoaders {
|
||||||
/**
|
/**
|
||||||
* Should be called at most once.
|
* Should be called at most once.
|
||||||
*/
|
*/
|
||||||
private InitContext load(final File frameworkWorkingDir, final File extensionsWorkingDir) throws IOException, ClassNotFoundException {
|
private InitContext load(final ClassLoader rootClassloader,
|
||||||
// get the system classloader
|
final File frameworkWorkingDir, final File extensionsWorkingDir)
|
||||||
final ClassLoader systemClassLoader = ClassLoader.getSystemClassLoader();
|
throws IOException, ClassNotFoundException {
|
||||||
|
|
||||||
// find all nar files and create class loaders for them.
|
// find all nar files and create class loaders for them.
|
||||||
final Map<String, Bundle> narDirectoryBundleLookup = new LinkedHashMap<>();
|
final Map<String, Bundle> narDirectoryBundleLookup = new LinkedHashMap<>();
|
||||||
|
@ -181,7 +200,7 @@ public final class NarClassLoaders {
|
||||||
// look for the jetty nar
|
// look for the jetty nar
|
||||||
if (JETTY_NAR_ID.equals(narDetail.getCoordinate().getId())) {
|
if (JETTY_NAR_ID.equals(narDetail.getCoordinate().getId())) {
|
||||||
// create the jetty classloader
|
// create the jetty classloader
|
||||||
jettyClassLoader = createNarClassLoader(narDetail.getWorkingDirectory(), systemClassLoader);
|
jettyClassLoader = createNarClassLoader(narDetail.getWorkingDirectory(), rootClassloader);
|
||||||
|
|
||||||
// remove the jetty nar since its already loaded
|
// remove the jetty nar since its already loaded
|
||||||
narDirectoryBundleLookup.put(narDetail.getWorkingDirectory().getCanonicalPath(), new Bundle(narDetail, jettyClassLoader));
|
narDirectoryBundleLookup.put(narDetail.getWorkingDirectory().getCanonicalPath(), new Bundle(narDetail, jettyClassLoader));
|
||||||
|
|
|
@ -0,0 +1,57 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.nifi;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.InvocationTargetException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* Starts an instance of NiFi within the <b>same JVM</b>, which can later properly be shut down.
|
||||||
|
* Intended to be used for testing purposes.</p>
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class EmbeddedNiFi extends NiFi {
|
||||||
|
|
||||||
|
public EmbeddedNiFi(String[] args, ClassLoader rootClassLoader)
|
||||||
|
throws ClassNotFoundException, IOException, NoSuchMethodException,
|
||||||
|
InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
|
||||||
|
|
||||||
|
super(convertArgumentsToValidatedNiFiProperties(args), rootClassLoader);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void initLogging() {
|
||||||
|
// do nothing when running in embedded mode
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void setDefaultUncaughtExceptionHandler() {
|
||||||
|
// do nothing when running in embedded mode
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void addShutdownHook() {
|
||||||
|
// do nothing when running in embedded mode
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void shutdown() {
|
||||||
|
super.shutdown();
|
||||||
|
}
|
||||||
|
}
|
|
@ -68,6 +68,13 @@ public class NiFi {
|
||||||
public NiFi(final NiFiProperties properties)
|
public NiFi(final NiFiProperties properties)
|
||||||
throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
|
throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
|
||||||
|
|
||||||
|
this(properties, ClassLoader.getSystemClassLoader());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public NiFi(final NiFiProperties properties, ClassLoader rootClassLoader)
|
||||||
|
throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
|
||||||
|
|
||||||
// There can only be one krb5.conf for the overall Java process so set this globally during
|
// There can only be one krb5.conf for the overall Java process so set this globally during
|
||||||
// start up so that processors and our Kerberos authentication code don't have to set this
|
// start up so that processors and our Kerberos authentication code don't have to set this
|
||||||
final File kerberosConfigFile = properties.getKerberosConfigurationFile();
|
final File kerberosConfigFile = properties.getKerberosConfigurationFile();
|
||||||
|
@ -77,22 +84,10 @@ public class NiFi {
|
||||||
System.setProperty("java.security.krb5.conf", kerberosConfigFilePath);
|
System.setProperty("java.security.krb5.conf", kerberosConfigFilePath);
|
||||||
}
|
}
|
||||||
|
|
||||||
Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
|
setDefaultUncaughtExceptionHandler();
|
||||||
@Override
|
|
||||||
public void uncaughtException(final Thread t, final Throwable e) {
|
|
||||||
LOGGER.error("An Unknown Error Occurred in Thread {}: {}", t, e.toString());
|
|
||||||
LOGGER.error("", e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// register the shutdown hook
|
// register the shutdown hook
|
||||||
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
|
addShutdownHook();
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
// shutdown the jetty server
|
|
||||||
shutdownHook();
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
|
|
||||||
final String bootstrapPort = System.getProperty(BOOTSTRAP_PORT_PROPERTY);
|
final String bootstrapPort = System.getProperty(BOOTSTRAP_PORT_PROPERTY);
|
||||||
if (bootstrapPort != null) {
|
if (bootstrapPort != null) {
|
||||||
|
@ -125,8 +120,7 @@ public class NiFi {
|
||||||
detectTimingIssues();
|
detectTimingIssues();
|
||||||
|
|
||||||
// redirect JUL log events
|
// redirect JUL log events
|
||||||
SLF4JBridgeHandler.removeHandlersForRootLogger();
|
initLogging();
|
||||||
SLF4JBridgeHandler.install();
|
|
||||||
|
|
||||||
final Bundle systemBundle = SystemBundle.create(properties);
|
final Bundle systemBundle = SystemBundle.create(properties);
|
||||||
|
|
||||||
|
@ -134,15 +128,18 @@ public class NiFi {
|
||||||
final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties, systemBundle);
|
final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties, systemBundle);
|
||||||
|
|
||||||
// load the extensions classloaders
|
// load the extensions classloaders
|
||||||
NarClassLoaders.getInstance().init(properties.getFrameworkWorkingDirectory(), properties.getExtensionsWorkingDirectory());
|
NarClassLoaders narClassLoaders = NarClassLoaders.getInstance();
|
||||||
|
|
||||||
|
narClassLoaders.init(rootClassLoader,
|
||||||
|
properties.getFrameworkWorkingDirectory(), properties.getExtensionsWorkingDirectory());
|
||||||
|
|
||||||
// load the framework classloader
|
// load the framework classloader
|
||||||
final ClassLoader frameworkClassLoader = NarClassLoaders.getInstance().getFrameworkBundle().getClassLoader();
|
final ClassLoader frameworkClassLoader = narClassLoaders.getFrameworkBundle().getClassLoader();
|
||||||
if (frameworkClassLoader == null) {
|
if (frameworkClassLoader == null) {
|
||||||
throw new IllegalStateException("Unable to find the framework NAR ClassLoader.");
|
throw new IllegalStateException("Unable to find the framework NAR ClassLoader.");
|
||||||
}
|
}
|
||||||
|
|
||||||
final Set<Bundle> narBundles = NarClassLoaders.getInstance().getBundles();
|
final Set<Bundle> narBundles = narClassLoaders.getBundles();
|
||||||
|
|
||||||
// load the server from the framework classloader
|
// load the server from the framework classloader
|
||||||
Thread.currentThread().setContextClassLoader(frameworkClassLoader);
|
Thread.currentThread().setContextClassLoader(frameworkClassLoader);
|
||||||
|
@ -169,6 +166,31 @@ public class NiFi {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void setDefaultUncaughtExceptionHandler() {
|
||||||
|
Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
|
||||||
|
@Override
|
||||||
|
public void uncaughtException(final Thread t, final Throwable e) {
|
||||||
|
LOGGER.error("An Unknown Error Occurred in Thread {}: {}", t, e.toString());
|
||||||
|
LOGGER.error("", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void addShutdownHook() {
|
||||||
|
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
// shutdown the jetty server
|
||||||
|
shutdownHook();
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void initLogging() {
|
||||||
|
SLF4JBridgeHandler.removeHandlersForRootLogger();
|
||||||
|
SLF4JBridgeHandler.install();
|
||||||
|
}
|
||||||
|
|
||||||
private static ClassLoader createBootstrapClassLoader() throws IOException {
|
private static ClassLoader createBootstrapClassLoader() throws IOException {
|
||||||
//Get list of files in bootstrap folder
|
//Get list of files in bootstrap folder
|
||||||
final List<URL> urls = new ArrayList<>();
|
final List<URL> urls = new ArrayList<>();
|
||||||
|
@ -185,6 +207,13 @@ public class NiFi {
|
||||||
|
|
||||||
protected void shutdownHook() {
|
protected void shutdownHook() {
|
||||||
try {
|
try {
|
||||||
|
shutdown();
|
||||||
|
} catch (final Throwable t) {
|
||||||
|
LOGGER.warn("Problem occurred ensuring Jetty web server was properly terminated due to " + t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void shutdown() {
|
||||||
this.shutdown = true;
|
this.shutdown = true;
|
||||||
|
|
||||||
LOGGER.info("Initiating shutdown of Jetty web server...");
|
LOGGER.info("Initiating shutdown of Jetty web server...");
|
||||||
|
@ -195,9 +224,6 @@ public class NiFi {
|
||||||
bootstrapListener.stop();
|
bootstrapListener.stop();
|
||||||
}
|
}
|
||||||
LOGGER.info("Jetty web server shutdown completed (nicely or otherwise).");
|
LOGGER.info("Jetty web server shutdown completed (nicely or otherwise).");
|
||||||
} catch (final Throwable t) {
|
|
||||||
LOGGER.warn("Problem occurred ensuring Jetty web server was properly terminated due to " + t);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -262,15 +288,20 @@ public class NiFi {
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
LOGGER.info("Launching NiFi...");
|
LOGGER.info("Launching NiFi...");
|
||||||
try {
|
try {
|
||||||
final ClassLoader bootstrap = createBootstrapClassLoader();
|
NiFiProperties properties = convertArgumentsToValidatedNiFiProperties(args);
|
||||||
NiFiProperties properties = initializeProperties(args, bootstrap);
|
|
||||||
properties.validate();
|
|
||||||
new NiFi(properties);
|
new NiFi(properties);
|
||||||
} catch (final Throwable t) {
|
} catch (final Throwable t) {
|
||||||
LOGGER.error("Failure to launch NiFi due to " + t, t);
|
LOGGER.error("Failure to launch NiFi due to " + t, t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected static NiFiProperties convertArgumentsToValidatedNiFiProperties(String[] args) throws IOException {
|
||||||
|
final ClassLoader bootstrap = createBootstrapClassLoader();
|
||||||
|
NiFiProperties properties = initializeProperties(args, bootstrap);
|
||||||
|
properties.validate();
|
||||||
|
return properties;
|
||||||
|
}
|
||||||
|
|
||||||
private static NiFiProperties initializeProperties(final String[] args, final ClassLoader boostrapLoader) {
|
private static NiFiProperties initializeProperties(final String[] args, final ClassLoader boostrapLoader) {
|
||||||
// Try to get key
|
// Try to get key
|
||||||
// If key doesn't exist, instantiate without
|
// If key doesn't exist, instantiate without
|
||||||
|
|
Loading…
Reference in New Issue