diff --git a/nifi/nifi-assembly/src/main/assembly/dependencies.xml b/nifi/nifi-assembly/src/main/assembly/dependencies.xml index 27eb32dd8c..a3e3a18597 100644 --- a/nifi/nifi-assembly/src/main/assembly/dependencies.xml +++ b/nifi/nifi-assembly/src/main/assembly/dependencies.xml @@ -49,6 +49,8 @@ true nifi-bootstrap + slf4j-api + logback-classic diff --git a/nifi/nifi-bootstrap/pom.xml b/nifi/nifi-bootstrap/pom.xml index bdf508928b..ff27fd3c5c 100644 --- a/nifi/nifi-bootstrap/pom.xml +++ b/nifi/nifi-bootstrap/pom.xml @@ -21,4 +21,11 @@ nifi-bootstrap jar + + + + org.slf4j + slf4j-api + + diff --git a/nifi/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java b/nifi/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java index 171347880e..8d74f16a89 100644 --- a/nifi/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java +++ b/nifi/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java @@ -23,6 +23,7 @@ import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.apache.nifi.bootstrap.util.LimitingInputStream; @@ -40,6 +41,7 @@ public class NiFiListener { listener = new Listener(serverSocket, runner); final Thread listenThread = new Thread(listener); listenThread.setName("Listen to NiFi"); + listenThread.setDaemon(true); listenThread.start(); return localPort; } @@ -62,7 +64,16 @@ public class NiFiListener { public Listener(final ServerSocket serverSocket, final RunNiFi runner) { this.serverSocket = serverSocket; - this.executor = Executors.newFixedThreadPool(2); + this.executor = Executors.newFixedThreadPool(2, new ThreadFactory() { + @Override + public Thread newThread(final Runnable runnable) { + final Thread t = Executors.defaultThreadFactory().newThread(runnable); + t.setDaemon(true); + t.setName("NiFi Bootstrap Command Listener"); + return t; + } + }); + this.runner = runner; } diff --git a/nifi/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java b/nifi/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java index e6b1bc5039..2bc44cc726 100644 --- a/nifi/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java +++ b/nifi/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java @@ -41,13 +41,17 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.logging.ConsoleHandler; -import java.util.logging.Handler; -import java.util.logging.Level; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @@ -94,19 +98,27 @@ public class RunNiFi { private final File bootstrapConfigFile; - private final java.util.logging.Logger logger; + // used for logging initial info; these will be logged to console by default when the app is started + private final Logger cmdLogger = LoggerFactory.getLogger("org.apache.nifi.bootstrap.Command"); + // used for logging all info. These by default will be written to the log file + private final Logger defaultLogger = LoggerFactory.getLogger(RunNiFi.class); + + + private final ExecutorService loggingExecutor; + private volatile Set> loggingFutures = new HashSet<>(2); public RunNiFi(final File bootstrapConfigFile, final boolean verbose) { this.bootstrapConfigFile = bootstrapConfigFile; - logger = java.util.logging.Logger.getLogger("Bootstrap"); - if (verbose) { - logger.info("Enabling Verbose Output"); - logger.setLevel(Level.FINE); - final Handler handler = new ConsoleHandler(); - handler.setLevel(Level.FINE); - logger.addHandler(handler); - } + loggingExecutor = Executors.newFixedThreadPool(2, new ThreadFactory() { + @Override + public Thread newThread(final Runnable runnable) { + final Thread t = Executors.defaultThreadFactory().newThread(runnable); + t.setDaemon(true); + t.setName("NiFi logging handler"); + return t; + } + }); } private static void printUsage() { @@ -185,10 +197,10 @@ public class RunNiFi { switch (cmd.toLowerCase()) { case "start": - runNiFi.start(false); + runNiFi.start(); break; case "run": - runNiFi.start(true); + runNiFi.start(); break; case "stop": runNiFi.stop(); @@ -198,7 +210,7 @@ public class RunNiFi { break; case "restart": runNiFi.stop(); - runNiFi.start(false); + runNiFi.start(); break; case "dump": runNiFi.dump(dumpFile); @@ -206,40 +218,44 @@ public class RunNiFi { } } - public File getStatusFile() { + File getStatusFile() { + return getStatusFile(defaultLogger); + } + + public File getStatusFile(final Logger logger) { final File confDir = bootstrapConfigFile.getParentFile(); final File nifiHome = confDir.getParentFile(); final File bin = new File(nifiHome, "bin"); final File statusFile = new File(bin, "nifi.pid"); - logger.log(Level.FINE, "Status File: {0}", statusFile); + logger.debug("Status File: {}", statusFile); return statusFile; } - private Properties loadProperties() throws IOException { + private Properties loadProperties(final Logger logger) throws IOException { final Properties props = new Properties(); - final File statusFile = getStatusFile(); + final File statusFile = getStatusFile(logger); if (statusFile == null || !statusFile.exists()) { - logger.fine("No status file to load properties from"); + logger.debug("No status file to load properties from"); return props; } - try (final FileInputStream fis = new FileInputStream(getStatusFile())) { + try (final FileInputStream fis = new FileInputStream(getStatusFile(logger))) { props.load(fis); } final Map modified = new HashMap<>(props); modified.remove("secret.key"); - logger.log(Level.FINE, "Properties: {0}", modified); + logger.debug("Properties: {}", modified); return props; } - private synchronized void saveProperties(final Properties nifiProps) throws IOException { - final File statusFile = getStatusFile(); + private synchronized void saveProperties(final Properties nifiProps, final Logger logger) throws IOException { + final File statusFile = getStatusFile(logger); if (statusFile.exists() && !statusFile.delete()) { - logger.log(Level.WARNING, "Failed to delete {0}", statusFile); + logger.warn("Failed to delete {}", statusFile); } if (!statusFile.createNewFile()) { @@ -252,7 +268,7 @@ public class RunNiFi { perms.add(PosixFilePermission.OWNER_WRITE); Files.setPosixFilePermissions(statusFile.toPath(), perms); } catch (final Exception e) { - logger.log(Level.WARNING, "Failed to set permissions so that only the owner can read status file {0}; " + logger.warn("Failed to set permissions so that only the owner can read status file {}; " + "this may allows others to have access to the key needed to communicate with NiFi. " + "Permissions should be changed so that only the owner can read this file", statusFile); } @@ -262,23 +278,23 @@ public class RunNiFi { fos.getFD().sync(); } - logger.log(Level.FINE, "Saved Properties {0} to {1}", new Object[]{nifiProps, statusFile}); + logger.debug("Saved Properties {} to {}", new Object[]{nifiProps, statusFile}); } - private boolean isPingSuccessful(final int port, final String secretKey) { - logger.log(Level.FINE, "Pinging {0}", port); + private boolean isPingSuccessful(final int port, final String secretKey, final Logger logger) { + logger.debug("Pinging {}", port); try (final Socket socket = new Socket("localhost", port)) { final OutputStream out = socket.getOutputStream(); out.write((PING_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8)); out.flush(); - logger.fine("Sent PING command"); + logger.debug("Sent PING command"); socket.setSoTimeout(5000); final InputStream in = socket.getInputStream(); final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); final String response = reader.readLine(); - logger.log(Level.FINE, "PING response: {0}", response); + logger.debug("PING response: {}", response); out.close(); reader.close(); @@ -288,27 +304,27 @@ public class RunNiFi { } } - private Integer getCurrentPort() throws IOException { - final Properties props = loadProperties(); + private Integer getCurrentPort(final Logger logger) throws IOException { + final Properties props = loadProperties(logger); final String portVal = props.getProperty("port"); if (portVal == null) { - logger.fine("No Port found in status file"); + logger.debug("No Port found in status file"); return null; } else { - logger.log(Level.FINE, "Port defined in status file: {0}", portVal); + logger.debug("Port defined in status file: {}", portVal); } final int port = Integer.parseInt(portVal); - final boolean success = isPingSuccessful(port, props.getProperty("secret.key")); + final boolean success = isPingSuccessful(port, props.getProperty("secret.key"), logger); if (success) { - logger.log(Level.FINE, "Successful PING on port {0}", port); + logger.debug("Successful PING on port {}", port); return port; } final String pid = props.getProperty("pid"); - logger.log(Level.FINE, "PID in status file is {0}", pid); + logger.debug("PID in status file is {}", pid); if (pid != null) { - final boolean procRunning = isProcessRunning(pid); + final boolean procRunning = isProcessRunning(pid, logger); if (procRunning) { return port; } else { @@ -319,7 +335,7 @@ public class RunNiFi { return null; } - private boolean isProcessRunning(final String pid) { + private boolean isProcessRunning(final String pid, final Logger logger) { try { // We use the "ps" command to check if the process is still running. final ProcessBuilder builder = new ProcessBuilder(); @@ -343,9 +359,9 @@ public class RunNiFi { // If output of the ps command had our PID, the process is running. if (running) { - logger.log(Level.FINE, "Process with PID {0} is running", pid); + logger.debug("Process with PID {} is running", pid); } else { - logger.log(Level.FINE, "Process with PID {0} is not running", pid); + logger.debug("Process with PID {} is not running", pid); } return running; @@ -355,10 +371,10 @@ public class RunNiFi { } } - private Status getStatus() { + private Status getStatus(final Logger logger) { final Properties props; try { - props = loadProperties(); + props = loadProperties(logger); } catch (final IOException ioe) { return new Status(null, null, false, false); } @@ -380,7 +396,7 @@ public class RunNiFi { if (portValue != null) { try { port = Integer.parseInt(portValue); - pingSuccess = isPingSuccessful(port, secretKey); + pingSuccess = isPingSuccessful(port, secretKey, logger); } catch (final NumberFormatException nfe) { return new Status(null, null, false, false); } @@ -390,20 +406,21 @@ public class RunNiFi { return new Status(port, pid, true, true); } - final boolean alive = (pid == null) ? false : isProcessRunning(pid); + final boolean alive = (pid == null) ? false : isProcessRunning(pid, logger); return new Status(port, pid, pingSuccess, alive); } public void status() throws IOException { - final Status status = getStatus(); + final Logger logger = cmdLogger; + final Status status = getStatus(logger); if (status.isRespondingToPing()) { - logger.log(Level.INFO, "Apache NiFi is currently running, listening to Bootstrap on port {0}, PID={1}", + logger.info("Apache NiFi is currently running, listening to Bootstrap on port {}, PID={}", new Object[]{status.getPort(), status.getPid() == null ? "unknkown" : status.getPid()}); return; } if (status.isProcessRunning()) { - logger.log(Level.INFO, "Apache NiFi is running at PID {0} but is not responding to ping requests", status.getPid()); + logger.info("Apache NiFi is running at PID {} but is not responding to ping requests", status.getPid()); return; } @@ -427,36 +444,36 @@ public class RunNiFi { * @throws IOException if any issues occur while writing the dump file */ public void dump(final File dumpFile) throws IOException { - final Integer port = getCurrentPort(); + final Logger logger = defaultLogger; // dump to bootstrap log file by default + final Integer port = getCurrentPort(logger); if (port == null) { - System.out.println("Apache NiFi is not currently running"); + logger.info("Apache NiFi is not currently running"); return; } - final Properties nifiProps = loadProperties(); + final Properties nifiProps = loadProperties(logger); final String secretKey = nifiProps.getProperty("secret.key"); final StringBuilder sb = new StringBuilder(); try (final Socket socket = new Socket()) { - logger.fine("Connecting to NiFi instance"); + logger.debug("Connecting to NiFi instance"); socket.setSoTimeout(60000); socket.connect(new InetSocketAddress("localhost", port)); - logger.fine("Established connection to NiFi instance."); + logger.debug("Established connection to NiFi instance."); socket.setSoTimeout(60000); - logger.log(Level.FINE, "Sending DUMP Command to port {0}", port); + logger.debug("Sending DUMP Command to port {}", port); final OutputStream out = socket.getOutputStream(); out.write((DUMP_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8)); out.flush(); - out.close(); final InputStream in = socket.getInputStream(); - final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); - String line; - while ((line = reader.readLine()) != null) { - sb.append(line).append("\n"); + try (final BufferedReader reader = new BufferedReader(new InputStreamReader(in))) { + String line; + while ((line = reader.readLine()) != null) { + sb.append(line).append("\n"); + } } - reader.close(); } final String dump = sb.toString(); @@ -466,28 +483,35 @@ public class RunNiFi { try (final FileOutputStream fos = new FileOutputStream(dumpFile)) { fos.write(dump.getBytes(StandardCharsets.UTF_8)); } - logger.log(Level.INFO, "Successfully wrote thread dump to {0}", dumpFile.getAbsolutePath()); + // we want to log to the console (by default) that we wrote the thread dump to the specified file + cmdLogger.info("Successfully wrote thread dump to {}", dumpFile.getAbsolutePath()); } } public void stop() throws IOException { - final Integer port = getCurrentPort(); + final Logger logger = cmdLogger; + final Integer port = getCurrentPort(logger); if (port == null) { - System.out.println("Apache NiFi is not currently running"); + logger.info("Apache NiFi is not currently running"); return; } - final Properties nifiProps = loadProperties(); + final Properties nifiProps = loadProperties(logger); final String secretKey = nifiProps.getProperty("secret.key"); + final File statusFile = getStatusFile(logger); + if (statusFile.exists() && !statusFile.delete()) { + logger.error("Failed to delete status file {}; this file should be cleaned up manually", statusFile); + } + try (final Socket socket = new Socket()) { - logger.fine("Connecting to NiFi instance"); + logger.debug("Connecting to NiFi instance"); socket.setSoTimeout(60000); socket.connect(new InetSocketAddress("localhost", port)); - logger.fine("Established connection to NiFi instance."); + logger.debug("Established connection to NiFi instance."); socket.setSoTimeout(60000); - logger.log(Level.FINE, "Sending SHUTDOWN Command to port {0}", port); + logger.debug("Sending SHUTDOWN Command to port {}", port); final OutputStream out = socket.getOutputStream(); out.write((SHUTDOWN_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8)); out.flush(); @@ -501,7 +525,7 @@ public class RunNiFi { } final String response = sb.toString().trim(); - logger.log(Level.FINE, "Received response to SHUTDOWN command: {0}", response); + logger.debug("Received response to SHUTDOWN command: {}", response); if (SHUTDOWN_CMD.equals(response)) { logger.info("Apache NiFi has accepted the Shutdown Command and is shutting down now"); @@ -522,17 +546,17 @@ public class RunNiFi { } final long startWait = System.nanoTime(); - while (isProcessRunning(pid)) { + while (isProcessRunning(pid, logger)) { logger.info("Waiting for Apache NiFi to finish shutting down..."); final long waitNanos = System.nanoTime() - startWait; final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos); if (waitSeconds >= gracefulShutdownSeconds && gracefulShutdownSeconds > 0) { - if (isProcessRunning(pid)) { - logger.log(Level.WARNING, "NiFi has not finished shutting down after {0} seconds. Killing process.", gracefulShutdownSeconds); + if (isProcessRunning(pid, logger)) { + logger.warn("NiFi has not finished shutting down after {} seconds. Killing process.", gracefulShutdownSeconds); try { - killProcessTree(pid); + killProcessTree(pid, logger); } catch (final IOException ioe) { - logger.log(Level.SEVERE, "Failed to kill Process with PID {0}", pid); + logger.error("Failed to kill Process with PID {}", pid); } } break; @@ -546,16 +570,11 @@ public class RunNiFi { logger.info("NiFi has finished shutting down."); } - - final File statusFile = getStatusFile(); - if (!statusFile.delete()) { - logger.log(Level.SEVERE, "Failed to delete status file {0}; this file should be cleaned up manually", statusFile); - } } else { - logger.log(Level.SEVERE, "When sending SHUTDOWN command to NiFi, got unexpected response {0}", response); + logger.error("When sending SHUTDOWN command to NiFi, got unexpected response {}", response); } } catch (final IOException ioe) { - logger.log(Level.SEVERE, "Failed to send shutdown command to port {0} due to {1}", new Object[]{port, ioe}); + logger.error("Failed to send shutdown command to port {} due to {}", new Object[]{port, ioe.toString(), ioe}); } } @@ -574,14 +593,14 @@ public class RunNiFi { return childPids; } - private void killProcessTree(final String pid) throws IOException { - logger.log(Level.FINE, "Killing Process Tree for PID {0}", pid); + private void killProcessTree(final String pid, final Logger logger) throws IOException { + logger.debug("Killing Process Tree for PID {}", pid); final List children = getChildProcesses(pid); - logger.log(Level.FINE, "Children of PID {0}: {1}", new Object[]{pid, children}); + logger.debug("Children of PID {}: {}", new Object[]{pid, children}); for (final String childPid : children) { - killProcessTree(childPid); + killProcessTree(childPid, logger); } Runtime.getRuntime().exec(new String[]{"kill", "-9", pid}); @@ -597,10 +616,10 @@ public class RunNiFi { } @SuppressWarnings({"rawtypes", "unchecked"}) - public void start(final boolean monitor) throws IOException, InterruptedException { - final Integer port = getCurrentPort(); + public void start() throws IOException, InterruptedException { + final Integer port = getCurrentPort(cmdLogger); if (port != null) { - System.out.println("Apache NiFi is already running, listening to Bootstrap on port " + port); + cmdLogger.info("Apache NiFi is already running, listening to Bootstrap on port " + port); return; } @@ -739,122 +758,154 @@ public class RunNiFi { cmdBuilder.append(s).append(" "); } - logger.info("Starting Apache NiFi..."); - logger.log(Level.INFO, "Working Directory: {0}", workingDir.getAbsolutePath()); - logger.log(Level.INFO, "Command: {0}", cmdBuilder.toString()); + cmdLogger.info("Starting Apache NiFi..."); + cmdLogger.info("Working Directory: {}", workingDir.getAbsolutePath()); + cmdLogger.info("Command: {}", cmdBuilder.toString()); - if (monitor) { - String gracefulShutdown = props.get(GRACEFUL_SHUTDOWN_PROP); - if (gracefulShutdown == null) { - gracefulShutdown = DEFAULT_GRACEFUL_SHUTDOWN_VALUE; - } + String gracefulShutdown = props.get(GRACEFUL_SHUTDOWN_PROP); + if (gracefulShutdown == null) { + gracefulShutdown = DEFAULT_GRACEFUL_SHUTDOWN_VALUE; + } - final int gracefulShutdownSeconds; - try { - gracefulShutdownSeconds = Integer.parseInt(gracefulShutdown); - } catch (final NumberFormatException nfe) { - throw new NumberFormatException("The '" + GRACEFUL_SHUTDOWN_PROP + "' property in Bootstrap Config File " - + bootstrapConfigAbsoluteFile.getAbsolutePath() + " has an invalid value. Must be a non-negative integer"); - } + final int gracefulShutdownSeconds; + try { + gracefulShutdownSeconds = Integer.parseInt(gracefulShutdown); + } catch (final NumberFormatException nfe) { + throw new NumberFormatException("The '" + GRACEFUL_SHUTDOWN_PROP + "' property in Bootstrap Config File " + + bootstrapConfigAbsoluteFile.getAbsolutePath() + " has an invalid value. Must be a non-negative integer"); + } - if (gracefulShutdownSeconds < 0) { - throw new NumberFormatException("The '" + GRACEFUL_SHUTDOWN_PROP + "' property in Bootstrap Config File " - + bootstrapConfigAbsoluteFile.getAbsolutePath() + " has an invalid value. Must be a non-negative integer"); - } + if (gracefulShutdownSeconds < 0) { + throw new NumberFormatException("The '" + GRACEFUL_SHUTDOWN_PROP + "' property in Bootstrap Config File " + + bootstrapConfigAbsoluteFile.getAbsolutePath() + " has an invalid value. Must be a non-negative integer"); + } - Process process = builder.start(); - Long pid = getPid(process); - if (pid != null) { - nifiPid = pid; - final Properties nifiProps = new Properties(); - nifiProps.setProperty("pid", String.valueOf(nifiPid)); - saveProperties(nifiProps); - } + Process process = builder.start(); + handleLogging(process); + Long pid = getPid(process, cmdLogger); + if (pid != null) { + nifiPid = pid; + final Properties nifiProps = new Properties(); + nifiProps.setProperty("pid", String.valueOf(nifiPid)); + saveProperties(nifiProps, cmdLogger); + } - shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds); - final Runtime runtime = Runtime.getRuntime(); - runtime.addShutdownHook(shutdownHook); + shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds, loggingExecutor); + final Runtime runtime = Runtime.getRuntime(); + runtime.addShutdownHook(shutdownHook); - while (true) { - final boolean alive = isAlive(process); + while (true) { + final boolean alive = isAlive(process); - if (alive) { - try { - Thread.sleep(1000L); - } catch (final InterruptedException ie) { - } - } else { - try { - runtime.removeShutdownHook(shutdownHook); - } catch (final IllegalStateException ise) { - // happens when already shutting down - } + if (alive) { + try { + Thread.sleep(1000L); + } catch (final InterruptedException ie) { + } + } else { + try { + runtime.removeShutdownHook(shutdownHook); + } catch (final IllegalStateException ise) { + // happens when already shutting down + } - if (autoRestartNiFi) { - logger.warning("Apache NiFi appears to have died. Restarting..."); - process = builder.start(); - - pid = getPid(process); - if (pid != null) { - nifiPid = pid; - final Properties nifiProps = new Properties(); - nifiProps.setProperty("pid", String.valueOf(nifiPid)); - saveProperties(nifiProps); - } - - shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds); - runtime.addShutdownHook(shutdownHook); - - final boolean started = waitForStart(); - - if (started) { - logger.log(Level.INFO, "Successfully started Apache NiFi{0}", (pid == null ? "" : " with PID " + pid)); - } else { - logger.severe("Apache NiFi does not appear to have started"); - } - } else { + if (autoRestartNiFi) { + final File statusFile = getStatusFile(defaultLogger); + if (!statusFile.exists()) { + defaultLogger.debug("Status File no longer exists. Will not restart NiFi"); return; } + + defaultLogger.warn("Apache NiFi appears to have died. Restarting..."); + process = builder.start(); + handleLogging(process); + + pid = getPid(process, defaultLogger); + if (pid != null) { + nifiPid = pid; + final Properties nifiProps = new Properties(); + nifiProps.setProperty("pid", String.valueOf(nifiPid)); + saveProperties(nifiProps, defaultLogger); + } + + shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds, loggingExecutor); + runtime.addShutdownHook(shutdownHook); + + final boolean started = waitForStart(); + + if (started) { + defaultLogger.info("Successfully started Apache NiFi{}", (pid == null ? "" : " with PID " + pid)); + } else { + defaultLogger.error("Apache NiFi does not appear to have started"); + } + } else { + return; } } - } else { - final Process process = builder.start(); - final Long pid = getPid(process); - - if (pid != null) { - nifiPid = pid; - final Properties nifiProps = new Properties(); - nifiProps.setProperty("pid", String.valueOf(nifiPid)); - saveProperties(nifiProps); - } - - boolean started = waitForStart(); - - if (started) { - logger.log(Level.INFO, "Successfully started Apache NiFi{0}", (pid == null ? "" : " with PID " + pid)); - } else { - logger.severe("Apache NiFi does not appear to have started"); - } - - listener.stop(); } } - private Long getPid(final Process process) { + private void handleLogging(final Process process) { + final Set> existingFutures = loggingFutures; + if (existingFutures != null) { + for (final Future future : existingFutures) { + future.cancel(false); + } + } + + final Future stdOutFuture = loggingExecutor.submit(new Runnable() { + @Override + public void run() { + final Logger stdOutLogger = LoggerFactory.getLogger("org.apache.nifi.StdOut"); + final InputStream in = process.getInputStream(); + try (final BufferedReader reader = new BufferedReader(new InputStreamReader(in))) { + String line; + while ((line = reader.readLine()) != null) { + stdOutLogger.info(line); + } + } catch (IOException e) { + defaultLogger.error("Failed to read from NiFi's Standard Out stream", e); + } + } + }); + + final Future stdErrFuture = loggingExecutor.submit(new Runnable() { + @Override + public void run() { + final Logger stdErrLogger = LoggerFactory.getLogger("org.apache.nifi.StdErr"); + final InputStream in = process.getErrorStream(); + try (final BufferedReader reader = new BufferedReader(new InputStreamReader(in))) { + String line; + while ((line = reader.readLine()) != null) { + stdErrLogger.error(line); + } + } catch (IOException e) { + defaultLogger.error("Failed to read from NiFi's Standard Error stream", e); + } + } + }); + + final Set> futures = new HashSet<>(); + futures.add(stdOutFuture); + futures.add(stdErrFuture); + this.loggingFutures = futures; + } + + private Long getPid(final Process process, final Logger logger) { try { final Class procClass = process.getClass(); final Field pidField = procClass.getDeclaredField("pid"); pidField.setAccessible(true); final Object pidObject = pidField.get(process); - logger.log(Level.FINE, "PID Object = {0}", pidObject); + logger.debug("PID Object = {}", pidObject); if (pidObject instanceof Number) { return ((Number) pidObject).longValue(); } return null; } catch (final IllegalAccessException | NoSuchFieldException nsfe) { - logger.log(Level.FINE, "Could not find PID for child process due to {0}", nsfe); + logger.debug("Could not find PID for child process due to {}", nsfe); return null; } } @@ -913,7 +964,7 @@ public class RunNiFi { shutdownHook.setSecretKey(secretKey); } - final File statusFile = getStatusFile(); + final File statusFile = getStatusFile(defaultLogger); final Properties nifiProps = new Properties(); if (nifiPid != -1) { @@ -923,12 +974,12 @@ public class RunNiFi { nifiProps.setProperty("secret.key", secretKey); try { - saveProperties(nifiProps); + saveProperties(nifiProps, defaultLogger); } catch (final IOException ioe) { - logger.log(Level.WARNING, "Apache NiFi has started but failed to persist NiFi Port information to {0} due to {1}", new Object[]{statusFile.getAbsolutePath(), ioe}); + defaultLogger.warn("Apache NiFi has started but failed to persist NiFi Port information to {} due to {}", new Object[]{statusFile.getAbsolutePath(), ioe}); } - logger.log(Level.INFO, "Apache NiFi now running and listening for Bootstrap requests on port {0}", port); + defaultLogger.info("Apache NiFi now running and listening for Bootstrap requests on port {}", port); } int getNiFiCommandControlPort() { diff --git a/nifi/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java b/nifi/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java index 023ab84e64..a594f60aaa 100644 --- a/nifi/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java +++ b/nifi/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.OutputStream; import java.net.Socket; import java.nio.charset.StandardCharsets; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; public class ShutdownHook extends Thread { @@ -28,14 +29,16 @@ public class ShutdownHook extends Thread { private final Process nifiProcess; private final RunNiFi runner; private final int gracefulShutdownSeconds; + private final ExecutorService executor; private volatile String secretKey; - public ShutdownHook(final Process nifiProcess, final RunNiFi runner, final String secretKey, final int gracefulShutdownSeconds) { + public ShutdownHook(final Process nifiProcess, final RunNiFi runner, final String secretKey, final int gracefulShutdownSeconds, final ExecutorService executor) { this.nifiProcess = nifiProcess; this.runner = runner; this.secretKey = secretKey; this.gracefulShutdownSeconds = gracefulShutdownSeconds; + this.executor = executor; } void setSecretKey(final String secretKey) { @@ -44,6 +47,7 @@ public class ShutdownHook extends Thread { @Override public void run() { + executor.shutdown(); runner.setAutoRestartNiFi(false); final int ccPort = runner.getNiFiCommandControlPort(); if (ccPort > 0) { diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/dump-nifi.bat b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/dump-nifi.bat index ab902d0074..f19f029c11 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/dump-nifi.bat +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/dump-nifi.bat @@ -39,9 +39,9 @@ set CONF_DIR=conf set BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf set JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE% -set JAVA_PARAMS=-cp %LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi +SET JAVA_PARAMS=-cp %CONF_DIR%;%LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi set BOOTSTRAP_ACTION=dump cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION% -popd \ No newline at end of file +popd diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh index fb0d22e8e7..6d3191d9a7 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh @@ -159,7 +159,19 @@ run() { echo "Bootstrap Config File: $BOOTSTRAP_CONF" echo - exec "$JAVA" -cp "$NIFI_HOME"/lib/bootstrap/* -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.file="$BOOTSTRAP_CONF" org.apache.nifi.bootstrap.RunNiFi $@ + # run 'start' in the background because the process will continue to run, monitoring NiFi. + # all other commands will terminate quickly so want to just wait for them + if [ "$1" = "start" ]; then + ("$JAVA" -cp "$NIFI_HOME"/conf/:"$NIFI_HOME"/lib/bootstrap/* -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.file="$BOOTSTRAP_CONF" org.apache.nifi.bootstrap.RunNiFi $@ &) + else + "$JAVA" -cp "$NIFI_HOME"/conf/:"$NIFI_HOME"/lib/bootstrap/* -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.file="$BOOTSTRAP_CONF" org.apache.nifi.bootstrap.RunNiFi $@ + fi + + # Wait just a bit (3 secs) to wait for the logging to finish and then echo a new-line. + # We do this to avoid having logs spewed on the console after running the command and then not giving + # control back to the user + sleep 3 + echo } main() { @@ -172,9 +184,14 @@ case "$1" in install) install "$@" ;; - start|stop|run|restart|status|dump) + start|stop|run|status|dump) main "$@" ;; + restart) + init + run "stop" + run "start" + ;; *) echo "Usage nifi {start|stop|run|restart|status|dump|install}" ;; diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/run-nifi.bat b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/run-nifi.bat index 8b56968c6c..e7708f09e9 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/run-nifi.bat +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/run-nifi.bat @@ -39,9 +39,9 @@ set CONF_DIR=conf set BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf set JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE% -set JAVA_PARAMS=-cp %LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi +SET JAVA_PARAMS=-cp %CONF_DIR%;%LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi set BOOTSTRAP_ACTION=run cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION% -popd \ No newline at end of file +popd diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/start-nifi.bat b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/start-nifi.bat deleted file mode 100644 index 1c9599527e..0000000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/start-nifi.bat +++ /dev/null @@ -1,47 +0,0 @@ -@echo off -rem -rem Licensed to the Apache Software Foundation (ASF) under one or more -rem contributor license agreements. See the NOTICE file distributed with -rem this work for additional information regarding copyright ownership. -rem The ASF licenses this file to You under the Apache License, Version 2.0 -rem (the "License"); you may not use this file except in compliance with -rem the License. You may obtain a copy of the License at -rem -rem http://www.apache.org/licenses/LICENSE-2.0 -rem -rem Unless required by applicable law or agreed to in writing, software -rem distributed under the License is distributed on an "AS IS" BASIS, -rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -rem See the License for the specific language governing permissions and -rem limitations under the License. -rem - -rem Use JAVA_HOME if it's set; otherwise, just use java - -if "%JAVA_HOME%" == "" goto noJavaHome -if not exist "%JAVA_HOME%\bin\java.exe" goto noJavaHome -set JAVA_EXE=%JAVA_HOME%\bin\java.exe -goto startNifi - -:noJavaHome -echo The JAVA_HOME environment variable is not defined correctly. -echo Instead the PATH will be used to find the java executable. -echo. -set JAVA_EXE=java -goto startNifi - -:startNifi -set NIFI_ROOT=%~dp0..\ -pushd "%NIFI_ROOT%" -set LIB_DIR=lib\bootstrap -set CONF_DIR=conf - -set BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf -set JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE% - -set JAVA_PARAMS=-cp %LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi -set BOOTSTRAP_ACTION=start - -cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION% - -popd \ No newline at end of file diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/stop-nifi.bat b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/stop-nifi.bat deleted file mode 100644 index 803b6f1162..0000000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/stop-nifi.bat +++ /dev/null @@ -1,47 +0,0 @@ -@echo off -rem -rem Licensed to the Apache Software Foundation (ASF) under one or more -rem contributor license agreements. See the NOTICE file distributed with -rem this work for additional information regarding copyright ownership. -rem The ASF licenses this file to You under the Apache License, Version 2.0 -rem (the "License"); you may not use this file except in compliance with -rem the License. You may obtain a copy of the License at -rem -rem http://www.apache.org/licenses/LICENSE-2.0 -rem -rem Unless required by applicable law or agreed to in writing, software -rem distributed under the License is distributed on an "AS IS" BASIS, -rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -rem See the License for the specific language governing permissions and -rem limitations under the License. -rem - -rem Use JAVA_HOME if it's set; otherwise, just use java - -if "%JAVA_HOME%" == "" goto noJavaHome -if not exist "%JAVA_HOME%\bin\java.exe" goto noJavaHome -set JAVA_EXE=%JAVA_HOME%\bin\java.exe -goto startNifi - -:noJavaHome -echo The JAVA_HOME environment variable is not defined correctly. -echo Instead the PATH will be used to find the java executable. -echo. -set JAVA_EXE=java -goto startNifi - -:startNifi -set NIFI_ROOT=%~dp0..\ -pushd "%NIFI_ROOT%" -set LIB_DIR=lib\bootstrap -set CONF_DIR=conf - -set BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf -set JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE% - -set JAVA_PARAMS=-cp %LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi -set BOOTSTRAP_ACTION=stop - -cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION% - -popd \ No newline at end of file diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml index 296169e38f..36f42d736a 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml @@ -58,10 +58,35 @@ %date %level [%thread] %logger{40} %msg%n + + + logs/nifi-bootstrap.log + + + ./logs/nifi-bootstrap_%d.log + + 5 + + + %date %level [%thread] %logger{40} %msg%n + + + + + + %date %level [%thread] %logger{40} %msg%n + + + @@ -101,6 +126,29 @@ + + + + + + + + + + + + + + + + + + + + +