From 3ebfcd5ae5132e29482d571e767142c806e30469 Mon Sep 17 00:00:00 2001 From: "Peter G. Horvath" Date: Sun, 5 Nov 2017 12:25:15 +0100 Subject: [PATCH] NIFI-4424 Added functionality to allow NiFi to run in "embedded" mode for eventual integration test access. This closes #2251. Signed-off-by: Andy LoPresto --- .../org/apache/nifi/nar/NarClassLoaders.java | 31 ++++-- .../java/org/apache/nifi/EmbeddedNiFi.java | 57 +++++++++++ .../src/main/java/org/apache/nifi/NiFi.java | 99 ++++++++++++------- 3 files changed, 147 insertions(+), 40 deletions(-) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/EmbeddedNiFi.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoaders.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoaders.java index 005a8faa27..8921b25754 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoaders.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoaders.java @@ -100,7 +100,26 @@ public final class NarClassLoaders { * @throws IllegalStateException already initialized with a given pair of * directories cannot reinitialize or use a different pair of directories. */ - public void init(final File frameworkWorkingDir, final File extensionsWorkingDir) throws IOException, ClassNotFoundException { + public void init(File frameworkWorkingDir, File extensionsWorkingDir) throws IOException, ClassNotFoundException { + init(ClassLoader.getSystemClassLoader(), frameworkWorkingDir, extensionsWorkingDir); + } + + /** + * Initializes and loads the NarClassLoaders. This method must be called + * before the rest of the methods to access the classloaders are called and + * it can be safely called any number of times provided the same framework + * and extension working dirs are used. + * + * @param rootClassloader the root classloader to use for booting Jetty + * @param frameworkWorkingDir where to find framework artifacts + * @param extensionsWorkingDir where to find extension artifacts + * @throws java.io.IOException if any issue occurs while exploding nar working directories. + * @throws java.lang.ClassNotFoundException if unable to load class definition + * @throws IllegalStateException already initialized with a given pair of + * directories cannot reinitialize or use a different pair of directories. + */ + public void init(final ClassLoader rootClassloader, + final File frameworkWorkingDir, final File extensionsWorkingDir) throws IOException, ClassNotFoundException { if (frameworkWorkingDir == null || extensionsWorkingDir == null) { throw new NullPointerException("cannot have empty arguments"); } @@ -109,7 +128,7 @@ public final class NarClassLoaders { synchronized (this) { ic = initContext; if (ic == null) { - initContext = ic = load(frameworkWorkingDir, extensionsWorkingDir); + initContext = ic = load(rootClassloader, frameworkWorkingDir, extensionsWorkingDir); } } } @@ -123,9 +142,9 @@ public final class NarClassLoaders { /** * Should be called at most once. */ - private InitContext load(final File frameworkWorkingDir, final File extensionsWorkingDir) throws IOException, ClassNotFoundException { - // get the system classloader - final ClassLoader systemClassLoader = ClassLoader.getSystemClassLoader(); + private InitContext load(final ClassLoader rootClassloader, + final File frameworkWorkingDir, final File extensionsWorkingDir) + throws IOException, ClassNotFoundException { // find all nar files and create class loaders for them. final Map narDirectoryBundleLookup = new LinkedHashMap<>(); @@ -181,7 +200,7 @@ public final class NarClassLoaders { // look for the jetty nar if (JETTY_NAR_ID.equals(narDetail.getCoordinate().getId())) { // create the jetty classloader - jettyClassLoader = createNarClassLoader(narDetail.getWorkingDirectory(), systemClassLoader); + jettyClassLoader = createNarClassLoader(narDetail.getWorkingDirectory(), rootClassloader); // remove the jetty nar since its already loaded narDirectoryBundleLookup.put(narDetail.getWorkingDirectory().getCanonicalPath(), new Bundle(narDetail, jettyClassLoader)); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/EmbeddedNiFi.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/EmbeddedNiFi.java new file mode 100644 index 0000000000..790947c7c6 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/EmbeddedNiFi.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; + +/** + *

+ * Starts an instance of NiFi within the same JVM, which can later properly be shut down. + * Intended to be used for testing purposes.

