NIFI-488: Merged develop

This commit is contained in:
Mark Payne 2015-06-08 14:13:24 -04:00
parent 315af02c59
commit 69f04cbb86
11 changed files with 329 additions and 283 deletions

View File

@ -49,6 +49,8 @@
<useTransitiveFiltering>true</useTransitiveFiltering> <useTransitiveFiltering>true</useTransitiveFiltering>
<includes> <includes>
<include>nifi-bootstrap</include> <include>nifi-bootstrap</include>
<include>slf4j-api</include>
<include>logback-classic</include>
</includes> </includes>
</dependencySet> </dependencySet>

View File

@ -21,4 +21,11 @@
</parent> </parent>
<artifactId>nifi-bootstrap</artifactId> <artifactId>nifi-bootstrap</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
</dependencies>
</project> </project>

View File

@ -23,6 +23,7 @@ import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.nifi.bootstrap.util.LimitingInputStream; import org.apache.nifi.bootstrap.util.LimitingInputStream;
@ -40,6 +41,7 @@ public class NiFiListener {
listener = new Listener(serverSocket, runner); listener = new Listener(serverSocket, runner);
final Thread listenThread = new Thread(listener); final Thread listenThread = new Thread(listener);
listenThread.setName("Listen to NiFi"); listenThread.setName("Listen to NiFi");
listenThread.setDaemon(true);
listenThread.start(); listenThread.start();
return localPort; return localPort;
} }
@ -62,7 +64,16 @@ public class NiFiListener {
public Listener(final ServerSocket serverSocket, final RunNiFi runner) { public Listener(final ServerSocket serverSocket, final RunNiFi runner) {
this.serverSocket = serverSocket; this.serverSocket = serverSocket;
this.executor = Executors.newFixedThreadPool(2); this.executor = Executors.newFixedThreadPool(2, new ThreadFactory() {
@Override
public Thread newThread(final Runnable runnable) {
final Thread t = Executors.defaultThreadFactory().newThread(runnable);
t.setDaemon(true);
t.setName("NiFi Bootstrap Command Listener");
return t;
}
});
this.runner = runner; this.runner = runner;
} }

View File

@ -41,13 +41,17 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.ConsoleHandler;
import java.util.logging.Handler; import org.slf4j.Logger;
import java.util.logging.Level; import org.slf4j.LoggerFactory;
/** /**
* *
@ -94,19 +98,27 @@ public class RunNiFi {
private final File bootstrapConfigFile; private final File bootstrapConfigFile;
private final java.util.logging.Logger logger; // used for logging initial info; these will be logged to console by default when the app is started
private final Logger cmdLogger = LoggerFactory.getLogger("org.apache.nifi.bootstrap.Command");
// used for logging all info. These by default will be written to the log file
private final Logger defaultLogger = LoggerFactory.getLogger(RunNiFi.class);
private final ExecutorService loggingExecutor;
private volatile Set<Future<?>> loggingFutures = new HashSet<>(2);
public RunNiFi(final File bootstrapConfigFile, final boolean verbose) { public RunNiFi(final File bootstrapConfigFile, final boolean verbose) {
this.bootstrapConfigFile = bootstrapConfigFile; this.bootstrapConfigFile = bootstrapConfigFile;
logger = java.util.logging.Logger.getLogger("Bootstrap");
if (verbose) {
logger.info("Enabling Verbose Output");
logger.setLevel(Level.FINE); loggingExecutor = Executors.newFixedThreadPool(2, new ThreadFactory() {
final Handler handler = new ConsoleHandler(); @Override
handler.setLevel(Level.FINE); public Thread newThread(final Runnable runnable) {
logger.addHandler(handler); final Thread t = Executors.defaultThreadFactory().newThread(runnable);
} t.setDaemon(true);
t.setName("NiFi logging handler");
return t;
}
});
} }
private static void printUsage() { private static void printUsage() {
@ -185,10 +197,10 @@ public class RunNiFi {
switch (cmd.toLowerCase()) { switch (cmd.toLowerCase()) {
case "start": case "start":
runNiFi.start(false); runNiFi.start();
break; break;
case "run": case "run":
runNiFi.start(true); runNiFi.start();
break; break;
case "stop": case "stop":
runNiFi.stop(); runNiFi.stop();
@ -198,7 +210,7 @@ public class RunNiFi {
break; break;
case "restart": case "restart":
runNiFi.stop(); runNiFi.stop();
runNiFi.start(false); runNiFi.start();
break; break;
case "dump": case "dump":
runNiFi.dump(dumpFile); runNiFi.dump(dumpFile);
@ -206,40 +218,44 @@ public class RunNiFi {
} }
} }
public File getStatusFile() { File getStatusFile() {
return getStatusFile(defaultLogger);
}
public File getStatusFile(final Logger logger) {
final File confDir = bootstrapConfigFile.getParentFile(); final File confDir = bootstrapConfigFile.getParentFile();
final File nifiHome = confDir.getParentFile(); final File nifiHome = confDir.getParentFile();
final File bin = new File(nifiHome, "bin"); final File bin = new File(nifiHome, "bin");
final File statusFile = new File(bin, "nifi.pid"); final File statusFile = new File(bin, "nifi.pid");
logger.log(Level.FINE, "Status File: {0}", statusFile); logger.debug("Status File: {}", statusFile);
return statusFile; return statusFile;
} }
private Properties loadProperties() throws IOException { private Properties loadProperties(final Logger logger) throws IOException {
final Properties props = new Properties(); final Properties props = new Properties();
final File statusFile = getStatusFile(); final File statusFile = getStatusFile(logger);
if (statusFile == null || !statusFile.exists()) { if (statusFile == null || !statusFile.exists()) {
logger.fine("No status file to load properties from"); logger.debug("No status file to load properties from");
return props; return props;
} }
try (final FileInputStream fis = new FileInputStream(getStatusFile())) { try (final FileInputStream fis = new FileInputStream(getStatusFile(logger))) {
props.load(fis); props.load(fis);
} }
final Map<Object, Object> modified = new HashMap<>(props); final Map<Object, Object> modified = new HashMap<>(props);
modified.remove("secret.key"); modified.remove("secret.key");
logger.log(Level.FINE, "Properties: {0}", modified); logger.debug("Properties: {}", modified);
return props; return props;
} }
private synchronized void saveProperties(final Properties nifiProps) throws IOException { private synchronized void saveProperties(final Properties nifiProps, final Logger logger) throws IOException {
final File statusFile = getStatusFile(); final File statusFile = getStatusFile(logger);
if (statusFile.exists() && !statusFile.delete()) { if (statusFile.exists() && !statusFile.delete()) {
logger.log(Level.WARNING, "Failed to delete {0}", statusFile); logger.warn("Failed to delete {}", statusFile);
} }
if (!statusFile.createNewFile()) { if (!statusFile.createNewFile()) {
@ -252,7 +268,7 @@ public class RunNiFi {
perms.add(PosixFilePermission.OWNER_WRITE); perms.add(PosixFilePermission.OWNER_WRITE);
Files.setPosixFilePermissions(statusFile.toPath(), perms); Files.setPosixFilePermissions(statusFile.toPath(), perms);
} catch (final Exception e) { } catch (final Exception e) {
logger.log(Level.WARNING, "Failed to set permissions so that only the owner can read status file {0}; " logger.warn("Failed to set permissions so that only the owner can read status file {}; "
+ "this may allows others to have access to the key needed to communicate with NiFi. " + "this may allows others to have access to the key needed to communicate with NiFi. "
+ "Permissions should be changed so that only the owner can read this file", statusFile); + "Permissions should be changed so that only the owner can read this file", statusFile);
} }
@ -262,23 +278,23 @@ public class RunNiFi {
fos.getFD().sync(); fos.getFD().sync();
} }
logger.log(Level.FINE, "Saved Properties {0} to {1}", new Object[]{nifiProps, statusFile}); logger.debug("Saved Properties {} to {}", new Object[]{nifiProps, statusFile});
} }
private boolean isPingSuccessful(final int port, final String secretKey) { private boolean isPingSuccessful(final int port, final String secretKey, final Logger logger) {
logger.log(Level.FINE, "Pinging {0}", port); logger.debug("Pinging {}", port);
try (final Socket socket = new Socket("localhost", port)) { try (final Socket socket = new Socket("localhost", port)) {
final OutputStream out = socket.getOutputStream(); final OutputStream out = socket.getOutputStream();
out.write((PING_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8)); out.write((PING_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
out.flush(); out.flush();
logger.fine("Sent PING command"); logger.debug("Sent PING command");
socket.setSoTimeout(5000); socket.setSoTimeout(5000);
final InputStream in = socket.getInputStream(); final InputStream in = socket.getInputStream();
final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
final String response = reader.readLine(); final String response = reader.readLine();
logger.log(Level.FINE, "PING response: {0}", response); logger.debug("PING response: {}", response);
out.close(); out.close();
reader.close(); reader.close();
@ -288,27 +304,27 @@ public class RunNiFi {
} }
} }
private Integer getCurrentPort() throws IOException { private Integer getCurrentPort(final Logger logger) throws IOException {
final Properties props = loadProperties(); final Properties props = loadProperties(logger);
final String portVal = props.getProperty("port"); final String portVal = props.getProperty("port");
if (portVal == null) { if (portVal == null) {
logger.fine("No Port found in status file"); logger.debug("No Port found in status file");
return null; return null;
} else { } else {
logger.log(Level.FINE, "Port defined in status file: {0}", portVal); logger.debug("Port defined in status file: {}", portVal);
} }
final int port = Integer.parseInt(portVal); final int port = Integer.parseInt(portVal);
final boolean success = isPingSuccessful(port, props.getProperty("secret.key")); final boolean success = isPingSuccessful(port, props.getProperty("secret.key"), logger);
if (success) { if (success) {
logger.log(Level.FINE, "Successful PING on port {0}", port); logger.debug("Successful PING on port {}", port);
return port; return port;
} }
final String pid = props.getProperty("pid"); final String pid = props.getProperty("pid");
logger.log(Level.FINE, "PID in status file is {0}", pid); logger.debug("PID in status file is {}", pid);
if (pid != null) { if (pid != null) {
final boolean procRunning = isProcessRunning(pid); final boolean procRunning = isProcessRunning(pid, logger);
if (procRunning) { if (procRunning) {
return port; return port;
} else { } else {
@ -319,7 +335,7 @@ public class RunNiFi {
return null; return null;
} }
private boolean isProcessRunning(final String pid) { private boolean isProcessRunning(final String pid, final Logger logger) {
try { try {
// We use the "ps" command to check if the process is still running. // We use the "ps" command to check if the process is still running.
final ProcessBuilder builder = new ProcessBuilder(); final ProcessBuilder builder = new ProcessBuilder();
@ -343,9 +359,9 @@ public class RunNiFi {
// If output of the ps command had our PID, the process is running. // If output of the ps command had our PID, the process is running.
if (running) { if (running) {
logger.log(Level.FINE, "Process with PID {0} is running", pid); logger.debug("Process with PID {} is running", pid);
} else { } else {
logger.log(Level.FINE, "Process with PID {0} is not running", pid); logger.debug("Process with PID {} is not running", pid);
} }
return running; return running;
@ -355,10 +371,10 @@ public class RunNiFi {
} }
} }
private Status getStatus() { private Status getStatus(final Logger logger) {
final Properties props; final Properties props;
try { try {
props = loadProperties(); props = loadProperties(logger);
} catch (final IOException ioe) { } catch (final IOException ioe) {
return new Status(null, null, false, false); return new Status(null, null, false, false);
} }
@ -380,7 +396,7 @@ public class RunNiFi {
if (portValue != null) { if (portValue != null) {
try { try {
port = Integer.parseInt(portValue); port = Integer.parseInt(portValue);
pingSuccess = isPingSuccessful(port, secretKey); pingSuccess = isPingSuccessful(port, secretKey, logger);
} catch (final NumberFormatException nfe) { } catch (final NumberFormatException nfe) {
return new Status(null, null, false, false); return new Status(null, null, false, false);
} }
@ -390,20 +406,21 @@ public class RunNiFi {
return new Status(port, pid, true, true); return new Status(port, pid, true, true);
} }
final boolean alive = (pid == null) ? false : isProcessRunning(pid); final boolean alive = (pid == null) ? false : isProcessRunning(pid, logger);
return new Status(port, pid, pingSuccess, alive); return new Status(port, pid, pingSuccess, alive);
} }
public void status() throws IOException { public void status() throws IOException {
final Status status = getStatus(); final Logger logger = cmdLogger;
final Status status = getStatus(logger);
if (status.isRespondingToPing()) { if (status.isRespondingToPing()) {
logger.log(Level.INFO, "Apache NiFi is currently running, listening to Bootstrap on port {0}, PID={1}", logger.info("Apache NiFi is currently running, listening to Bootstrap on port {}, PID={}",
new Object[]{status.getPort(), status.getPid() == null ? "unknkown" : status.getPid()}); new Object[]{status.getPort(), status.getPid() == null ? "unknkown" : status.getPid()});
return; return;
} }
if (status.isProcessRunning()) { if (status.isProcessRunning()) {
logger.log(Level.INFO, "Apache NiFi is running at PID {0} but is not responding to ping requests", status.getPid()); logger.info("Apache NiFi is running at PID {} but is not responding to ping requests", status.getPid());
return; return;
} }
@ -427,36 +444,36 @@ public class RunNiFi {
* @throws IOException if any issues occur while writing the dump file * @throws IOException if any issues occur while writing the dump file
*/ */
public void dump(final File dumpFile) throws IOException { public void dump(final File dumpFile) throws IOException {
final Integer port = getCurrentPort(); final Logger logger = defaultLogger; // dump to bootstrap log file by default
final Integer port = getCurrentPort(logger);
if (port == null) { if (port == null) {
System.out.println("Apache NiFi is not currently running"); logger.info("Apache NiFi is not currently running");
return; return;
} }
final Properties nifiProps = loadProperties(); final Properties nifiProps = loadProperties(logger);
final String secretKey = nifiProps.getProperty("secret.key"); final String secretKey = nifiProps.getProperty("secret.key");
final StringBuilder sb = new StringBuilder(); final StringBuilder sb = new StringBuilder();
try (final Socket socket = new Socket()) { try (final Socket socket = new Socket()) {
logger.fine("Connecting to NiFi instance"); logger.debug("Connecting to NiFi instance");
socket.setSoTimeout(60000); socket.setSoTimeout(60000);
socket.connect(new InetSocketAddress("localhost", port)); socket.connect(new InetSocketAddress("localhost", port));
logger.fine("Established connection to NiFi instance."); logger.debug("Established connection to NiFi instance.");
socket.setSoTimeout(60000); socket.setSoTimeout(60000);
logger.log(Level.FINE, "Sending DUMP Command to port {0}", port); logger.debug("Sending DUMP Command to port {}", port);
final OutputStream out = socket.getOutputStream(); final OutputStream out = socket.getOutputStream();
out.write((DUMP_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8)); out.write((DUMP_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
out.flush(); out.flush();
out.close();
final InputStream in = socket.getInputStream(); final InputStream in = socket.getInputStream();
final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); try (final BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
String line; String line;
while ((line = reader.readLine()) != null) { while ((line = reader.readLine()) != null) {
sb.append(line).append("\n"); sb.append(line).append("\n");
}
} }
reader.close();
} }
final String dump = sb.toString(); final String dump = sb.toString();
@ -466,28 +483,35 @@ public class RunNiFi {
try (final FileOutputStream fos = new FileOutputStream(dumpFile)) { try (final FileOutputStream fos = new FileOutputStream(dumpFile)) {
fos.write(dump.getBytes(StandardCharsets.UTF_8)); fos.write(dump.getBytes(StandardCharsets.UTF_8));
} }
logger.log(Level.INFO, "Successfully wrote thread dump to {0}", dumpFile.getAbsolutePath()); // we want to log to the console (by default) that we wrote the thread dump to the specified file
cmdLogger.info("Successfully wrote thread dump to {}", dumpFile.getAbsolutePath());
} }
} }
public void stop() throws IOException { public void stop() throws IOException {
final Integer port = getCurrentPort(); final Logger logger = cmdLogger;
final Integer port = getCurrentPort(logger);
if (port == null) { if (port == null) {
System.out.println("Apache NiFi is not currently running"); logger.info("Apache NiFi is not currently running");
return; return;
} }
final Properties nifiProps = loadProperties(); final Properties nifiProps = loadProperties(logger);
final String secretKey = nifiProps.getProperty("secret.key"); final String secretKey = nifiProps.getProperty("secret.key");
final File statusFile = getStatusFile(logger);
if (statusFile.exists() && !statusFile.delete()) {
logger.error("Failed to delete status file {}; this file should be cleaned up manually", statusFile);
}
try (final Socket socket = new Socket()) { try (final Socket socket = new Socket()) {
logger.fine("Connecting to NiFi instance"); logger.debug("Connecting to NiFi instance");
socket.setSoTimeout(60000); socket.setSoTimeout(60000);
socket.connect(new InetSocketAddress("localhost", port)); socket.connect(new InetSocketAddress("localhost", port));
logger.fine("Established connection to NiFi instance."); logger.debug("Established connection to NiFi instance.");
socket.setSoTimeout(60000); socket.setSoTimeout(60000);
logger.log(Level.FINE, "Sending SHUTDOWN Command to port {0}", port); logger.debug("Sending SHUTDOWN Command to port {}", port);
final OutputStream out = socket.getOutputStream(); final OutputStream out = socket.getOutputStream();
out.write((SHUTDOWN_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8)); out.write((SHUTDOWN_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
out.flush(); out.flush();
@ -501,7 +525,7 @@ public class RunNiFi {
} }
final String response = sb.toString().trim(); final String response = sb.toString().trim();
logger.log(Level.FINE, "Received response to SHUTDOWN command: {0}", response); logger.debug("Received response to SHUTDOWN command: {}", response);
if (SHUTDOWN_CMD.equals(response)) { if (SHUTDOWN_CMD.equals(response)) {
logger.info("Apache NiFi has accepted the Shutdown Command and is shutting down now"); logger.info("Apache NiFi has accepted the Shutdown Command and is shutting down now");
@ -522,17 +546,17 @@ public class RunNiFi {
} }
final long startWait = System.nanoTime(); final long startWait = System.nanoTime();
while (isProcessRunning(pid)) { while (isProcessRunning(pid, logger)) {
logger.info("Waiting for Apache NiFi to finish shutting down..."); logger.info("Waiting for Apache NiFi to finish shutting down...");
final long waitNanos = System.nanoTime() - startWait; final long waitNanos = System.nanoTime() - startWait;
final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos); final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos);
if (waitSeconds >= gracefulShutdownSeconds && gracefulShutdownSeconds > 0) { if (waitSeconds >= gracefulShutdownSeconds && gracefulShutdownSeconds > 0) {
if (isProcessRunning(pid)) { if (isProcessRunning(pid, logger)) {
logger.log(Level.WARNING, "NiFi has not finished shutting down after {0} seconds. Killing process.", gracefulShutdownSeconds); logger.warn("NiFi has not finished shutting down after {} seconds. Killing process.", gracefulShutdownSeconds);
try { try {
killProcessTree(pid); killProcessTree(pid, logger);
} catch (final IOException ioe) { } catch (final IOException ioe) {
logger.log(Level.SEVERE, "Failed to kill Process with PID {0}", pid); logger.error("Failed to kill Process with PID {}", pid);
} }
} }
break; break;
@ -546,16 +570,11 @@ public class RunNiFi {
logger.info("NiFi has finished shutting down."); logger.info("NiFi has finished shutting down.");
} }
final File statusFile = getStatusFile();
if (!statusFile.delete()) {
logger.log(Level.SEVERE, "Failed to delete status file {0}; this file should be cleaned up manually", statusFile);
}
} else { } else {
logger.log(Level.SEVERE, "When sending SHUTDOWN command to NiFi, got unexpected response {0}", response); logger.error("When sending SHUTDOWN command to NiFi, got unexpected response {}", response);
} }
} catch (final IOException ioe) { } catch (final IOException ioe) {
logger.log(Level.SEVERE, "Failed to send shutdown command to port {0} due to {1}", new Object[]{port, ioe}); logger.error("Failed to send shutdown command to port {} due to {}", new Object[]{port, ioe.toString(), ioe});
} }
} }
@ -574,14 +593,14 @@ public class RunNiFi {
return childPids; return childPids;
} }
private void killProcessTree(final String pid) throws IOException { private void killProcessTree(final String pid, final Logger logger) throws IOException {
logger.log(Level.FINE, "Killing Process Tree for PID {0}", pid); logger.debug("Killing Process Tree for PID {}", pid);
final List<String> children = getChildProcesses(pid); final List<String> children = getChildProcesses(pid);
logger.log(Level.FINE, "Children of PID {0}: {1}", new Object[]{pid, children}); logger.debug("Children of PID {}: {}", new Object[]{pid, children});
for (final String childPid : children) { for (final String childPid : children) {
killProcessTree(childPid); killProcessTree(childPid, logger);
} }
Runtime.getRuntime().exec(new String[]{"kill", "-9", pid}); Runtime.getRuntime().exec(new String[]{"kill", "-9", pid});
@ -597,10 +616,10 @@ public class RunNiFi {
} }
@SuppressWarnings({"rawtypes", "unchecked"}) @SuppressWarnings({"rawtypes", "unchecked"})
public void start(final boolean monitor) throws IOException, InterruptedException { public void start() throws IOException, InterruptedException {
final Integer port = getCurrentPort(); final Integer port = getCurrentPort(cmdLogger);
if (port != null) { if (port != null) {
System.out.println("Apache NiFi is already running, listening to Bootstrap on port " + port); cmdLogger.info("Apache NiFi is already running, listening to Bootstrap on port " + port);
return; return;
} }
@ -739,122 +758,154 @@ public class RunNiFi {
cmdBuilder.append(s).append(" "); cmdBuilder.append(s).append(" ");
} }
logger.info("Starting Apache NiFi..."); cmdLogger.info("Starting Apache NiFi...");
logger.log(Level.INFO, "Working Directory: {0}", workingDir.getAbsolutePath()); cmdLogger.info("Working Directory: {}", workingDir.getAbsolutePath());
logger.log(Level.INFO, "Command: {0}", cmdBuilder.toString()); cmdLogger.info("Command: {}", cmdBuilder.toString());
if (monitor) { String gracefulShutdown = props.get(GRACEFUL_SHUTDOWN_PROP);
String gracefulShutdown = props.get(GRACEFUL_SHUTDOWN_PROP); if (gracefulShutdown == null) {
if (gracefulShutdown == null) { gracefulShutdown = DEFAULT_GRACEFUL_SHUTDOWN_VALUE;
gracefulShutdown = DEFAULT_GRACEFUL_SHUTDOWN_VALUE; }
}
final int gracefulShutdownSeconds; final int gracefulShutdownSeconds;
try { try {
gracefulShutdownSeconds = Integer.parseInt(gracefulShutdown); gracefulShutdownSeconds = Integer.parseInt(gracefulShutdown);
} catch (final NumberFormatException nfe) { } catch (final NumberFormatException nfe) {
throw new NumberFormatException("The '" + GRACEFUL_SHUTDOWN_PROP + "' property in Bootstrap Config File " throw new NumberFormatException("The '" + GRACEFUL_SHUTDOWN_PROP + "' property in Bootstrap Config File "
+ bootstrapConfigAbsoluteFile.getAbsolutePath() + " has an invalid value. Must be a non-negative integer"); + bootstrapConfigAbsoluteFile.getAbsolutePath() + " has an invalid value. Must be a non-negative integer");
} }
if (gracefulShutdownSeconds < 0) { if (gracefulShutdownSeconds < 0) {
throw new NumberFormatException("The '" + GRACEFUL_SHUTDOWN_PROP + "' property in Bootstrap Config File " throw new NumberFormatException("The '" + GRACEFUL_SHUTDOWN_PROP + "' property in Bootstrap Config File "
+ bootstrapConfigAbsoluteFile.getAbsolutePath() + " has an invalid value. Must be a non-negative integer"); + bootstrapConfigAbsoluteFile.getAbsolutePath() + " has an invalid value. Must be a non-negative integer");
} }
Process process = builder.start(); Process process = builder.start();
Long pid = getPid(process); handleLogging(process);
if (pid != null) { Long pid = getPid(process, cmdLogger);
nifiPid = pid; if (pid != null) {
final Properties nifiProps = new Properties(); nifiPid = pid;
nifiProps.setProperty("pid", String.valueOf(nifiPid)); final Properties nifiProps = new Properties();
saveProperties(nifiProps); nifiProps.setProperty("pid", String.valueOf(nifiPid));
} saveProperties(nifiProps, cmdLogger);
}
shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds); shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds, loggingExecutor);
final Runtime runtime = Runtime.getRuntime(); final Runtime runtime = Runtime.getRuntime();
runtime.addShutdownHook(shutdownHook); runtime.addShutdownHook(shutdownHook);
while (true) { while (true) {
final boolean alive = isAlive(process); final boolean alive = isAlive(process);
if (alive) { if (alive) {
try { try {
Thread.sleep(1000L); Thread.sleep(1000L);
} catch (final InterruptedException ie) { } catch (final InterruptedException ie) {
} }
} else { } else {
try { try {
runtime.removeShutdownHook(shutdownHook); runtime.removeShutdownHook(shutdownHook);
} catch (final IllegalStateException ise) { } catch (final IllegalStateException ise) {
// happens when already shutting down // happens when already shutting down
} }
if (autoRestartNiFi) { if (autoRestartNiFi) {
logger.warning("Apache NiFi appears to have died. Restarting..."); final File statusFile = getStatusFile(defaultLogger);
process = builder.start(); if (!statusFile.exists()) {
defaultLogger.debug("Status File no longer exists. Will not restart NiFi");
pid = getPid(process);
if (pid != null) {
nifiPid = pid;
final Properties nifiProps = new Properties();
nifiProps.setProperty("pid", String.valueOf(nifiPid));
saveProperties(nifiProps);
}
shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds);
runtime.addShutdownHook(shutdownHook);
final boolean started = waitForStart();
if (started) {
logger.log(Level.INFO, "Successfully started Apache NiFi{0}", (pid == null ? "" : " with PID " + pid));
} else {
logger.severe("Apache NiFi does not appear to have started");
}
} else {
return; return;
} }
defaultLogger.warn("Apache NiFi appears to have died. Restarting...");
process = builder.start();
handleLogging(process);
pid = getPid(process, defaultLogger);
if (pid != null) {
nifiPid = pid;
final Properties nifiProps = new Properties();
nifiProps.setProperty("pid", String.valueOf(nifiPid));
saveProperties(nifiProps, defaultLogger);
}
shutdownHook = new ShutdownHook(process, this, secretKey, gracefulShutdownSeconds, loggingExecutor);
runtime.addShutdownHook(shutdownHook);
final boolean started = waitForStart();
if (started) {
defaultLogger.info("Successfully started Apache NiFi{}", (pid == null ? "" : " with PID " + pid));
} else {
defaultLogger.error("Apache NiFi does not appear to have started");
}
} else {
return;
} }
} }
} else {
final Process process = builder.start();
final Long pid = getPid(process);
if (pid != null) {
nifiPid = pid;
final Properties nifiProps = new Properties();
nifiProps.setProperty("pid", String.valueOf(nifiPid));
saveProperties(nifiProps);
}
boolean started = waitForStart();
if (started) {
logger.log(Level.INFO, "Successfully started Apache NiFi{0}", (pid == null ? "" : " with PID " + pid));
} else {
logger.severe("Apache NiFi does not appear to have started");
}
listener.stop();
} }
} }
private Long getPid(final Process process) { private void handleLogging(final Process process) {
final Set<Future<?>> existingFutures = loggingFutures;
if (existingFutures != null) {
for (final Future<?> future : existingFutures) {
future.cancel(false);
}
}
final Future<?> stdOutFuture = loggingExecutor.submit(new Runnable() {
@Override
public void run() {
final Logger stdOutLogger = LoggerFactory.getLogger("org.apache.nifi.StdOut");
final InputStream in = process.getInputStream();
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
String line;
while ((line = reader.readLine()) != null) {
stdOutLogger.info(line);
}
} catch (IOException e) {
defaultLogger.error("Failed to read from NiFi's Standard Out stream", e);
}
}
});
final Future<?> stdErrFuture = loggingExecutor.submit(new Runnable() {
@Override
public void run() {
final Logger stdErrLogger = LoggerFactory.getLogger("org.apache.nifi.StdErr");
final InputStream in = process.getErrorStream();
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
String line;
while ((line = reader.readLine()) != null) {
stdErrLogger.error(line);
}
} catch (IOException e) {
defaultLogger.error("Failed to read from NiFi's Standard Error stream", e);
}
}
});
final Set<Future<?>> futures = new HashSet<>();
futures.add(stdOutFuture);
futures.add(stdErrFuture);
this.loggingFutures = futures;
}
private Long getPid(final Process process, final Logger logger) {
try { try {
final Class<?> procClass = process.getClass(); final Class<?> procClass = process.getClass();
final Field pidField = procClass.getDeclaredField("pid"); final Field pidField = procClass.getDeclaredField("pid");
pidField.setAccessible(true); pidField.setAccessible(true);
final Object pidObject = pidField.get(process); final Object pidObject = pidField.get(process);
logger.log(Level.FINE, "PID Object = {0}", pidObject); logger.debug("PID Object = {}", pidObject);
if (pidObject instanceof Number) { if (pidObject instanceof Number) {
return ((Number) pidObject).longValue(); return ((Number) pidObject).longValue();
} }
return null; return null;
} catch (final IllegalAccessException | NoSuchFieldException nsfe) { } catch (final IllegalAccessException | NoSuchFieldException nsfe) {
logger.log(Level.FINE, "Could not find PID for child process due to {0}", nsfe); logger.debug("Could not find PID for child process due to {}", nsfe);
return null; return null;
} }
} }
@ -913,7 +964,7 @@ public class RunNiFi {
shutdownHook.setSecretKey(secretKey); shutdownHook.setSecretKey(secretKey);
} }
final File statusFile = getStatusFile(); final File statusFile = getStatusFile(defaultLogger);
final Properties nifiProps = new Properties(); final Properties nifiProps = new Properties();
if (nifiPid != -1) { if (nifiPid != -1) {
@ -923,12 +974,12 @@ public class RunNiFi {
nifiProps.setProperty("secret.key", secretKey); nifiProps.setProperty("secret.key", secretKey);
try { try {
saveProperties(nifiProps); saveProperties(nifiProps, defaultLogger);
} catch (final IOException ioe) { } catch (final IOException ioe) {
logger.log(Level.WARNING, "Apache NiFi has started but failed to persist NiFi Port information to {0} due to {1}", new Object[]{statusFile.getAbsolutePath(), ioe}); defaultLogger.warn("Apache NiFi has started but failed to persist NiFi Port information to {} due to {}", new Object[]{statusFile.getAbsolutePath(), ioe});
} }
logger.log(Level.INFO, "Apache NiFi now running and listening for Bootstrap requests on port {0}", port); defaultLogger.info("Apache NiFi now running and listening for Bootstrap requests on port {}", port);
} }
int getNiFiCommandControlPort() { int getNiFiCommandControlPort() {

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.Socket; import java.net.Socket;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public class ShutdownHook extends Thread { public class ShutdownHook extends Thread {
@ -28,14 +29,16 @@ public class ShutdownHook extends Thread {
private final Process nifiProcess; private final Process nifiProcess;
private final RunNiFi runner; private final RunNiFi runner;
private final int gracefulShutdownSeconds; private final int gracefulShutdownSeconds;
private final ExecutorService executor;
private volatile String secretKey; private volatile String secretKey;
public ShutdownHook(final Process nifiProcess, final RunNiFi runner, final String secretKey, final int gracefulShutdownSeconds) { public ShutdownHook(final Process nifiProcess, final RunNiFi runner, final String secretKey, final int gracefulShutdownSeconds, final ExecutorService executor) {
this.nifiProcess = nifiProcess; this.nifiProcess = nifiProcess;
this.runner = runner; this.runner = runner;
this.secretKey = secretKey; this.secretKey = secretKey;
this.gracefulShutdownSeconds = gracefulShutdownSeconds; this.gracefulShutdownSeconds = gracefulShutdownSeconds;
this.executor = executor;
} }
void setSecretKey(final String secretKey) { void setSecretKey(final String secretKey) {
@ -44,6 +47,7 @@ public class ShutdownHook extends Thread {
@Override @Override
public void run() { public void run() {
executor.shutdown();
runner.setAutoRestartNiFi(false); runner.setAutoRestartNiFi(false);
final int ccPort = runner.getNiFiCommandControlPort(); final int ccPort = runner.getNiFiCommandControlPort();
if (ccPort > 0) { if (ccPort > 0) {

View File

@ -39,7 +39,7 @@ set CONF_DIR=conf
set BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf set BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf
set JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE% set JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE%
set JAVA_PARAMS=-cp %LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi SET JAVA_PARAMS=-cp %CONF_DIR%;%LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi
set BOOTSTRAP_ACTION=dump set BOOTSTRAP_ACTION=dump
cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION% cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION%

View File

@ -159,7 +159,19 @@ run() {
echo "Bootstrap Config File: $BOOTSTRAP_CONF" echo "Bootstrap Config File: $BOOTSTRAP_CONF"
echo echo
exec "$JAVA" -cp "$NIFI_HOME"/lib/bootstrap/* -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.file="$BOOTSTRAP_CONF" org.apache.nifi.bootstrap.RunNiFi $@ # run 'start' in the background because the process will continue to run, monitoring NiFi.
# all other commands will terminate quickly so want to just wait for them
if [ "$1" = "start" ]; then
("$JAVA" -cp "$NIFI_HOME"/conf/:"$NIFI_HOME"/lib/bootstrap/* -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.file="$BOOTSTRAP_CONF" org.apache.nifi.bootstrap.RunNiFi $@ &)
else
"$JAVA" -cp "$NIFI_HOME"/conf/:"$NIFI_HOME"/lib/bootstrap/* -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.file="$BOOTSTRAP_CONF" org.apache.nifi.bootstrap.RunNiFi $@
fi
# Wait just a bit (3 secs) to wait for the logging to finish and then echo a new-line.
# We do this to avoid having logs spewed on the console after running the command and then not giving
# control back to the user
sleep 3
echo
} }
main() { main() {
@ -172,9 +184,14 @@ case "$1" in
install) install)
install "$@" install "$@"
;; ;;
start|stop|run|restart|status|dump) start|stop|run|status|dump)
main "$@" main "$@"
;; ;;
restart)
init
run "stop"
run "start"
;;
*) *)
echo "Usage nifi {start|stop|run|restart|status|dump|install}" echo "Usage nifi {start|stop|run|restart|status|dump|install}"
;; ;;

View File

@ -39,7 +39,7 @@ set CONF_DIR=conf
set BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf set BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf
set JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE% set JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE%
set JAVA_PARAMS=-cp %LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi SET JAVA_PARAMS=-cp %CONF_DIR%;%LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi
set BOOTSTRAP_ACTION=run set BOOTSTRAP_ACTION=run
cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION% cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION%

View File

@ -1,47 +0,0 @@
@echo off
rem
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
rem
rem Use JAVA_HOME if it's set; otherwise, just use java
if "%JAVA_HOME%" == "" goto noJavaHome
if not exist "%JAVA_HOME%\bin\java.exe" goto noJavaHome
set JAVA_EXE=%JAVA_HOME%\bin\java.exe
goto startNifi
:noJavaHome
echo The JAVA_HOME environment variable is not defined correctly.
echo Instead the PATH will be used to find the java executable.
echo.
set JAVA_EXE=java
goto startNifi
:startNifi
set NIFI_ROOT=%~dp0..\
pushd "%NIFI_ROOT%"
set LIB_DIR=lib\bootstrap
set CONF_DIR=conf
set BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf
set JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE%
set JAVA_PARAMS=-cp %LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi
set BOOTSTRAP_ACTION=start
cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION%
popd

View File

@ -1,47 +0,0 @@
@echo off
rem
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
rem
rem Use JAVA_HOME if it's set; otherwise, just use java
if "%JAVA_HOME%" == "" goto noJavaHome
if not exist "%JAVA_HOME%\bin\java.exe" goto noJavaHome
set JAVA_EXE=%JAVA_HOME%\bin\java.exe
goto startNifi
:noJavaHome
echo The JAVA_HOME environment variable is not defined correctly.
echo Instead the PATH will be used to find the java executable.
echo.
set JAVA_EXE=java
goto startNifi
:startNifi
set NIFI_ROOT=%~dp0..\
pushd "%NIFI_ROOT%"
set LIB_DIR=lib\bootstrap
set CONF_DIR=conf
set BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf
set JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE%
set JAVA_PARAMS=-cp %LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi
set BOOTSTRAP_ACTION=stop
cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION%
popd

View File

@ -59,9 +59,34 @@
</encoder> </encoder>
</appender> </appender>
<appender name="BOOTSTRAP_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>logs/nifi-bootstrap.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!--
For daily rollover, use 'user_%d.log'.
For hourly rollover, use 'user_%d{yyyy-MM-dd_HH}.log'.
To GZIP rolled files, replace '.log' with '.log.gz'.
To ZIP rolled files, replace '.log' with '.log.zip'.
-->
<fileNamePattern>./logs/nifi-bootstrap_%d.log</fileNamePattern>
<!-- keep 5 log files worth of history -->
<maxHistory>5</maxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%date %level [%thread] %logger{40} %msg%n</pattern>
</encoder>
</appender>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%date %level [%thread] %logger{40} %msg%n</pattern>
</encoder>
</appender>
<!-- valid logging levels: TRACE, DEBUG, INFO, WARN, ERROR --> <!-- valid logging levels: TRACE, DEBUG, INFO, WARN, ERROR -->
<logger name="org.apache.nifi" level="INFO"/> <logger name="org.apache.nifi" level="INFO"/>
<logger name="org.apache.nifi.controller.repository.StandardProcessSession" level="WARN" />
<!-- Logger for managing logging statements for nifi clusters. --> <!-- Logger for managing logging statements for nifi clusters. -->
<logger name="org.apache.nifi.cluster" level="INFO"/> <logger name="org.apache.nifi.cluster" level="INFO"/>
@ -101,6 +126,29 @@
<appender-ref ref="USER_FILE"/> <appender-ref ref="USER_FILE"/>
</logger> </logger>
<!--
Logger for capturing Bootstrap logs and NiFi's standard error and standard out.
-->
<logger name="org.apache.nifi.bootstrap" level="INFO" additivity="false">
<appender-ref ref="BOOTSTRAP_FILE" />
</logger>
<logger name="org.apache.nifi.bootstrap.Command" level="INFO" additivity="false">
<appender-ref ref="CONSOLE" />
<appender-ref ref="BOOTSTRAP_FILE" />
</logger>
<!-- Everything written to NiFi's Standard Out will be logged with the logger org.apache.nifi.StdOut at INFO level -->
<logger name="org.apache.nifi.StdOut" level="INFO" additivity="false">
<appender-ref ref="BOOTSTRAP_FILE" />
</logger>
<!-- Everything written to NiFi's Standard Error will be logged with the logger org.apache.nifi.StdErr at ERROR level -->
<logger name="org.apache.nifi.StdErr" level="ERROR" additivity="false">
<appender-ref ref="BOOTSTRAP_FILE" />
</logger>
<root level="INFO"> <root level="INFO">
<appender-ref ref="APP_FILE"/> <appender-ref ref="APP_FILE"/>
</root> </root>