NIFI-145: Added run-nifi.bat and fixed bugs

This commit is contained in:
Mark Payne 2014-12-10 09:23:51 -05:00
parent e544191f1d
commit 6d46829795
6 changed files with 144 additions and 33 deletions

View File

@ -0,0 +1,15 @@
@echo off
rem Use JAVA_HOME if it's set; otherwise, just use java
IF "%JAVA_HOME%"=="" (SET JAVA_EXE=java) ELSE (SET JAVA_EXE=%JAVA_HOME%\bin\java.exe)
SET LIB_DIR=%~dp0..\lib
SET CONF_DIR=%~dp0..\conf
SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf
SET JAVA_ARGS=-Dorg.apache.nifi.boostrap.config.file=%BOOTSTRAP_CONF_FILE%
SET JAVA_PARAMS=-cp %LIB_DIR%\nifi-bootstrap*.jar -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi
SET BOOTSTRAP_ACTION=run
cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION%

View File

@ -1,7 +1,14 @@
# Configure where NiFi's lib and conf directories live
lib.dir=./lib lib.dir=./lib
conf.dir=./conf conf.dir=./conf
# How long to wait after telling NiFi to shutdown before explicitly killing the Process
graceful.shutdown.seconds=20
# Disable JSR 199 so that we can use JSP's without running a JDK
java.arg.1=-Dorg.apache.jasper.compiler.disablejsr199=true java.arg.1=-Dorg.apache.jasper.compiler.disablejsr199=true
# JVM memory settings
java.arg.2=-Xms256m java.arg.2=-Xms256m
java.arg.3=-Xmx512m java.arg.3=-Xmx512m

View File