+ * + */ +public class EmbeddedNiFi extends NiFi { + + public EmbeddedNiFi(String[] args, ClassLoader rootClassLoader) + throws ClassNotFoundException, IOException, NoSuchMethodException, + InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException { + + super(convertArgumentsToValidatedNiFiProperties(args), rootClassLoader); + } + + @Override + protected void initLogging() { + // do nothing when running in embedded mode + } + + @Override + protected void setDefaultUncaughtExceptionHandler() { + // do nothing when running in embedded mode + } + + @Override + protected void addShutdownHook() { + // do nothing when running in embedded mode + } + + @Override + public void shutdown() { + super.shutdown(); + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java index 58d17e44c6..68ee8c7c46 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java @@ -68,6 +68,13 @@ public class NiFi { public NiFi(final NiFiProperties properties) throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException { + this(properties, ClassLoader.getSystemClassLoader()); + + } + + public NiFi(final NiFiProperties properties, ClassLoader rootClassLoader) + throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException { + // There can only be one krb5.conf for the overall Java process so set this globally during // start up so that processors and our Kerberos authentication code don't have to set this final File kerberosConfigFile = properties.getKerberosConfigurationFile(); @@ -77,22 +84,10 @@ public class NiFi { System.setProperty("java.security.krb5.conf", kerberosConfigFilePath); } - Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() { - @Override - public void uncaughtException(final Thread t, final Throwable e) { - LOGGER.error("An Unknown Error Occurred in Thread {}: {}", t, e.toString()); - LOGGER.error("", e); - } - }); + setDefaultUncaughtExceptionHandler(); // register the shutdown hook - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - @Override - public void run() { - // shutdown the jetty server - shutdownHook(); - } - })); + addShutdownHook(); final String bootstrapPort = System.getProperty(BOOTSTRAP_PORT_PROPERTY); if (bootstrapPort != null) { @@ -125,8 +120,7 @@ public class NiFi { detectTimingIssues(); // redirect JUL log events - SLF4JBridgeHandler.removeHandlersForRootLogger(); - SLF4JBridgeHandler.install(); + initLogging(); final Bundle systemBundle = SystemBundle.create(properties); @@ -134,15 +128,18 @@ public class NiFi { final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties, systemBundle); // load the extensions classloaders - NarClassLoaders.getInstance().init(properties.getFrameworkWorkingDirectory(), properties.getExtensionsWorkingDirectory()); + NarClassLoaders narClassLoaders = NarClassLoaders.getInstance(); + + narClassLoaders.init(rootClassLoader, + properties.getFrameworkWorkingDirectory(), properties.getExtensionsWorkingDirectory()); // load the framework classloader - final ClassLoader frameworkClassLoader = NarClassLoaders.getInstance().getFrameworkBundle().getClassLoader(); + final ClassLoader frameworkClassLoader = narClassLoaders.getFrameworkBundle().getClassLoader(); if (frameworkClassLoader == null) { throw new IllegalStateException("Unable to find the framework NAR ClassLoader."); } - final Set narBundles = NarClassLoaders.getInstance().getBundles(); + final Set narBundles = narClassLoaders.getBundles(); // load the server from the framework classloader Thread.currentThread().setContextClassLoader(frameworkClassLoader); @@ -169,6 +166,31 @@ public class NiFi { } } + protected void setDefaultUncaughtExceptionHandler() { + Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() { + @Override + public void uncaughtException(final Thread t, final Throwable e) { + LOGGER.error("An Unknown Error Occurred in Thread {}: {}", t, e.toString()); + LOGGER.error("", e); + } + }); + } + + protected void addShutdownHook() { + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + // shutdown the jetty server + shutdownHook(); + } + })); + } + + protected void initLogging() { + SLF4JBridgeHandler.removeHandlersForRootLogger(); + SLF4JBridgeHandler.install(); + } + private static ClassLoader createBootstrapClassLoader() throws IOException { //Get list of files in bootstrap folder final List urls = new ArrayList<>(); @@ -185,21 +207,25 @@ public class NiFi { protected void shutdownHook() { try { - this.shutdown = true; - - LOGGER.info("Initiating shutdown of Jetty web server..."); - if (nifiServer != null) { - nifiServer.stop(); - } - if (bootstrapListener != null) { - bootstrapListener.stop(); - } - LOGGER.info("Jetty web server shutdown completed (nicely or otherwise)."); + shutdown(); } catch (final Throwable t) { LOGGER.warn("Problem occurred ensuring Jetty web server was properly terminated due to " + t); } } + protected void shutdown() { + this.shutdown = true; + + LOGGER.info("Initiating shutdown of Jetty web server..."); + if (nifiServer != null) { + nifiServer.stop(); + } + if (bootstrapListener != null) { + bootstrapListener.stop(); + } + LOGGER.info("Jetty web server shutdown completed (nicely or otherwise)."); + } + /** * Determine if the machine we're running on has timing issues. */ @@ -262,15 +288,20 @@ public class NiFi { public static void main(String[] args) { LOGGER.info("Launching NiFi..."); try { - final ClassLoader bootstrap = createBootstrapClassLoader(); - NiFiProperties properties = initializeProperties(args, bootstrap); - properties.validate(); + NiFiProperties properties = convertArgumentsToValidatedNiFiProperties(args); new NiFi(properties); } catch (final Throwable t) { LOGGER.error("Failure to launch NiFi due to " + t, t); } } + protected static NiFiProperties convertArgumentsToValidatedNiFiProperties(String[] args) throws IOException { + final ClassLoader bootstrap = createBootstrapClassLoader(); + NiFiProperties properties = initializeProperties(args, bootstrap); + properties.validate(); + return properties; + } + private static NiFiProperties initializeProperties(final String[] args, final ClassLoader boostrapLoader) { // Try to get key // If key doesn't exist, instantiate without @@ -321,9 +352,9 @@ public class NiFi { if (null == key) { return ""; } else if (!isHexKeyValid(key)) { - throw new IllegalArgumentException("The key was not provided in valid hex format and of the correct length"); + throw new IllegalArgumentException("The key was not provided in valid hex format and of the correct length"); } else { - return key; + return key; } }