mirror of
https://github.com/apache/nifi.git
synced 2025-03-03 16:09:19 +00:00
NIFI-13813 Refactored NiFi Runtime Property Loading (#9322)
- Refactored NiFi Runtime class to read application properties for NAR loading and then load application properties using Framework NAR - Removed bootstrap Class Loader from runtime initialization - Removed nifi-properties-loader from nifi-bootstrap
This commit is contained in:
parent
12c3280917
commit
0b85dd696e
@ -29,12 +29,5 @@ language governing permissions and limitations under the License. -->
|
||||
<artifactId>nifi-security-cert-builder</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<!-- Referenced in org.apache.nifi.NiFi -->
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-properties-loader</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
<scope>runtime</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
@ -35,4 +35,21 @@
|
||||
<artifactId>nifi-nar-utils</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-jar-plugin</artifactId>
|
||||
<configuration>
|
||||
<archive>
|
||||
<manifest>
|
||||
<!-- Add Implementation-Version for logging -->
|
||||
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
|
||||
</manifest>
|
||||
</archive>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
|
@ -16,317 +16,29 @@
|
||||
*/
|
||||
package org.apache.nifi;
|
||||
|
||||
import org.apache.nifi.bundle.Bundle;
|
||||
import org.apache.nifi.diagnostics.DiagnosticsDump;
|
||||
import org.apache.nifi.nar.ExtensionMapping;
|
||||
import org.apache.nifi.nar.NarClassLoaders;
|
||||
import org.apache.nifi.nar.NarClassLoadersHolder;
|
||||
import org.apache.nifi.nar.NarUnpackMode;
|
||||
import org.apache.nifi.nar.NarUnpacker;
|
||||
import org.apache.nifi.nar.SystemBundle;
|
||||
import org.apache.nifi.processor.DataUnit;
|
||||
import org.apache.nifi.runtime.ManagementServer;
|
||||
import org.apache.nifi.runtime.StandardManagementServer;
|
||||
import org.apache.nifi.util.DiagnosticUtils;
|
||||
import org.apache.nifi.util.FileUtils;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.nifi.runtime.StandardUncaughtExceptionHandler;
|
||||
import org.apache.nifi.runtime.Application;
|
||||
import org.slf4j.bridge.SLF4JBridgeHandler;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
import java.net.URLClassLoader;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
/**
|
||||
* Application main class
|
||||
*/
|
||||
public class NiFi {
|
||||
|
||||
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss");
|
||||
|
||||
private static final String MANAGEMENT_SERVER_ADDRESS = "org.apache.nifi.management.server.address";
|
||||
|
||||
private static final Pattern MANAGEMENT_SERVER_ADDRESS_PATTERN = Pattern.compile("^(.+?):([1-9][0-9]{3,4})$");
|
||||
|
||||
private static final String MANAGEMENT_SERVER_DEFAULT_ADDRESS = "127.0.0.1:52020";
|
||||
|
||||
private static final int ADDRESS_GROUP = 1;
|
||||
|
||||
private static final int PORT_GROUP = 2;
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(NiFi.class);
|
||||
|
||||
private final NiFiServer nifiServer;
|
||||
|
||||
private final NiFiProperties properties;
|
||||
|
||||
private final ManagementServer managementServer;
|
||||
|
||||
private volatile boolean shutdown = false;
|
||||
|
||||
public NiFi(final NiFiProperties properties)
|
||||
throws ClassNotFoundException, IOException, IllegalArgumentException {
|
||||
this(properties, ClassLoader.getSystemClassLoader());
|
||||
}
|
||||
|
||||
public NiFi(final NiFiProperties properties, ClassLoader rootClassLoader)
|
||||
throws ClassNotFoundException, IOException, IllegalArgumentException {
|
||||
|
||||
this.properties = properties;
|
||||
|
||||
// There can only be one krb5.conf for the overall Java process so set this globally during
|
||||
// start up so that processors and our Kerberos authentication code don't have to set this
|
||||
final File kerberosConfigFile = properties.getKerberosConfigurationFile();
|
||||
if (kerberosConfigFile != null) {
|
||||
final String kerberosConfigFilePath = kerberosConfigFile.getAbsolutePath();
|
||||
LOGGER.debug("Setting java.security.krb5.conf to {}", kerberosConfigFilePath);
|
||||
System.setProperty("java.security.krb5.conf", kerberosConfigFilePath);
|
||||
}
|
||||
|
||||
setDefaultUncaughtExceptionHandler();
|
||||
|
||||
// register the shutdown hook
|
||||
addShutdownHook();
|
||||
|
||||
// delete the web working dir - if the application does not start successfully
|
||||
// the web app directories might be in an invalid state. when this happens
|
||||
// jetty will not attempt to re-extract the war into the directory. by removing
|
||||
// the working directory, we can be assured that it will attempt to extract the
|
||||
// war every time the application starts.
|
||||
File webWorkingDir = properties.getWebWorkingDirectory();
|
||||
FileUtils.deleteFilesInDirectory(webWorkingDir, null, LOGGER, true, true);
|
||||
FileUtils.deleteFile(webWorkingDir, LOGGER, 3);
|
||||
|
||||
// redirect JUL log events
|
||||
initLogging();
|
||||
|
||||
final Bundle systemBundle = SystemBundle.create(properties, rootClassLoader);
|
||||
|
||||
// expand the nars
|
||||
final NarUnpackMode unpackMode = properties.isUnpackNarsToUberJar() ? NarUnpackMode.UNPACK_TO_UBER_JAR : NarUnpackMode.UNPACK_INDIVIDUAL_JARS;
|
||||
final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties, systemBundle, unpackMode);
|
||||
|
||||
// load the extensions classloaders
|
||||
NarClassLoaders narClassLoaders = NarClassLoadersHolder.getInstance();
|
||||
|
||||
narClassLoaders.init(rootClassLoader, properties.getFrameworkWorkingDirectory(), properties.getExtensionsWorkingDirectory(), true);
|
||||
|
||||
// load the framework classloader
|
||||
final ClassLoader frameworkClassLoader = narClassLoaders.getFrameworkBundle().getClassLoader();
|
||||
if (frameworkClassLoader == null) {
|
||||
throw new IllegalStateException("Unable to find the framework NAR ClassLoader.");
|
||||
}
|
||||
|
||||
final Set<Bundle> narBundles = narClassLoaders.getBundles();
|
||||
|
||||
final long startTime = System.nanoTime();
|
||||
nifiServer = narClassLoaders.getServer();
|
||||
if (nifiServer == null) {
|
||||
throw new IllegalStateException("Unable to find a NiFiServer implementation.");
|
||||
}
|
||||
Thread.currentThread().setContextClassLoader(nifiServer.getClass().getClassLoader());
|
||||
// Filter out the framework NAR from being loaded by the NiFiServer
|
||||
nifiServer.initialize(properties,
|
||||
systemBundle,
|
||||
narBundles,
|
||||
extensionMapping);
|
||||
|
||||
managementServer = getManagementServer();
|
||||
if (shutdown) {
|
||||
LOGGER.info("NiFi has been shutdown via NiFi Bootstrap. Will not start Controller");
|
||||
} else {
|
||||
nifiServer.start();
|
||||
managementServer.start();
|
||||
|
||||
final long duration = System.nanoTime() - startTime;
|
||||
final double durationSeconds = TimeUnit.NANOSECONDS.toMillis(duration) / 1000.0;
|
||||
LOGGER.info("Started Application Controller in {} seconds ({} ns)", durationSeconds, duration);
|
||||
}
|
||||
}
|
||||
|
||||
public NiFiServer getServer() {
|
||||
return nifiServer;
|
||||
}
|
||||
|
||||
protected void setDefaultUncaughtExceptionHandler() {
|
||||
Thread.setDefaultUncaughtExceptionHandler(new ExceptionHandler());
|
||||
}
|
||||
|
||||
protected void addShutdownHook() {
|
||||
final Thread shutdownHook = Thread.ofPlatform()
|
||||
.name(NiFi.class.getSimpleName())
|
||||
.uncaughtExceptionHandler(new ExceptionHandler())
|
||||
.unstarted(this::stop);
|
||||
|
||||
Runtime.getRuntime().addShutdownHook(shutdownHook);
|
||||
}
|
||||
|
||||
protected void initLogging() {
|
||||
/**
|
||||
* Start Application after initializing logging and uncaught exception handling
|
||||
*
|
||||
* @param arguments Application arguments are ignored
|
||||
*/
|
||||
public static void main(final String[] arguments) {
|
||||
// Install JUL SLF4J Bridge logging before starting application
|
||||
SLF4JBridgeHandler.removeHandlersForRootLogger();
|
||||
SLF4JBridgeHandler.install();
|
||||
}
|
||||
|
||||
private static ClassLoader createBootstrapClassLoader() {
|
||||
//Get list of files in bootstrap folder
|
||||
final List<URL> urls = new ArrayList<>();
|
||||
try (final Stream<Path> files = Files.list(Paths.get("lib/bootstrap"))) {
|
||||
files.forEach(p -> {
|
||||
try {
|
||||
urls.add(p.toUri().toURL());
|
||||
} catch (final MalformedURLException mef) {
|
||||
LOGGER.warn("Unable to load bootstrap library [{}]", p.getFileName(), mef);
|
||||
}
|
||||
});
|
||||
} catch (IOException ioe) {
|
||||
LOGGER.warn("Unable to access lib/bootstrap to create bootstrap classloader", ioe);
|
||||
}
|
||||
//Create the bootstrap classloader
|
||||
return new URLClassLoader(urls.toArray(new URL[0]), Thread.currentThread().getContextClassLoader());
|
||||
}
|
||||
// Set Uncaught Exception Handler before other operations
|
||||
Thread.setDefaultUncaughtExceptionHandler(new StandardUncaughtExceptionHandler());
|
||||
|
||||
/**
|
||||
* Stop Application and shutdown server
|
||||
*/
|
||||
public void stop() {
|
||||
try {
|
||||
runDiagnosticsOnShutdown();
|
||||
shutdown();
|
||||
} catch (final Throwable t) {
|
||||
LOGGER.warn("Application Controller shutdown failed", t);
|
||||
}
|
||||
}
|
||||
|
||||
private void runDiagnosticsOnShutdown() throws IOException {
|
||||
if (properties.isDiagnosticsOnShutdownEnabled()) {
|
||||
final String diagnosticDirectoryPath = properties.getDiagnosticsOnShutdownDirectory();
|
||||
final boolean isCreated = DiagnosticUtils.createDiagnosticDirectory(diagnosticDirectoryPath);
|
||||
if (isCreated) {
|
||||
LOGGER.debug("Diagnostic directory has successfully been created.");
|
||||
}
|
||||
while (DiagnosticUtils.isFileCountExceeded(diagnosticDirectoryPath, properties.getDiagnosticsOnShutdownMaxFileCount())
|
||||
|| DiagnosticUtils.isSizeExceeded(diagnosticDirectoryPath, DataUnit.parseDataSize(properties.getDiagnosticsOnShutdownDirectoryMaxSize(), DataUnit.B).longValue())) {
|
||||
final Path oldestFile = DiagnosticUtils.getOldestFile(diagnosticDirectoryPath);
|
||||
Files.delete(oldestFile);
|
||||
}
|
||||
final String fileName = String.format("%s/diagnostic-%s.log", diagnosticDirectoryPath, DATE_TIME_FORMATTER.format(LocalDateTime.now()));
|
||||
diagnose(new File(fileName), properties.isDiagnosticsOnShutdownVerbose());
|
||||
}
|
||||
}
|
||||
|
||||
private void diagnose(final File file, final boolean verbose) throws IOException {
|
||||
final DiagnosticsDump diagnosticsDump = getServer().getDiagnosticsFactory().create(verbose);
|
||||
try (final OutputStream fileOutputStream = new FileOutputStream(file)) {
|
||||
diagnosticsDump.writeTo(fileOutputStream);
|
||||
}
|
||||
}
|
||||
|
||||
protected void shutdown() {
|
||||
this.shutdown = true;
|
||||
|
||||
LOGGER.info("Application Controller shutdown started");
|
||||
|
||||
managementServer.stop();
|
||||
|
||||
if (nifiServer == null) {
|
||||
LOGGER.info("Application Server not running");
|
||||
} else {
|
||||
nifiServer.stop();
|
||||
}
|
||||
|
||||
LOGGER.info("Application Controller shutdown completed");
|
||||
}
|
||||
|
||||
private ManagementServer getManagementServer() {
|
||||
final String managementServerAddressProperty = System.getProperty(MANAGEMENT_SERVER_ADDRESS, MANAGEMENT_SERVER_DEFAULT_ADDRESS);
|
||||
if (managementServerAddressProperty.isBlank()) {
|
||||
throw new IllegalStateException("Management Server Address System Property [%s] not configured".formatted(MANAGEMENT_SERVER_ADDRESS));
|
||||
}
|
||||
|
||||
final Matcher matcher = MANAGEMENT_SERVER_ADDRESS_PATTERN.matcher(managementServerAddressProperty);
|
||||
if (matcher.matches()) {
|
||||
final String addressGroup = matcher.group(ADDRESS_GROUP);
|
||||
final String portGroup = matcher.group(PORT_GROUP);
|
||||
final int port = Integer.parseInt(portGroup);
|
||||
|
||||
final InetSocketAddress bindAddress = new InetSocketAddress(addressGroup, port);
|
||||
return new StandardManagementServer(bindAddress, nifiServer);
|
||||
} else {
|
||||
throw new IllegalStateException("Management Server Address System Property [%s] not valid [%s]".formatted(MANAGEMENT_SERVER_ADDRESS, managementServerAddressProperty));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Main entry point of the application.
|
||||
*
|
||||
* @param args things which are ignored
|
||||
*/
|
||||
public static void main(String[] args) {
|
||||
LOGGER.info("Launching NiFi...");
|
||||
try {
|
||||
NiFiProperties properties = loadProperties();
|
||||
new NiFi(properties);
|
||||
} catch (final Throwable t) {
|
||||
LOGGER.error("Failure to launch NiFi", t);
|
||||
}
|
||||
}
|
||||
|
||||
protected static NiFiProperties loadProperties() {
|
||||
return loadProperties(createBootstrapClassLoader());
|
||||
}
|
||||
|
||||
protected static NiFiProperties loadProperties(final ClassLoader bootstrapClassLoader) {
|
||||
NiFiProperties properties = initializeProperties(bootstrapClassLoader);
|
||||
properties.validate();
|
||||
return properties;
|
||||
}
|
||||
|
||||
private static NiFiProperties initializeProperties(final ClassLoader boostrapLoader) {
|
||||
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
|
||||
Thread.currentThread().setContextClassLoader(boostrapLoader);
|
||||
|
||||
try {
|
||||
final Class<?> propsLoaderClass = Class.forName("org.apache.nifi.properties.NiFiPropertiesLoader", true, boostrapLoader);
|
||||
final Object loaderInstance = propsLoaderClass.getConstructor().newInstance();
|
||||
final Method getMethod = propsLoaderClass.getMethod("get");
|
||||
final NiFiProperties properties = (NiFiProperties) getMethod.invoke(loaderInstance);
|
||||
LOGGER.info("Application Properties loaded [{}]", properties.size());
|
||||
return properties;
|
||||
} catch (final InstantiationException | InvocationTargetException wrappedException) {
|
||||
final String msg = "There was an issue loading properties";
|
||||
throw new IllegalArgumentException(msg, wrappedException.getCause() == null ? wrappedException : wrappedException.getCause());
|
||||
} catch (final IllegalAccessException | NoSuchMethodException | ClassNotFoundException reex) {
|
||||
final String msg = "Unable to access properties loader in the expected manner - apparent classpath or build issue";
|
||||
throw new IllegalArgumentException(msg, reex);
|
||||
} catch (final RuntimeException e) {
|
||||
final String msg = "There was an issue decrypting protected properties";
|
||||
throw new IllegalArgumentException(msg, e);
|
||||
} finally {
|
||||
Thread.currentThread().setContextClassLoader(contextClassLoader);
|
||||
}
|
||||
}
|
||||
|
||||
private static class ExceptionHandler implements Thread.UncaughtExceptionHandler {
|
||||
|
||||
@Override
|
||||
public void uncaughtException(final Thread thread, Throwable exception) {
|
||||
LOGGER.error("An Unknown Error Occurred in Thread {}", thread, exception);
|
||||
}
|
||||
// Run Application
|
||||
final Runnable applicationCommand = new Application();
|
||||
applicationCommand.run();
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,164 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.runtime;
|
||||
|
||||
import org.apache.nifi.NiFiServer;
|
||||
import org.apache.nifi.bundle.Bundle;
|
||||
import org.apache.nifi.nar.ExtensionMapping;
|
||||
import org.apache.nifi.nar.NarClassLoaders;
|
||||
import org.apache.nifi.nar.NarClassLoadersHolder;
|
||||
import org.apache.nifi.nar.NarUnpackMode;
|
||||
import org.apache.nifi.nar.NarUnpacker;
|
||||
import org.apache.nifi.nar.SystemBundle;
|
||||
import org.apache.nifi.runtime.command.DiagnosticsCommand;
|
||||
import org.apache.nifi.runtime.command.ShutdownCommand;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Application command encapsulates standard initialization and shutdown hook registration
|
||||
*/
|
||||
public class Application implements Runnable {
|
||||
private static final String SECURITY_KRB5_CONF_PROPERTY = "java.security.krb5.conf";
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(Application.class);
|
||||
|
||||
private static final String VERSION_UNKNOWN = "UNKNOWN";
|
||||
|
||||
private static final String APPLICATION_VERSION = Objects.requireNonNullElse(Application.class.getPackage().getImplementationVersion(), VERSION_UNKNOWN);
|
||||
|
||||
/**
|
||||
* Run application
|
||||
*/
|
||||
@Override
|
||||
public void run() {
|
||||
logger.info("Starting NiFi {} using Java {} with PID {}", APPLICATION_VERSION, Runtime.version(), ProcessHandle.current().pid());
|
||||
final Instant started = Instant.now();
|
||||
|
||||
try {
|
||||
run(started);
|
||||
} catch (final Throwable e) {
|
||||
logger.error("Starting NiFi failed", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void run(final Instant started) {
|
||||
final NiFiProperties properties = PropertiesProvider.readProperties();
|
||||
final Bundle systemBundle = createSystemBundle(properties);
|
||||
final NarUnpackMode unpackMode = properties.isUnpackNarsToUberJar() ? NarUnpackMode.UNPACK_TO_UBER_JAR : NarUnpackMode.UNPACK_INDIVIDUAL_JARS;
|
||||
final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties, systemBundle, unpackMode);
|
||||
final NarClassLoaders narClassLoaders = initializeClassLoaders(properties, systemBundle.getClassLoader());
|
||||
|
||||
final NiFiServer applicationServer = narClassLoaders.getServer();
|
||||
if (applicationServer == null) {
|
||||
logger.error("Server implementation of [{}] not found", NiFiServer.class.getName());
|
||||
} else {
|
||||
try {
|
||||
startServer(systemBundle, extensionMapping, narClassLoaders, applicationServer, started);
|
||||
} catch (final Throwable e) {
|
||||
logger.error("Start Server failed", e);
|
||||
applicationServer.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Bundle createSystemBundle(final NiFiProperties properties) {
|
||||
final ClassLoader systemClassLoader = ClassLoader.getSystemClassLoader();
|
||||
final String narLibraryDirectory = properties.getProperty(NiFiProperties.NAR_LIBRARY_DIRECTORY);
|
||||
return SystemBundle.create(narLibraryDirectory, systemClassLoader);
|
||||
}
|
||||
|
||||
private NarClassLoaders initializeClassLoaders(final NiFiProperties properties, final ClassLoader systemClassLoader) {
|
||||
final NarClassLoaders narClassLoaders = NarClassLoadersHolder.getInstance();
|
||||
|
||||
final File frameworkWorkingDirectory = properties.getFrameworkWorkingDirectory();
|
||||
final File extensionsWorkingDirectory = properties.getExtensionsWorkingDirectory();
|
||||
|
||||
try {
|
||||
narClassLoaders.init(systemClassLoader, frameworkWorkingDirectory, extensionsWorkingDirectory, true);
|
||||
} catch (final Exception e) {
|
||||
logger.error("NAR Class Loaders initialization failed", e);
|
||||
}
|
||||
|
||||
return narClassLoaders;
|
||||
}
|
||||
|
||||
private void startServer(
|
||||
final Bundle systemBundle,
|
||||
final ExtensionMapping extensionMapping,
|
||||
final NarClassLoaders narClassLoaders,
|
||||
final NiFiServer applicationServer,
|
||||
final Instant started
|
||||
) {
|
||||
// Set Application Server Class Loader for subsequent operations
|
||||
final ClassLoader applicationServerClassLoader = narClassLoaders.getServer().getClass().getClassLoader();
|
||||
Thread.currentThread().setContextClassLoader(applicationServerClassLoader);
|
||||
|
||||
// Read Properties from Framework NAR containing nifi-properties-loader for additional processing
|
||||
final Bundle frameworkBundle = narClassLoaders.getFrameworkBundle();
|
||||
final ClassLoader frameworkClassLoader = frameworkBundle.getClassLoader();
|
||||
final NiFiProperties properties = PropertiesProvider.readProperties(frameworkClassLoader);
|
||||
|
||||
// Initialize Application Server with properties and extensions
|
||||
setKerberosConfiguration(properties);
|
||||
final Set<Bundle> narBundles = narClassLoaders.getBundles();
|
||||
applicationServer.initialize(properties, systemBundle, narBundles, extensionMapping);
|
||||
|
||||
final ManagementServer managementServer = ManagementServerProvider.getManagementServer(applicationServer);
|
||||
|
||||
// Start Application Server before Management Server
|
||||
applicationServer.start();
|
||||
managementServer.start();
|
||||
|
||||
// Add Shutdown Hook after application started
|
||||
final Runnable diagnosticsCommand = new DiagnosticsCommand(properties, applicationServer);
|
||||
final Runnable shutdownCommand = new ShutdownCommand(applicationServer, managementServer, diagnosticsCommand);
|
||||
addShutdownHook(shutdownCommand);
|
||||
|
||||
final Instant completed = Instant.now();
|
||||
final Duration duration = Duration.between(started, completed);
|
||||
final double durationSeconds = duration.toMillis() / 1000.0;
|
||||
logger.info("Started Application in {} seconds ({} ns)", durationSeconds, duration.toNanos());
|
||||
}
|
||||
|
||||
private void addShutdownHook(final Runnable shutdownCommand) {
|
||||
final Thread shutdownHook = Thread.ofPlatform()
|
||||
.name(shutdownCommand.getClass().getSimpleName())
|
||||
.uncaughtExceptionHandler(new StandardUncaughtExceptionHandler())
|
||||
.unstarted(shutdownCommand);
|
||||
|
||||
Runtime.getRuntime().addShutdownHook(shutdownHook);
|
||||
}
|
||||
|
||||
private void setKerberosConfiguration(final NiFiProperties properties) {
|
||||
final File kerberosConfigFile = properties.getKerberosConfigurationFile();
|
||||
if (kerberosConfigFile == null) {
|
||||
logger.debug("Application Kerberos Configuration not specified");
|
||||
} else {
|
||||
final String kerberosConfigFilePath = kerberosConfigFile.getAbsolutePath();
|
||||
// Set System Kerberos Configuration based on application properties
|
||||
System.setProperty(SECURITY_KRB5_CONF_PROPERTY, kerberosConfigFilePath);
|
||||
}
|
||||
}
|
||||
}
|
@ -22,7 +22,7 @@ import org.apache.nifi.NiFiServer;
|
||||
import org.apache.nifi.cluster.ClusterDetailsFactory;
|
||||
import org.apache.nifi.cluster.ConnectionState;
|
||||
import org.apache.nifi.controller.DecommissionTask;
|
||||
import org.apache.nifi.util.HttpExchangeUtils;
|
||||
import org.apache.nifi.runtime.util.HttpExchangeUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
|
@ -21,7 +21,7 @@ import com.sun.net.httpserver.HttpHandler;
|
||||
import org.apache.nifi.NiFiServer;
|
||||
import org.apache.nifi.diagnostics.DiagnosticsDump;
|
||||
import org.apache.nifi.diagnostics.DiagnosticsFactory;
|
||||
import org.apache.nifi.util.HttpExchangeUtils;
|
||||
import org.apache.nifi.runtime.util.HttpExchangeUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
|
@ -18,7 +18,7 @@ package org.apache.nifi.runtime;
|
||||
|
||||
import com.sun.net.httpserver.HttpExchange;
|
||||
import com.sun.net.httpserver.HttpHandler;
|
||||
import org.apache.nifi.util.HttpExchangeUtils;
|
||||
import org.apache.nifi.runtime.util.HttpExchangeUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
|
@ -21,7 +21,7 @@ import com.sun.net.httpserver.HttpHandler;
|
||||
import org.apache.nifi.NiFiServer;
|
||||
import org.apache.nifi.controller.status.history.StatusHistoryDump;
|
||||
import org.apache.nifi.controller.status.history.StatusHistoryDumpFactory;
|
||||
import org.apache.nifi.util.HttpExchangeUtils;
|
||||
import org.apache.nifi.runtime.util.HttpExchangeUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
|
@ -0,0 +1,54 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.runtime;
|
||||
|
||||
import org.apache.nifi.NiFiServer;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
/**
|
||||
* Management Server Provider abstracts loading configuration from System properties provided from Bootstrap Process
|
||||
*/
|
||||
class ManagementServerProvider {
|
||||
|
||||
static final String MANAGEMENT_SERVER_ADDRESS = "org.apache.nifi.management.server.address";
|
||||
|
||||
private static final Pattern MANAGEMENT_SERVER_ADDRESS_PATTERN = Pattern.compile("^(.+?):([1-9][0-9]{3,4})$");
|
||||
|
||||
private static final String MANAGEMENT_SERVER_DEFAULT_ADDRESS = "127.0.0.1:52020";
|
||||
|
||||
private static final int ADDRESS_GROUP = 1;
|
||||
|
||||
private static final int PORT_GROUP = 2;
|
||||
|
||||
static ManagementServer getManagementServer(final NiFiServer nifiServer) {
|
||||
final String managementServerAddressProperty = System.getProperty(MANAGEMENT_SERVER_ADDRESS, MANAGEMENT_SERVER_DEFAULT_ADDRESS);
|
||||
final Matcher matcher = MANAGEMENT_SERVER_ADDRESS_PATTERN.matcher(managementServerAddressProperty);
|
||||
if (matcher.matches()) {
|
||||
final String addressGroup = matcher.group(ADDRESS_GROUP);
|
||||
final String portGroup = matcher.group(PORT_GROUP);
|
||||
final int port = Integer.parseInt(portGroup);
|
||||
|
||||
final InetSocketAddress bindAddress = new InetSocketAddress(addressGroup, port);
|
||||
return new StandardManagementServer(bindAddress, nifiServer);
|
||||
} else {
|
||||
throw new IllegalStateException("Management Server Address System Property [%s] not valid [%s]".formatted(MANAGEMENT_SERVER_ADDRESS, managementServerAddressProperty));
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,70 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.runtime;
|
||||
|
||||
import org.apache.nifi.util.NiFiBootstrapPropertiesLoader;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
|
||||
/**
|
||||
* Properties Provider abstracts loading Application Properties based on available Class Loader
|
||||
*/
|
||||
class PropertiesProvider {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(PropertiesProvider.class);
|
||||
|
||||
private static final String LOADER_CLASS = "org.apache.nifi.properties.NiFiPropertiesLoader";
|
||||
|
||||
private static final String GET_METHOD = "get";
|
||||
|
||||
/**
|
||||
* Read Properties without using Properties Loader
|
||||
*
|
||||
* @return Application Properties
|
||||
*/
|
||||
static NiFiProperties readProperties() {
|
||||
final NiFiBootstrapPropertiesLoader bootstrapPropertiesLoader = new NiFiBootstrapPropertiesLoader();
|
||||
final String propertiesFilePath = bootstrapPropertiesLoader.getDefaultApplicationPropertiesFilePath();
|
||||
logger.info("Loading Application Properties [{}]", propertiesFilePath);
|
||||
return NiFiProperties.createBasicNiFiProperties(propertiesFilePath);
|
||||
}
|
||||
|
||||
/**
|
||||
* Read Properties using Properties Loader provided in Class Loader
|
||||
*
|
||||
* @param propertiesClassLoader Properties Class Loader from Framework NAR
|
||||
* @return Application Properties
|
||||
*/
|
||||
static NiFiProperties readProperties(final ClassLoader propertiesClassLoader) {
|
||||
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
|
||||
Thread.currentThread().setContextClassLoader(propertiesClassLoader);
|
||||
|
||||
try {
|
||||
final Class<?> loaderClass = Class.forName(LOADER_CLASS, true, propertiesClassLoader);
|
||||
final Object loader = loaderClass.getConstructor().newInstance();
|
||||
final Method getMethod = loaderClass.getMethod(GET_METHOD);
|
||||
return (NiFiProperties) getMethod.invoke(loader);
|
||||
} catch (final Exception e) {
|
||||
throw new IllegalStateException("Application Properties loading failed", e);
|
||||
} finally {
|
||||
Thread.currentThread().setContextClassLoader(contextClassLoader);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,32 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.runtime;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Standard implementation of Uncaught Exception Handler with logging
|
||||
*/
|
||||
public class StandardUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
|
||||
private static final Logger logger = LoggerFactory.getLogger(StandardUncaughtExceptionHandler.class);
|
||||
|
||||
@Override
|
||||
public void uncaughtException(final Thread thread, final Throwable e) {
|
||||
logger.error("Uncaught Exception in Thread [{}] ID [{}]", thread.getName(), thread.threadId(), e);
|
||||
}
|
||||
}
|
@ -0,0 +1,151 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.runtime.command;
|
||||
|
||||
import org.apache.nifi.NiFiServer;
|
||||
import org.apache.nifi.diagnostics.DiagnosticsDump;
|
||||
import org.apache.nifi.processor.DataUnit;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.Comparator;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.function.ToLongFunction;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
/**
|
||||
* Diagnostics Command abstracted for invocation in Shutdown Command
|
||||
*/
|
||||
public class DiagnosticsCommand implements Runnable {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(DiagnosticsCommand.class);
|
||||
|
||||
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss");
|
||||
|
||||
private static final String DIAGNOSTICS_FILE_FORMAT = "diagnostic-%s.log";
|
||||
|
||||
private final NiFiProperties properties;
|
||||
|
||||
private final NiFiServer server;
|
||||
|
||||
public DiagnosticsCommand(final NiFiProperties properties, final NiFiServer server) {
|
||||
this.properties = Objects.requireNonNull(properties, "Properties required");
|
||||
this.server = Objects.requireNonNull(server, "Server required");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (properties.isDiagnosticsOnShutdownEnabled()) {
|
||||
final File diagnosticDirectory = new File(properties.getDiagnosticsOnShutdownDirectory());
|
||||
if (diagnosticDirectory.mkdir()) {
|
||||
logger.info("Diagnostics Directory created [{}]", diagnosticDirectory);
|
||||
}
|
||||
purgeOldestFiles(diagnosticDirectory);
|
||||
|
||||
final String localDateTime = DATE_TIME_FORMATTER.format(LocalDateTime.now());
|
||||
final String diagnosticsFileName = DIAGNOSTICS_FILE_FORMAT.formatted(localDateTime);
|
||||
final File diagnosticsPath = new File(diagnosticDirectory, diagnosticsFileName);
|
||||
writeDiagnostics(diagnosticsPath);
|
||||
}
|
||||
}
|
||||
|
||||
private void purgeOldestFiles(final File diagnosticDirectory) {
|
||||
final long maxSize = DataUnit.parseDataSize(properties.getDiagnosticsOnShutdownDirectoryMaxSize(), DataUnit.B).longValue();
|
||||
final int maxFileCount = properties.getDiagnosticsOnShutdownMaxFileCount();
|
||||
while (isFileCountExceeded(diagnosticDirectory, maxFileCount) || isSizeExceeded(diagnosticDirectory, maxSize)) {
|
||||
try {
|
||||
final Path oldestFile = getOldestFile(diagnosticDirectory);
|
||||
Files.delete(oldestFile);
|
||||
} catch (final IOException e) {
|
||||
logger.warn("Delete oldest diagnostics failed", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void writeDiagnostics(final File diagnosticsPath) {
|
||||
final DiagnosticsDump diagnosticsDump = server.getDiagnosticsFactory().create(properties.isDiagnosticsOnShutdownVerbose());
|
||||
try (final OutputStream fileOutputStream = new FileOutputStream(diagnosticsPath)) {
|
||||
diagnosticsDump.writeTo(fileOutputStream);
|
||||
} catch (final IOException e) {
|
||||
logger.warn("Write Diagnostics failed [{}]", diagnosticsPath, e);
|
||||
}
|
||||
}
|
||||
|
||||
private Path getOldestFile(final File diagnosticDirectory) throws IOException {
|
||||
final Comparator<? super Path> lastModifiedComparator = Comparator.comparingLong(p -> p.toFile().lastModified());
|
||||
|
||||
final Optional<Path> oldestFile;
|
||||
|
||||
try (Stream<Path> paths = Files.walk(diagnosticDirectory.toPath())) {
|
||||
oldestFile = paths
|
||||
.filter(Files::isRegularFile)
|
||||
.min(lastModifiedComparator);
|
||||
}
|
||||
|
||||
return oldestFile.orElseThrow(
|
||||
() -> new RuntimeException(String.format("Could not find oldest file in diagnostic directory: %s", diagnosticDirectory))
|
||||
);
|
||||
}
|
||||
|
||||
private boolean isFileCountExceeded(final File diagnosticDirectory, final int maxFileCount) {
|
||||
final String[] fileNames = diagnosticDirectory.list();
|
||||
if (fileNames == null) {
|
||||
logger.warn("Diagnostics Directory [{}] listing files failed", diagnosticDirectory);
|
||||
return false;
|
||||
}
|
||||
return fileNames.length >= maxFileCount;
|
||||
}
|
||||
|
||||
private boolean isSizeExceeded(final File diagnosticDirectory, final long maxSizeInBytes) {
|
||||
return getDirectorySize(diagnosticDirectory.toPath()) >= maxSizeInBytes;
|
||||
}
|
||||
|
||||
private long getDirectorySize(Path path) {
|
||||
long size = 0;
|
||||
try (Stream<Path> walk = Files.walk(path)) {
|
||||
size = walk
|
||||
.filter(Files::isRegularFile)
|
||||
.mapToLong(getFileSizeByPathFunction())
|
||||
.sum();
|
||||
|
||||
} catch (IOException e) {
|
||||
logger.warn("Directory [{}] size calculation failed", path, e);
|
||||
}
|
||||
return size;
|
||||
}
|
||||
|
||||
private ToLongFunction<Path> getFileSizeByPathFunction() {
|
||||
return path -> {
|
||||
try {
|
||||
return Files.size(path);
|
||||
} catch (IOException e) {
|
||||
logger.warn("Failed to get size of file {}", path, e);
|
||||
return 0L;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
@ -0,0 +1,68 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.runtime.command;
|
||||
|
||||
import org.apache.nifi.NiFiServer;
|
||||
import org.apache.nifi.runtime.ManagementServer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Shutdown Command registered as a Shutdown Hook
|
||||
*/
|
||||
public class ShutdownCommand implements Runnable {
|
||||
private static final Logger logger = LoggerFactory.getLogger(ShutdownCommand.class);
|
||||
|
||||
private final NiFiServer applicationServer;
|
||||
|
||||
private final ManagementServer managementServer;
|
||||
|
||||
private final Runnable diagnosticsCommand;
|
||||
|
||||
public ShutdownCommand(final NiFiServer applicationServer, final ManagementServer managementServer, final Runnable diagnosticsCommand) {
|
||||
this.applicationServer = Objects.requireNonNull(applicationServer, "Application Server required");
|
||||
this.managementServer = Objects.requireNonNull(managementServer, "Management Server required");
|
||||
this.diagnosticsCommand = Objects.requireNonNull(diagnosticsCommand, "Diagnostics Command required");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
logger.info("Application shutdown started");
|
||||
|
||||
try {
|
||||
diagnosticsCommand.run();
|
||||
} catch (final Throwable e) {
|
||||
logger.warn("Diagnostics Command failed", e);
|
||||
}
|
||||
|
||||
try {
|
||||
managementServer.stop();
|
||||
} catch (final Throwable e) {
|
||||
logger.warn("Management Server shutdown failed", e);
|
||||
}
|
||||
|
||||
try {
|
||||
applicationServer.stop();
|
||||
} catch (final Throwable e) {
|
||||
logger.warn("Application Server shutdown failed", e);
|
||||
}
|
||||
|
||||
logger.info("Application shutdown completed");
|
||||
}
|
||||
}
|
@ -15,7 +15,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.util;
|
||||
package org.apache.nifi.runtime.util;
|
||||
|
||||
import com.sun.net.httpserver.HttpExchange;
|
||||
import org.slf4j.Logger;
|
@ -1,98 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.util;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Comparator;
|
||||
import java.util.Optional;
|
||||
import java.util.function.ToLongFunction;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public final class DiagnosticUtils {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(DiagnosticUtils.class);
|
||||
|
||||
private DiagnosticUtils() {
|
||||
// utility class, not meant to be instantiated
|
||||
}
|
||||
|
||||
public static Path getOldestFile(final String diagnosticDirectoryPath) throws IOException {
|
||||
Comparator<? super Path> lastModifiedComparator = Comparator.comparingLong(p -> p.toFile().lastModified());
|
||||
|
||||
final Optional<Path> oldestFile;
|
||||
|
||||
try (Stream<Path> paths = Files.walk(Paths.get(diagnosticDirectoryPath))) {
|
||||
oldestFile = paths
|
||||
.filter(Files::isRegularFile)
|
||||
.min(lastModifiedComparator);
|
||||
}
|
||||
|
||||
return oldestFile.orElseThrow(
|
||||
() -> new RuntimeException(String.format("Could not find oldest file in diagnostic directory: %s", diagnosticDirectoryPath)));
|
||||
}
|
||||
|
||||
public static boolean isFileCountExceeded(final String diagnosticDirectoryPath, final int maxFileCount) {
|
||||
final String[] fileNames = new File(diagnosticDirectoryPath).list();
|
||||
if (fileNames == null) {
|
||||
logger.error("The diagnostic directory path provided is either invalid or not permitted to be listed.");
|
||||
return false;
|
||||
}
|
||||
return fileNames.length >= maxFileCount;
|
||||
}
|
||||
|
||||
public static boolean isSizeExceeded(final String diagnosticDirectoryPath, final long maxSizeInBytes) {
|
||||
return getDirectorySize(Paths.get(diagnosticDirectoryPath)) >= maxSizeInBytes;
|
||||
}
|
||||
|
||||
|
||||
public static boolean createDiagnosticDirectory(final String diagnosticDirectoryPath) {
|
||||
File file = new File(diagnosticDirectoryPath);
|
||||
return file.mkdir();
|
||||
}
|
||||
|
||||
private static long getDirectorySize(Path path) {
|
||||
long size = 0;
|
||||
try (Stream<Path> walk = Files.walk(path)) {
|
||||
size = walk
|
||||
.filter(Files::isRegularFile)
|
||||
.mapToLong(getFileSizeByPathFunction())
|
||||
.sum();
|
||||
|
||||
} catch (IOException e) {
|
||||
logger.error("Directory [{}] size calculation failed", path, e);
|
||||
}
|
||||
return size;
|
||||
}
|
||||
|
||||
private static ToLongFunction<Path> getFileSizeByPathFunction() {
|
||||
return path -> {
|
||||
try {
|
||||
return Files.size(path);
|
||||
} catch (IOException e) {
|
||||
logger.error("Failed to get size of file {}", path, e);
|
||||
return 0L;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
@ -0,0 +1,54 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.runtime;
|
||||
|
||||
import org.apache.nifi.NiFiServer;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
class ManagementServerProviderTest {
|
||||
private static final String ADDRESS_WITHOUT_PORT = "127.0.0.1";
|
||||
|
||||
@Mock
|
||||
private NiFiServer server;
|
||||
|
||||
@Test
|
||||
void testGetManagementServer() {
|
||||
final ManagementServer managementServer = ManagementServerProvider.getManagementServer(server);
|
||||
|
||||
assertNotNull(managementServer);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGetManagementServerPropertyNotValid() {
|
||||
System.setProperty(ManagementServerProvider.MANAGEMENT_SERVER_ADDRESS, ADDRESS_WITHOUT_PORT);
|
||||
try {
|
||||
final IllegalStateException exception = assertThrows(IllegalStateException.class, () -> ManagementServerProvider.getManagementServer(server));
|
||||
|
||||
assertTrue(exception.getMessage().contains(ManagementServerProvider.MANAGEMENT_SERVER_ADDRESS));
|
||||
} finally {
|
||||
System.clearProperty(ManagementServerProvider.MANAGEMENT_SERVER_ADDRESS);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,101 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.runtime.command;
|
||||
|
||||
import org.apache.nifi.NiFiServer;
|
||||
import org.apache.nifi.diagnostics.DiagnosticsDump;
|
||||
import org.apache.nifi.diagnostics.DiagnosticsFactory;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.mockito.ArgumentMatchers.anyBoolean;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
class DiagnosticsCommandTest {
|
||||
private static final String DIRECTORY_MAX_SIZE = "1 MB";
|
||||
|
||||
private static final int MAX_FILE_COUNT = 1;
|
||||
|
||||
private static final String DUMP_CONTENT = StandardDiagnosticsDump.class.getSimpleName();
|
||||
|
||||
@Mock
|
||||
private NiFiServer server;
|
||||
|
||||
@Mock
|
||||
private NiFiProperties properties;
|
||||
|
||||
@Mock
|
||||
private DiagnosticsFactory diagnosticsFactory;
|
||||
|
||||
private DiagnosticsCommand command;
|
||||
|
||||
@BeforeEach
|
||||
void setCommand() {
|
||||
command = new DiagnosticsCommand(properties, server);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testRunDiagnosticsDisabled() {
|
||||
when(properties.isDiagnosticsOnShutdownEnabled()).thenReturn(false);
|
||||
|
||||
command.run();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testRunDiagnosticsEnabled(@TempDir final File tempDir) throws IOException {
|
||||
when(properties.isDiagnosticsOnShutdownEnabled()).thenReturn(true);
|
||||
when(properties.getDiagnosticsOnShutdownDirectory()).thenReturn(tempDir.getAbsolutePath());
|
||||
when(properties.getDiagnosticsOnShutdownDirectoryMaxSize()).thenReturn(DIRECTORY_MAX_SIZE);
|
||||
when(properties.getDiagnosticsOnShutdownMaxFileCount()).thenReturn(MAX_FILE_COUNT);
|
||||
|
||||
when(server.getDiagnosticsFactory()).thenReturn(diagnosticsFactory);
|
||||
when(diagnosticsFactory.create(anyBoolean())).thenReturn(new StandardDiagnosticsDump());
|
||||
|
||||
command.run();
|
||||
|
||||
final File[] files = tempDir.listFiles();
|
||||
|
||||
assertNotNull(files);
|
||||
assertEquals(MAX_FILE_COUNT, files.length);
|
||||
|
||||
final File file = files[0];
|
||||
final String content = Files.readString(file.toPath());
|
||||
assertEquals(DUMP_CONTENT, content);
|
||||
}
|
||||
|
||||
private static class StandardDiagnosticsDump implements DiagnosticsDump {
|
||||
|
||||
@Override
|
||||
public void writeTo(final OutputStream outputStream) throws IOException {
|
||||
outputStream.write(DUMP_CONTENT.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,87 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.runtime.command;
|
||||
|
||||
import org.apache.nifi.NiFiServer;
|
||||
import org.apache.nifi.runtime.ManagementServer;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
class ShutdownCommandTest {
|
||||
@Mock
|
||||
private NiFiServer server;
|
||||
|
||||
@Mock
|
||||
private ManagementServer managementServer;
|
||||
|
||||
@Mock
|
||||
private DiagnosticsCommand diagnosticsCommand;
|
||||
|
||||
private ShutdownCommand command;
|
||||
|
||||
@BeforeEach
|
||||
void setCommand() {
|
||||
command = new ShutdownCommand(server, managementServer, diagnosticsCommand);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testRun() {
|
||||
command.run();
|
||||
|
||||
assertStopped();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testRunDiagnosticsException() {
|
||||
doThrow(new RuntimeException()).when(diagnosticsCommand).run();
|
||||
|
||||
command.run();
|
||||
|
||||
assertStopped();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testRunManagementServerException() {
|
||||
doThrow(new RuntimeException()).when(managementServer).stop();
|
||||
|
||||
command.run();
|
||||
|
||||
assertStopped();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testRunApplicationServerException() {
|
||||
doThrow(new RuntimeException()).when(server).stop();
|
||||
|
||||
command.run();
|
||||
|
||||
assertStopped();
|
||||
}
|
||||
|
||||
private void assertStopped() {
|
||||
verify(diagnosticsCommand).run();
|
||||
verify(managementServer).stop();
|
||||
verify(server).stop();
|
||||
}
|
||||
}
|
@ -98,6 +98,7 @@ import org.apache.nifi.ui.extension.UiExtension;
|
||||
import org.apache.nifi.ui.extension.UiExtensionMapping;
|
||||
import org.apache.nifi.ui.extension.contentviewer.ContentViewer;
|
||||
import org.apache.nifi.ui.extension.contentviewer.SupportedMimeTypes;
|
||||
import org.apache.nifi.util.FileUtils;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.apache.nifi.web.ContentAccess;
|
||||
import org.apache.nifi.web.NiFiWebConfigurationContext;
|
||||
@ -220,6 +221,7 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
|
||||
}
|
||||
|
||||
public void init() {
|
||||
clearWorkingDirectory();
|
||||
final QueuedThreadPool threadPool = new QueuedThreadPool(props.getWebThreads());
|
||||
threadPool.setName("NiFi Web Server");
|
||||
this.server = new Server(threadPool);
|
||||
@ -258,6 +260,17 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
|
||||
return handlerCollection;
|
||||
}
|
||||
|
||||
private void clearWorkingDirectory() {
|
||||
// Clear the working directory to ensure that Jetty loads the latest WAR application files
|
||||
final File webWorkingDir = props.getWebWorkingDirectory();
|
||||
try {
|
||||
FileUtils.deleteFilesInDirectory(webWorkingDir, null, logger, true, true);
|
||||
} catch (final IOException e) {
|
||||
logger.warn("Clear Working Directory failed [{}]", webWorkingDir, e);
|
||||
}
|
||||
FileUtils.deleteFile(webWorkingDir, logger, 3);
|
||||
}
|
||||
|
||||
private Handler loadInitialWars(final Set<Bundle> bundles) {
|
||||
final Map<File, Bundle> warToBundleLookup = findWars(bundles);
|
||||
|
||||
|
@ -21,6 +21,9 @@ working.dir=./target/node1
|
||||
# How long to wait after telling NiFi to shutdown before explicitly killing the Process
|
||||
graceful.shutdown.seconds=20
|
||||
|
||||
# Runtime Management Server Address
|
||||
management.server.address=http://127.0.0.1:56710
|
||||
|
||||
# JVM memory settings
|
||||
java.arg.2=-Xms512m
|
||||
java.arg.3=-Xmx512m
|
||||
|
@ -21,6 +21,9 @@ working.dir=./target/node2
|
||||
# How long to wait after telling NiFi to shutdown before explicitly killing the Process
|
||||
graceful.shutdown.seconds=20
|
||||
|
||||
# Runtime Management Server Address
|
||||
management.server.address=http://127.0.0.1:56720
|
||||
|
||||
# JVM memory settings
|
||||
java.arg.2=-Xms512m
|
||||
java.arg.3=-Xmx512m
|
||||
|
@ -21,6 +21,9 @@ working.dir=./target/standalone-instance
|
||||
# How long to wait after telling NiFi to shutdown before explicitly killing the Process
|
||||
graceful.shutdown.seconds=20
|
||||
|
||||
# Runtime Management Server Address
|
||||
management.server.address=http://127.0.0.1:56730
|
||||
|
||||
# JVM memory settings
|
||||
java.arg.2=-Xms512m
|
||||
java.arg.3=-Xmx512m
|
||||
|
@ -21,6 +21,9 @@ working.dir=./target/pythonic-instance
|
||||
# How long to wait after telling NiFi to shutdown before explicitly killing the Process
|
||||
graceful.shutdown.seconds=20
|
||||
|
||||
# Runtime Management Server Address
|
||||
management.server.address=http://127.0.0.1:56730
|
||||
|
||||
# JVM memory settings
|
||||
java.arg.2=-Xms512m
|
||||
java.arg.3=-Xmx512m
|
||||
|
Loading…
x
Reference in New Issue
Block a user