@ -24,6 +24,7 @@ import java.io.OutputStream;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -38,8 +39,8 @@ public class BootstrapListener {
private final NiFi nifi; private final NiFi nifi;
private final int bootstrapPort; private final int bootstrapPort;
private Listener listener; private volatile Listener listener;
private ServerSocket serverSocket; private volatile ServerSocket serverSocket;
public BootstrapListener(final NiFi nifi, final int port) { public BootstrapListener(final NiFi nifi, final int port) {
@ -52,12 +53,14 @@ public class BootstrapListener {
serverSocket = new ServerSocket(); serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress("localhost", 0)); serverSocket.bind(new InetSocketAddress("localhost", 0));
serverSocket.setSoTimeout(2000);
final int localPort = serverSocket.getLocalPort(); final int localPort = serverSocket.getLocalPort();
logger.info("Started Bootstrap Listener, Listening for incoming requests on port {}", localPort); logger.info("Started Bootstrap Listener, Listening for incoming requests on port {}", localPort);
listener = new Listener(serverSocket); listener = new Listener(serverSocket);
final Thread listenThread = new Thread(listener); final Thread listenThread = new Thread(listener);
listenThread.setDaemon(true);
listenThread.setName("Listen to Bootstrap"); listenThread.setName("Listen to Bootstrap");
listenThread.start(); listenThread.start();
@ -114,15 +117,17 @@ public class BootstrapListener {
@Override @Override
public void run() { public void run() {
while (!serverSocket.isClosed()) { while (!stopped) {
try { try {
final Socket socket;
try {
socket = serverSocket.accept();
} catch (final SocketTimeoutException ste) {
if ( stopped ) { if ( stopped ) {
return; return;
} }
final Socket socket; continue;
try {
socket = serverSocket.accept();
} catch (final IOException ioe) { } catch (final IOException ioe) {
if ( stopped ) { if ( stopped ) {
return; return;

View File

@ -48,6 +48,7 @@ public class NiFi {
private final BootstrapListener bootstrapListener; private final BootstrapListener bootstrapListener;
public static final String BOOTSTRAP_PORT_PROPERTY = "nifi.bootstrap.listen.port"; public static final String BOOTSTRAP_PORT_PROPERTY = "nifi.bootstrap.listen.port";
private volatile boolean shutdown = false;
public NiFi(final NiFiProperties properties) throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException { public NiFi(final NiFiProperties properties) throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() { Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@ -126,13 +127,21 @@ public class NiFi {
final long startTime = System.nanoTime(); final long startTime = System.nanoTime();
nifiServer = (NiFiServer) jettyConstructor.newInstance(properties); nifiServer = (NiFiServer) jettyConstructor.newInstance(properties);
nifiServer.setExtensionMapping(extensionMapping); nifiServer.setExtensionMapping(extensionMapping);
if ( shutdown ) {
logger.info("NiFi has been shutdown via NiFi Bootstrap. Will not start Controller");
} else {
nifiServer.start(); nifiServer.start();
final long endTime = System.nanoTime(); final long endTime = System.nanoTime();
logger.info("Controller initialization took " + (endTime - startTime) + " nanoseconds."); logger.info("Controller initialization took " + (endTime - startTime) + " nanoseconds.");
} }
}
protected void shutdownHook() { protected void shutdownHook() {
try { try {
this.shutdown = true;
logger.info("Initiating shutdown of Jetty web server..."); logger.info("Initiating shutdown of Jetty web server...");
if (nifiServer != null) { if (nifiServer != null) {
nifiServer.stop(); nifiServer.stop();

View File

@ -57,6 +57,9 @@ public class RunNiFi {
public static final String DEFAULT_CONFIG_FILE = "./conf/boostrap.conf"; public static final String DEFAULT_CONFIG_FILE = "./conf/boostrap.conf";
public static final String DEFAULT_NIFI_PROPS_FILE = "./conf/nifi.properties"; public static final String DEFAULT_NIFI_PROPS_FILE = "./conf/nifi.properties";
public static final String GRACEFUL_SHUTDOWN_PROP = "graceful.shutdown.seconds";
public static final String DEFAULT_GRACEFUL_SHUTDOWN_VALUE = "20";
public static final int MAX_RESTART_ATTEMPTS = 5; public static final int MAX_RESTART_ATTEMPTS = 5;
public static final int STARTUP_WAIT_SECONDS = 60; public static final int STARTUP_WAIT_SECONDS = 60;
@ -85,6 +88,7 @@ public class RunNiFi {
System.out.println("Start : Start a new instance of Apache NiFi"); System.out.println("Start : Start a new instance of Apache NiFi");
System.out.println("Stop : Stop a running instance of Apache NiFi"); System.out.println("Stop : Stop a running instance of Apache NiFi");
System.out.println("Status : Determine if there is a running instance of Apache NiFi"); System.out.println("Status : Determine if there is a running instance of Apache NiFi");
System.out.println("Run : Start a new instance of Apache NiFi and monitor the Process, restarting if the instance dies");
System.out.println(); System.out.println();
} }
@ -96,6 +100,7 @@ public class RunNiFi {
switch (args[0].toLowerCase()) { switch (args[0].toLowerCase()) {
case "start": case "start":
case "run":
case "stop": case "stop":
case "status": case "status":
break; break;
@ -127,7 +132,10 @@ public class RunNiFi {
switch (args[0].toLowerCase()) { switch (args[0].toLowerCase()) {
case "start": case "start":
runNiFi.start(); runNiFi.start(false);
break;
case "run":
runNiFi.start(true);
break; break;
case "stop": case "stop":
runNiFi.stop(); runNiFi.stop();
@ -140,8 +148,10 @@ public class RunNiFi {
public File getStatusFile() { public File getStatusFile() {
final File rootDir = bootstrapConfigFile.getParentFile(); final File confDir = bootstrapConfigFile.getParentFile();
final File statusFile = new File(rootDir, "nifi.port"); final File nifiHome = confDir.getParentFile();
final File bin = new File(nifiHome, "bin");
final File statusFile = new File(bin, "nifi.port");
return statusFile; return statusFile;
} }
@ -165,11 +175,7 @@ public class RunNiFi {
return port; return port;
} }
} catch (final IOException ioe) { } catch (final IOException ioe) {
System.out.println("Found NiFi instance info at " + statusFile + " but information appears to be stale. Removing file."); System.out.println("Found NiFi instance info at " + statusFile + " indicating that NiFi is running and listening to port " + port + " but unable to communicate with NiFi on that port. The process may have died or may be hung.");
if ( !statusFile.delete() ) {
System.err.println("Unable to remove status file");
}
throw ioe; throw ioe;
} }
} catch (final Exception e) { } catch (final Exception e) {
@ -212,6 +218,11 @@ public class RunNiFi {
final String response = reader.readLine(); final String response = reader.readLine();
if ( SHUTDOWN_CMD.equals(response) ) { if ( SHUTDOWN_CMD.equals(response) ) {
System.out.println("Apache NiFi has accepted the Shutdown Command and is shutting down now"); System.out.println("Apache NiFi has accepted the Shutdown Command and is shutting down now");
final File statusFile = getStatusFile();
if ( !statusFile.delete() ) {
System.err.println("Failed to delete status file " + statusFile + "; this file should be cleaned up manually");
}
} else { } else {
System.err.println("When sending SHUTDOWN command to NiFi, got unexpected response " + response); System.err.println("When sending SHUTDOWN command to NiFi, got unexpected response " + response);
} }
@ -222,8 +233,17 @@ public class RunNiFi {
} }
private boolean isAlive(final Process process) {
try {
process.exitValue();
return false;
} catch (final IllegalThreadStateException itse) {
return true;
}
}
@SuppressWarnings({ "rawtypes", "unchecked" }) @SuppressWarnings({ "rawtypes", "unchecked" })
public void start() throws IOException, InterruptedException { public void start(final boolean monitor) throws IOException, InterruptedException {
final Integer port = getCurrentPort(); final Integer port = getCurrentPort();
if ( port != null ) { if ( port != null ) {
System.out.println("Apache NiFi is already running, listening on port " + port); System.out.println("Apache NiFi is already running, listening on port " + port);
@ -344,6 +364,60 @@ public class RunNiFi {
System.out.println("Working Directory: " + workingDir.getAbsolutePath()); System.out.println("Working Directory: " + workingDir.getAbsolutePath());
System.out.println("Command: " + cmdBuilder.toString()); System.out.println("Command: " + cmdBuilder.toString());
if ( monitor ) {
String gracefulShutdown = props.get(GRACEFUL_SHUTDOWN_PROP);
if ( gracefulShutdown == null ) {
gracefulShutdown = DEFAULT_GRACEFUL_SHUTDOWN_VALUE;
}
final int gracefulShutdownSeconds;
try {
gracefulShutdownSeconds = Integer.parseInt(gracefulShutdown);
} catch (final NumberFormatException nfe) {
throw new NumberFormatException("The '" + GRACEFUL_SHUTDOWN_PROP + "' property in Boostrap Config File " + bootstrapConfigAbsoluteFile.getAbsolutePath() + " has an invalid value. Must be a non-negative integer");
}
if ( gracefulShutdownSeconds < 0 ) {
throw new NumberFormatException("The '" + GRACEFUL_SHUTDOWN_PROP + "' property in Boostrap Config File " + bootstrapConfigAbsoluteFile.getAbsolutePath() + " has an invalid value. Must be a non-negative integer");
}
Process process = builder.start();
ShutdownHook shutdownHook = new ShutdownHook(process, this, gracefulShutdownSeconds);
final Runtime runtime = Runtime.getRuntime();
runtime.addShutdownHook(shutdownHook);
while (true) {
final boolean alive = isAlive(process);
if ( alive ) {
try {
Thread.sleep(1000L);
} catch (final InterruptedException ie) {
}
} else {
runtime.removeShutdownHook(shutdownHook);
if (autoRestartNiFi) {
System.out.println("Apache NiFi appears to have died. Restarting...");
process = builder.start();
shutdownHook = new ShutdownHook(process, this, gracefulShutdownSeconds);
runtime.addShutdownHook(shutdownHook);
final boolean started = waitForStart();
if ( started ) {
System.out.println("Successfully started Apache NiFi");
} else {
System.err.println("Apache NiFi does not appear to have started");
}
} else {
return;
}
}
}
} else {
builder.start(); builder.start();
boolean started = waitForStart(); boolean started = waitForStart();
@ -355,6 +429,7 @@ public class RunNiFi {
listener.stop(); listener.stop();
} }
}
private boolean waitForStart() { private boolean waitForStart() {

View File

@ -26,12 +26,12 @@ import java.util.concurrent.TimeUnit;
public class ShutdownHook extends Thread { public class ShutdownHook extends Thread {
private final Process nifiProcess; private final Process nifiProcess;
private final RunNiFi runner; private final RunNiFi runner;
private final int gracefulShutdownSeconds;
public static final int WAIT_SECONDS = 10; public ShutdownHook(final Process nifiProcess, final RunNiFi runner, final int gracefulShutdownSeconds) {
public ShutdownHook(final Process nifiProcess, final RunNiFi runner) {
this.nifiProcess = nifiProcess; this.nifiProcess = nifiProcess;
this.runner = runner; this.runner = runner;
this.gracefulShutdownSeconds = gracefulShutdownSeconds;
} }
@Override @Override
@ -58,9 +58,9 @@ public class ShutdownHook extends Thread {
while ( isAlive(nifiProcess) ) { while ( isAlive(nifiProcess) ) {
final long waitNanos = System.nanoTime() - startWait; final long waitNanos = System.nanoTime() - startWait;
final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos); final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos);
if ( waitSeconds >= WAIT_SECONDS ) { if ( waitSeconds >= gracefulShutdownSeconds && gracefulShutdownSeconds > 0 ) {
if ( isAlive(nifiProcess) ) { if ( isAlive(nifiProcess) ) {
System.out.println("NiFi has not finished shutting down after " + WAIT_SECONDS + " seconds. Killing process."); System.out.println("NiFi has not finished shutting down after " + gracefulShutdownSeconds + " seconds. Killing process.");
nifiProcess.destroy(); nifiProcess.destroy();
} }
break; break;
@ -73,7 +73,7 @@ public class ShutdownHook extends Thread {
final File statusFile = runner.getStatusFile(); final File statusFile = runner.getStatusFile();
if ( !statusFile.delete() ) { if ( !statusFile.delete() ) {
System.err.println("Failed to delete status file " + statusFile.getAbsolutePath()); System.err.println("Failed to delete status file " + statusFile.getAbsolutePath() + "; this file should be cleaned up manually");
} }
} }