diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java index 99e3655e1d..e39436e839 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java @@ -92,7 +92,7 @@ public class VolatileContentRepository implements ContentRepository { public static final String MAX_SIZE_PROPERTY = "nifi.volatile.content.repository.max.size"; public static final String BLOCK_SIZE_PROPERTY = "nifi.volatile.content.repository.block.size"; - private final ScheduledExecutorService executor = new FlowEngine(3, "VolatileContentRepository Workers"); + private final ScheduledExecutorService executor = new FlowEngine(3, "VolatileContentRepository Workers", true); private final ConcurrentMap claimMap = new ConcurrentHashMap<>(256); private final AtomicLong repoSize = new AtomicLong(0L); diff --git a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh index 60afa48992..163f8e24c6 100644 --- a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh +++ b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh @@ -15,19 +15,16 @@ # See the License for the specific language governing permissions and # limitations under the License. # +# chkconfig: 2345 20 80 +# description: Apache NiFi is a dataflow system based on the principles of Flow-Based Programming. +# # Script structure inspired from Apache Karaf and other Apache projects with similar startup approaches -DIRNAME=`dirname "$0"` +NIFI_HOME=`cd $(dirname "$0") && cd .. && pwd` PROGNAME=`basename "$0"` -# -#Readlink is not available on all systems. Change variable to appropriate alternative as part of OS detection -# - -READLINK="readlink" - warn() { echo "${PROGNAME}: $*" } @@ -54,17 +51,14 @@ detectOS() { os400=true ;; Darwin) - darwin=true - ;; + darwin=true + ;; esac # For AIX, set an environment variable if $aix; then export LDR_CNTRL=MAXDATA=0xB0000000@DSA echo $LDR_CNTRL fi - if $darwin; then - READLINK="greadlink" - fi } unlimitFD() { @@ -95,22 +89,6 @@ unlimitFD() { fi } -locateHome() { - if [ "x$NIFI_HOME" != "x" ]; then - warn "Ignoring predefined value for NIFI_HOME" - fi - - # In POSIX shells, CDPATH may cause cd to write to stdout - (unset CDPATH) >/dev/null 2>&1 && unset CDPATH - NIFI_HOME=$(dirname $($READLINK -f $0))/../ - NIFI_HOME=$($READLINK -f $NIFI_HOME) - cd $NIFI_HOME - echo "Directory changed to NIFI_HOME of '$NIFI_HOME'" - if [ ! -d "$NIFI_HOME" ]; then - die "NIFI_HOME is not valid: $NIFI_HOME" - fi - -} locateJava() { @@ -138,9 +116,6 @@ locateJava() { fi fi fi - if [ "x$JAVA_HOME" = "x" ]; then - JAVA_HOME="$(dirname $(dirname $(pathCanonical "$JAVA")))" - fi } init() { @@ -150,29 +125,41 @@ init() { # Unlimit the number of file descriptors if possible unlimitFD - # Locate the NiFi home directory - locateHome - # Locate the Java VM to execute locateJava } + +install() { + SVC_NAME=nifi + if [ "x$2" != "x" ] ; then + SVC_NAME=$2 + fi + + SVC_FILE=/etc/init.d/$SVC_NAME + cp $0 $SVC_FILE + sed -i s:NIFI_HOME=.*:NIFI_HOME="$NIFI_HOME": $SVC_FILE + sed -i s:PROGNAME=.*:PROGNAME=$(basename "$0"): $SVC_FILE + echo Service $SVC_NAME installed +} + + run() { - BOOTSTRAP_CONF="$NIFI_HOME/conf/bootstrap.conf"; - + BOOTSTRAP_CONF="$NIFI_HOME/conf/bootstrap.conf"; + if $cygwin; then NIFI_HOME=`cygpath --path --windows "$NIFI_HOME"` BOOTSTRAP_CONF=`cygpath --path --windows "$BOOTSTRAP_CONF"` fi - - echo + + echo echo "Java home: $JAVA_HOME" echo "NiFi home: $NIFI_HOME" echo echo "Bootstrap Config File: $BOOTSTRAP_CONF" echo - - exec "$JAVA" -cp "$NIFI_HOME"/lib/bootstrap/* -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.file="$BOOTSTRAP_CONF" org.apache.nifi.bootstrap.RunNiFi $1 + + exec "$JAVA" -cp "$NIFI_HOME"/lib/bootstrap/* -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.file="$BOOTSTRAP_CONF" org.apache.nifi.bootstrap.RunNiFi $@ } main() { @@ -180,4 +167,15 @@ main() { run "$@" } -main "$@" + +case "$1" in + install) + install "$@" + ;; + start|stop|run|status) + main "$@" + ;; + *) + echo "Usage nifi {start|stop|run|status|install}" + ;; +esac diff --git a/nar-bundles/framework-bundle/framework/resources/src/main/resources/conf/bootstrap.conf b/nar-bundles/framework-bundle/framework/resources/src/main/resources/conf/bootstrap.conf index c45d8f8280..37ec474575 100644 --- a/nar-bundles/framework-bundle/framework/resources/src/main/resources/conf/bootstrap.conf +++ b/nar-bundles/framework-bundle/framework/resources/src/main/resources/conf/bootstrap.conf @@ -13,4 +13,10 @@ java.arg.2=-Xms256m java.arg.3=-Xmx512m # Enable Remote Debugging -#java.arg.2=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000 \ No newline at end of file +#java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000 + +# Java command to use when running NiFi +java=java + +# Username to use when running NiFi. This value will be ignored on Windows. +run.as= diff --git a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java index 31f336cb83..3393952f46 100644 --- a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java +++ b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java @@ -27,9 +27,11 @@ import java.net.Socket; import java.net.SocketTimeoutException; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.apache.nifi.util.LimitingInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,14 +40,16 @@ public class BootstrapListener { private final NiFi nifi; private final int bootstrapPort; - + private final String secretKey; + private volatile Listener listener; private volatile ServerSocket serverSocket; - public BootstrapListener(final NiFi nifi, final int port) { + public BootstrapListener(final NiFi nifi, final int bootstrapPort) { this.nifi = nifi; - this.bootstrapPort = port; + this.bootstrapPort = bootstrapPort; + secretKey = UUID.randomUUID().toString(); } public void start() throws IOException { @@ -71,7 +75,7 @@ public class BootstrapListener { socket.setSoTimeout(60000); final OutputStream out = socket.getOutputStream(); - out.write(("PORT " + localPort + "\n").getBytes(StandardCharsets.UTF_8)); + out.write(("PORT " + localPort + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8)); out.flush(); logger.debug("Awaiting response from Bootstrap..."); @@ -121,6 +125,7 @@ public class BootstrapListener { try { final Socket socket; try { + logger.debug("Listening for Bootstrap Requests"); socket = serverSocket.accept(); } catch (final SocketTimeoutException ste) { if ( stopped ) { @@ -136,6 +141,9 @@ public class BootstrapListener { throw ioe; } + logger.debug("Received connection from Bootstrap"); + socket.setSoTimeout(5000); + executor.submit(new Runnable() { @Override public void run() { @@ -184,27 +192,42 @@ public class BootstrapListener { out.flush(); } - private BootstrapRequest readRequest(final InputStream in) throws IOException { - final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + + @SuppressWarnings("resource") // we don't want to close the stream, as the caller will do that + private BootstrapRequest readRequest(final InputStream in) throws IOException { + // We want to ensure that we don't try to read data from an InputStream directly + // by a BufferedReader because any user on the system could open a socket and send + // a multi-gigabyte file without any new lines in order to crash the NiFi instance + // (or at least cause OutOfMemoryErrors, which can wreak havoc on the running instance). + // So we will limit the Input Stream to only 4 KB, which should be plenty for any request. + final LimitingInputStream limitingIn = new LimitingInputStream(in, 4096); + final BufferedReader reader = new BufferedReader(new InputStreamReader(limitingIn)); final String line = reader.readLine(); final String[] splits = line.split(" "); if ( splits.length < 0 ) { - throw new IOException("Received invalid command from NiFi: " + line); + throw new IOException("Received invalid request from Bootstrap: " + line); } final String requestType = splits[0]; final String[] args; if ( splits.length == 1 ) { - args = new String[0]; + throw new IOException("Received invalid request from Bootstrap; request did not have a secret key; request type = " + requestType); + } else if ( splits.length == 2 ) { + args = new String[0]; } else { - args = Arrays.copyOfRange(splits, 1, splits.length); + args = Arrays.copyOfRange(splits, 2, splits.length); + } + + final String requestKey = splits[1]; + if ( !secretKey.equals(requestKey) ) { + throw new IOException("Received invalid Secret Key for request type " + requestType); } try { return new BootstrapRequest(requestType, args); } catch (final Exception e) { - throw new IOException("Received invalid request from bootstrap; request type = " + requestType); + throw new IOException("Received invalid request from Bootstrap; request type = " + requestType); } } @@ -227,7 +250,8 @@ public class BootstrapListener { return requestType; } - public String[] getArgs() { + @SuppressWarnings("unused") + public String[] getArgs() { return args; } } diff --git a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/NiFi.java b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/NiFi.java index 13cd4d691b..98489af16f 100644 --- a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/NiFi.java +++ b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/NiFi.java @@ -36,7 +36,6 @@ import org.apache.nifi.nar.NarClassLoaders; import org.apache.nifi.nar.NarUnpacker; import org.apache.nifi.util.FileUtils; import org.apache.nifi.util.NiFiProperties; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.bridge.SLF4JBridgeHandler; @@ -61,7 +60,6 @@ public class NiFi { // register the shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - @Override public void run() { // shutdown the jetty server diff --git a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/util/LimitingInputStream.java b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/util/LimitingInputStream.java new file mode 100644 index 0000000000..ce3a6dbff0 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/util/LimitingInputStream.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import java.io.IOException; +import java.io.InputStream; + +public class LimitingInputStream extends InputStream { + + private final InputStream in; + private final long limit; + private long bytesRead = 0; + + public LimitingInputStream(final InputStream in, final long limit) { + this.in = in; + this.limit = limit; + } + + @Override + public int read() throws IOException { + if (bytesRead >= limit) { + return -1; + } + + final int val = in.read(); + if (val > -1) { + bytesRead++; + } + return val; + } + + @Override + public int read(final byte[] b) throws IOException { + if (bytesRead >= limit) { + return -1; + } + + final int maxToRead = (int) Math.min(b.length, limit - bytesRead); + + final int val = in.read(b, 0, maxToRead); + if (val > 0) { + bytesRead += val; + } + return val; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (bytesRead >= limit) { + return -1; + } + + final int maxToRead = (int) Math.min(len, limit - bytesRead); + + final int val = in.read(b, off, maxToRead); + if (val > 0) { + bytesRead += val; + } + return val; + } + + @Override + public long skip(final long n) throws IOException { + final long skipped = in.skip(Math.min(n, limit - bytesRead)); + bytesRead += skipped; + return skipped; + } + + @Override + public int available() throws IOException { + return in.available(); + } + + @Override + public void close() throws IOException { + in.close(); + } + + @Override + public void mark(int readlimit) { + in.mark(readlimit); + } + + @Override + public boolean markSupported() { + return in.markSupported(); + } + + @Override + public void reset() throws IOException { + in.reset(); + } +} diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java index 8138c02076..f03bf1edcd 100644 --- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java @@ -27,6 +27,7 @@ import java.util.Arrays; import org.apache.nifi.bootstrap.exception.InvalidCommandException; + public class BootstrapCodec { private final RunNiFi runner; private final BufferedReader reader; @@ -63,7 +64,7 @@ public class BootstrapCodec { private void processRequest(final String cmd, final String[] args) throws InvalidCommandException, IOException { switch (cmd) { case "PORT": { - if ( args.length != 1 ) { + if ( args.length != 2 ) { throw new InvalidCommandException(); } @@ -77,8 +78,10 @@ public class BootstrapCodec { if ( port < 1 || port > 65535 ) { throw new InvalidCommandException("Invalid Port number; should be integer between 1 and 65535"); } + + final String secretKey = args[1]; - runner.setNiFiCommandControlPort(port); + runner.setNiFiCommandControlPort(port, secretKey); writer.write("OK"); writer.newLine(); writer.flush(); diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java index c83135199f..f05d45a395 100644 --- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java @@ -17,6 +17,7 @@ package org.apache.nifi.bootstrap; import java.io.IOException; +import java.io.InputStream; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; @@ -24,6 +25,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.apache.nifi.bootstrap.util.LimitingInputStream; + public class NiFiListener { private ServerSocket serverSocket; private volatile Listener listener; @@ -92,17 +95,26 @@ public class NiFiListener { throw ioe; } - executor.submit(new Runnable() { @Override public void run() { try { - final BootstrapCodec codec = new BootstrapCodec(runner, socket.getInputStream(), socket.getOutputStream()); + // we want to ensure that we don't try to read data from an InputStream directly + // by a BufferedReader because any user on the system could open a socket and send + // a multi-gigabyte file without any new lines in order to crash the Bootstrap, + // which in turn may cause the Shutdown Hook to shutdown NiFi. + // So we will limit the amount of data to read to 4 KB + final InputStream limitingIn = new LimitingInputStream(socket.getInputStream(), 4096); + final BootstrapCodec codec = new BootstrapCodec(runner, limitingIn, socket.getOutputStream()); codec.communicate(); - socket.close(); } catch (final Throwable t) { System.out.println("Failed to communicate with NiFi due to " + t); t.printStackTrace(); + } finally { + try { + socket.close(); + } catch (final IOException ioe) { + } } } }); diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java index 1b82a3c684..437493e4c2 100644 --- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java @@ -26,19 +26,28 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; +import java.io.Reader; +import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.nio.file.attribute.PosixFilePermission; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.logging.ConsoleHandler; +import java.util.logging.Handler; +import java.util.logging.Level; /** @@ -60,6 +69,8 @@ public class RunNiFi { public static final String GRACEFUL_SHUTDOWN_PROP = "graceful.shutdown.seconds"; public static final String DEFAULT_GRACEFUL_SHUTDOWN_VALUE = "20"; + public static final String RUN_AS_PROP = "run.as"; + public static final int MAX_RESTART_ATTEMPTS = 5; public static final int STARTUP_WAIT_SECONDS = 60; @@ -68,20 +79,32 @@ public class RunNiFi { private volatile boolean autoRestartNiFi = true; private volatile int ccPort = -1; + private volatile long nifiPid = -1L; private final Lock lock = new ReentrantLock(); private final Condition startupCondition = lock.newCondition(); private final File bootstrapConfigFile; + + private final java.util.logging.Logger logger; - public RunNiFi(final File bootstrapConfigFile) { + public RunNiFi(final File bootstrapConfigFile, final boolean verbose) { this.bootstrapConfigFile = bootstrapConfigFile; + logger = java.util.logging.Logger.getLogger("Bootstrap"); + if ( verbose ) { + logger.info("Enabling Verbose Output"); + + logger.setLevel(Level.FINE); + final Handler handler = new ConsoleHandler(); + handler.setLevel(Level.FINE); + logger.addHandler(handler); + } } private static void printUsage() { System.out.println("Usage:"); System.out.println(); - System.out.println("java org.apache.nifi.bootstrap.RunNiFi "); + System.out.println("java org.apache.nifi.bootstrap.RunNiFi [<-verbose>] "); System.out.println(); System.out.println("Valid commands include:"); System.out.println(""); @@ -91,22 +114,33 @@ public class RunNiFi { System.out.println("Run : Start a new instance of Apache NiFi and monitor the Process, restarting if the instance dies"); System.out.println(); } + public static void main(final String[] args) throws IOException, InterruptedException { - if ( args.length != 1 ) { + if ( args.length < 1 || args.length > 2 ) { printUsage(); return; } - switch (args[0].toLowerCase()) { + boolean verbose = false; + if ( args.length == 2 ) { + if ( args[0].equals("-verbose") ) { + verbose = true; + } else { + printUsage(); + return; + } + } + + final String cmd = args.length == 1 ? args[0] : args[1]; + + switch (cmd.toLowerCase()) { case "start": case "run": case "stop": case "status": break; default: - System.out.println("Invalid argument: " + args[0]); - System.out.println(); printUsage(); return; } @@ -128,9 +162,9 @@ public class RunNiFi { final File configFile = new File(configFilename); - final RunNiFi runNiFi = new RunNiFi(configFile); + final RunNiFi runNiFi = new RunNiFi(configFile, verbose); - switch (args[0].toLowerCase()) { + switch (cmd.toLowerCase()) { case "start": runNiFi.start(false); break; @@ -151,49 +185,209 @@ public class RunNiFi { final File confDir = bootstrapConfigFile.getParentFile(); final File nifiHome = confDir.getParentFile(); final File bin = new File(nifiHome, "bin"); - final File statusFile = new File(bin, "nifi.port"); + final File statusFile = new File(bin, "nifi.pid"); + + logger.fine("Status File: " + statusFile); + return statusFile; } + + private Properties loadProperties() throws IOException { + final Properties props = new Properties(); + final File statusFile = getStatusFile(); + if ( statusFile == null || !statusFile.exists() ) { + logger.fine("No status file to load properties from"); + return props; + } + + try (final FileInputStream fis = new FileInputStream(getStatusFile())) { + props.load(fis); + } + + logger.fine("Properties: " + props); + return props; + } + + private synchronized void saveProperties(final Properties nifiProps) throws IOException { + final File statusFile = getStatusFile(); + if ( statusFile.exists() && !statusFile.delete() ) { + logger.warning("Failed to delete " + statusFile); + } + if ( !statusFile.createNewFile() ) { + throw new IOException("Failed to create file " + statusFile); + } + + try { + final Set perms = new HashSet<>(); + perms.add(PosixFilePermission.OWNER_READ); + perms.add(PosixFilePermission.OWNER_WRITE); + Files.setPosixFilePermissions(statusFile.toPath(), perms); + } catch (final Exception e) { + logger.warning("Failed to set permissions so that only the owner can read status file " + statusFile + "; this may allows others to have access to the key needed to communicate with NiFi. Permissions should be changed so that only the owner can read this file"); + } + + try (final FileOutputStream fos = new FileOutputStream(statusFile)) { + nifiProps.store(fos, null); + fos.getFD().sync(); + } + + logger.fine("Saved Properties " + nifiProps + " to " + statusFile); + } + + private boolean isPingSuccessful(final int port, final String secretKey) { + logger.fine("Pinging " + port); + + try (final Socket socket = new Socket("localhost", port)) { + final OutputStream out = socket.getOutputStream(); + out.write((PING_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8)); + out.flush(); + + logger.fine("Sent PING command"); + socket.setSoTimeout(5000); + final InputStream in = socket.getInputStream(); + final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + final String response = reader.readLine(); + logger.fine("PING response: " + response); + + return PING_CMD.equals(response); + } catch (final IOException ioe) { + return false; + } + } + private Integer getCurrentPort() throws IOException { - try { - final File statusFile = getStatusFile(); - final byte[] info = Files.readAllBytes(statusFile.toPath()); - final String text = new String(info); - - final int port = Integer.parseInt(text); - - try (final Socket socket = new Socket("localhost", port)) { - final OutputStream out = socket.getOutputStream(); - out.write((PING_CMD + "\n").getBytes(StandardCharsets.UTF_8)); - out.flush(); - - final InputStream in = socket.getInputStream(); - final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); - final String response = reader.readLine(); - if ( response.equals(PING_CMD) ) { - return port; - } - } catch (final IOException ioe) { - System.out.println("Found NiFi instance info at " + statusFile + " indicating that NiFi is running and listening to port " + port + " but unable to communicate with NiFi on that port. The process may have died or may be hung."); - throw ioe; - } - } catch (final Exception e) { - return null; + final Properties props = loadProperties(); + final String portVal = props.getProperty("port"); + if ( portVal == null ) { + logger.fine("No Port found in status file"); + return null; + } else { + logger.fine("Port defined in status file: " + portVal); } - return null; + final int port = Integer.parseInt(portVal); + final boolean success = isPingSuccessful(port, props.getProperty("secret.key")); + if ( success ) { + logger.fine("Successful PING on port " + port); + return port; + } + + final String pid = props.getProperty("pid"); + logger.fine("PID in status file is " + pid); + if ( pid != null ) { + final boolean procRunning = isProcessRunning(pid); + if ( procRunning ) { + return port; + } else { + return null; + } + } + + return null; } + private boolean isProcessRunning(final String pid) { + try { + // We use the "ps" command to check if the process is still running. + final ProcessBuilder builder = new ProcessBuilder(); + + builder.command("ps", "-p", pid, "--no-headers"); + final Process proc = builder.start(); + + // Read how many lines are output by the 'ps' command + int lineCount = 0; + String line; + try (final InputStream in = proc.getInputStream(); + final Reader streamReader = new InputStreamReader(in); + final BufferedReader reader = new BufferedReader(streamReader)) { + + while ((line = reader.readLine()) != null) { + if ( !line.trim().isEmpty() ) { + lineCount++; + } + } + } + + // If anything was output, the process is running. + final boolean running = lineCount > 0; + if ( running ) { + logger.fine("Process with PID " + pid + " is running"); + } else { + logger.fine("Process with PID " + pid + " is not running"); + } + + return running; + } catch (final IOException ioe) { + System.err.println("Failed to determine if Process " + pid + " is running; assuming that it is not"); + return false; + } + } + + + private Status getStatus() { + final Properties props; + try { + props = loadProperties(); + } catch (final IOException ioe) { + return new Status(null, null, false, false); + } + + if ( props == null ) { + return new Status(null, null, false, false); + } + + final String portValue = props.getProperty("port"); + final String pid = props.getProperty("pid"); + final String secretKey = props.getProperty("secret.key"); + + if ( portValue == null && pid == null ) { + return new Status(null, null, false, false); + } + + Integer port = null; + boolean pingSuccess = false; + if ( portValue != null ) { + try { + port = Integer.parseInt(portValue); + pingSuccess = isPingSuccessful(port, secretKey); + } catch (final NumberFormatException nfe) { + return new Status(null, null, false, false); + } + } + + if ( pingSuccess ) { + return new Status(port, pid, true, true); + } + + final boolean alive = (pid == null) ? false : isProcessRunning(pid); + return new Status(port, pid, pingSuccess, alive); + } + public void status() throws IOException { - final Integer port = getCurrentPort(); - if ( port == null ) { - System.out.println("Apache NiFi does not appear to be running"); - } else { - System.out.println("Apache NiFi is currently running, listening on port " + port); - } - return; + final Status status = getStatus(); + if ( status.isRespondingToPing() ) { + logger.info("Apache NiFi is currently running, listening to Bootstrap on port " + status.getPort() + + ", PID=" + (status.getPid() == null ? "unknkown" : status.getPid())); + return; + } + + if ( status.isProcessRunning() ) { + logger.info("Apache NiFi is running at PID " + status.getPid() + " but is not responding to ping requests"); + return; + } + + if ( status.getPort() == null ) { + logger.info("Apache NiFi is not running"); + return; + } + + if ( status.getPid() == null ) { + logger.info("Apache NiFi is not responding to Ping requests. The process may have died or may be hung"); + } else { + logger.info("Apache NiFi is not running"); + } } @@ -204,35 +398,112 @@ public class RunNiFi { return; } + final Properties nifiProps = loadProperties(); + final String secretKey = nifiProps.getProperty("secret.key"); + try (final Socket socket = new Socket()) { + logger.fine("Connecting to NiFi instance"); socket.setSoTimeout(60000); socket.connect(new InetSocketAddress("localhost", port)); + logger.fine("Established connection to NiFi instance."); socket.setSoTimeout(60000); + logger.fine("Sending SHUTDOWN Command to port " + port); final OutputStream out = socket.getOutputStream(); - out.write((SHUTDOWN_CMD + "\n").getBytes(StandardCharsets.UTF_8)); + out.write((SHUTDOWN_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8)); out.flush(); final InputStream in = socket.getInputStream(); final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); final String response = reader.readLine(); + + logger.fine("Received response to SHUTDOWN command: " + response); + if ( SHUTDOWN_CMD.equals(response) ) { - System.out.println("Apache NiFi has accepted the Shutdown Command and is shutting down now"); + logger.info("Apache NiFi has accepted the Shutdown Command and is shutting down now"); + + final String pid = nifiProps.getProperty("pid"); + if ( pid != null ) { + final Properties bootstrapProperties = new Properties(); + try (final FileInputStream fis = new FileInputStream(bootstrapConfigFile)) { + bootstrapProperties.load(fis); + } + + String gracefulShutdown = bootstrapProperties.getProperty(GRACEFUL_SHUTDOWN_PROP, DEFAULT_GRACEFUL_SHUTDOWN_VALUE); + int gracefulShutdownSeconds; + try { + gracefulShutdownSeconds = Integer.parseInt(gracefulShutdown); + } catch (final NumberFormatException nfe) { + gracefulShutdownSeconds = Integer.parseInt(DEFAULT_GRACEFUL_SHUTDOWN_VALUE); + } + + final long startWait = System.nanoTime(); + while ( isProcessRunning(pid) ) { + logger.info("Waiting for Apache NiFi to finish shutting down..."); + final long waitNanos = System.nanoTime() - startWait; + final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos); + if ( waitSeconds >= gracefulShutdownSeconds && gracefulShutdownSeconds > 0 ) { + if ( isProcessRunning(pid) ) { + logger.warning("NiFi has not finished shutting down after " + gracefulShutdownSeconds + " seconds. Killing process."); + try { + killProcessTree(pid); + } catch (final IOException ioe) { + logger.severe("Failed to kill Process with PID " + pid); + } + } + break; + } else { + try { + Thread.sleep(2000L); + } catch (final InterruptedException ie) {} + } + } + + logger.info("NiFi has finished shutting down."); + } final File statusFile = getStatusFile(); if ( !statusFile.delete() ) { - System.err.println("Failed to delete status file " + statusFile + "; this file should be cleaned up manually"); + logger.severe("Failed to delete status file " + statusFile + "; this file should be cleaned up manually"); } } else { - System.err.println("When sending SHUTDOWN command to NiFi, got unexpected response " + response); + logger.severe("When sending SHUTDOWN command to NiFi, got unexpected response " + response); } } catch (final IOException ioe) { - System.err.println("Failed to communicate with Apache NiFi"); + logger.severe("Failed to send shutdown command to port " + port + " due to " + ioe); return; } } + private static List getChildProcesses(final String ppid) throws IOException { + final Process proc = Runtime.getRuntime().exec(new String[] {"ps", "-o", "pid", "--no-headers", "--ppid", ppid}); + final List childPids = new ArrayList<>(); + try (final InputStream in = proc.getInputStream(); + final BufferedReader reader = new BufferedReader(new InputStreamReader(in))) { + + String line; + while ((line = reader.readLine()) != null) { + childPids.add(line.trim()); + } + } + + return childPids; + } + + private void killProcessTree(final String pid) throws IOException { + logger.fine("Killing Process Tree for PID " + pid); + + final List children = getChildProcesses(pid); + logger.fine("Children of PID " + pid + ": " + children); + + for ( final String childPid : children ) { + killProcessTree(childPid); + } + + Runtime.getRuntime().exec(new String[] {"kill", "-9", pid}); + } + public static boolean isAlive(final Process process) { try { process.exitValue(); @@ -246,7 +517,7 @@ public class RunNiFi { public void start(final boolean monitor) throws IOException, InterruptedException { final Integer port = getCurrentPort(); if ( port != null ) { - System.out.println("Apache NiFi is already running, listening on port " + port); + System.out.println("Apache NiFi is already running, listening to Bootstrap on port " + port); return; } @@ -344,7 +615,20 @@ public class RunNiFi { final NiFiListener listener = new NiFiListener(); final int listenPort = listener.start(this); + String runAs = isWindows() ? null : props.get(RUN_AS_PROP); + if ( runAs != null ) { + runAs = runAs.trim(); + if ( runAs.isEmpty() ) { + runAs = null; + } + } + final List cmd = new ArrayList<>(); + if ( runAs != null ) { + cmd.add("sudo"); + cmd.add("-u"); + cmd.add(runAs); + } cmd.add(javaCmd); cmd.add("-classpath"); cmd.add(classPath); @@ -361,9 +645,9 @@ public class RunNiFi { cmdBuilder.append(s).append(" "); } - System.out.println("Starting Apache NiFi..."); - System.out.println("Working Directory: " + workingDir.getAbsolutePath()); - System.out.println("Command: " + cmdBuilder.toString()); + logger.info("Starting Apache NiFi..."); + logger.info("Working Directory: " + workingDir.getAbsolutePath()); + logger.info("Command: " + cmdBuilder.toString()); if ( monitor ) { String gracefulShutdown = props.get(GRACEFUL_SHUTDOWN_PROP); @@ -383,6 +667,13 @@ public class RunNiFi { } Process process = builder.start(); + Long pid = getPid(process); + if ( pid != null ) { + nifiPid = pid; + final Properties nifiProps = new Properties(); + nifiProps.setProperty("pid", String.valueOf(nifiPid)); + saveProperties(nifiProps); + } ShutdownHook shutdownHook = new ShutdownHook(process, this, gracefulShutdownSeconds); final Runtime runtime = Runtime.getRuntime(); @@ -404,18 +695,26 @@ public class RunNiFi { } if (autoRestartNiFi) { - System.out.println("Apache NiFi appears to have died. Restarting..."); + logger.warning("Apache NiFi appears to have died. Restarting..."); process = builder.start(); + pid = getPid(process); + if ( pid != null ) { + nifiPid = pid; + final Properties nifiProps = new Properties(); + nifiProps.setProperty("pid", String.valueOf(nifiPid)); + saveProperties(nifiProps); + } + shutdownHook = new ShutdownHook(process, this, gracefulShutdownSeconds); runtime.addShutdownHook(shutdownHook); final boolean started = waitForStart(); if ( started ) { - System.out.println("Successfully started Apache NiFi"); + logger.info("Successfully started Apache NiFi" + (pid == null ? "" : " with PID " + pid)); } else { - System.err.println("Apache NiFi does not appear to have started"); + logger.severe("Apache NiFi does not appear to have started"); } } else { return; @@ -423,13 +722,22 @@ public class RunNiFi { } } } else { - builder.start(); + final Process process = builder.start(); + final Long pid = getPid(process); + + if ( pid != null ) { + nifiPid = pid; + final Properties nifiProps = new Properties(); + nifiProps.setProperty("pid", String.valueOf(nifiPid)); + saveProperties(nifiProps); + } + boolean started = waitForStart(); if ( started ) { - System.out.println("Successfully started Apache NiFi"); + logger.info("Successfully started Apache NiFi" + (pid == null ? "" : " with PID " + pid)); } else { - System.err.println("Apache NiFi does not appear to have started"); + logger.severe("Apache NiFi does not appear to have started"); } listener.stop(); @@ -437,6 +745,30 @@ public class RunNiFi { } + private Long getPid(final Process process) { + try { + final Class procClass = process.getClass(); + final Field pidField = procClass.getDeclaredField("pid"); + pidField.setAccessible(true); + final Object pidObject = pidField.get(process); + + logger.fine("PID Object = " + pidObject); + + if ( pidObject instanceof Number ) { + return ((Number) pidObject).longValue(); + } + return null; + } catch (final IllegalAccessException | NoSuchFieldException nsfe) { + logger.fine("Could not find PID for child process due to " + nsfe); + return null; + } + } + + private boolean isWindows() { + final String osName = System.getProperty("os.name"); + return osName != null && osName.toLowerCase().contains("win"); + } + private boolean waitForStart() { lock.lock(); try { @@ -478,21 +810,59 @@ public class RunNiFi { this.autoRestartNiFi = restart; } - void setNiFiCommandControlPort(final int port) { + void setNiFiCommandControlPort(final int port, final String secretKey) { this.ccPort = port; - final File statusFile = getStatusFile(); - try (final FileOutputStream fos = new FileOutputStream(statusFile)) { - fos.write(String.valueOf(port).getBytes(StandardCharsets.UTF_8)); - fos.getFD().sync(); + + final Properties nifiProps = new Properties(); + if ( nifiPid != -1 ) { + nifiProps.setProperty("pid", String.valueOf(nifiPid)); + } + nifiProps.setProperty("port", String.valueOf(ccPort)); + nifiProps.setProperty("secret.key", secretKey); + + try { + saveProperties(nifiProps); } catch (final IOException ioe) { - System.err.println("Apache NiFi has started but failed to persist NiFi Port information to " + statusFile.getAbsolutePath() + " due to " + ioe); + logger.warning("Apache NiFi has started but failed to persist NiFi Port information to " + statusFile.getAbsolutePath() + " due to " + ioe); } - System.out.println("Apache NiFi now running and listening for requests on port " + port); + logger.info("Apache NiFi now running and listening for Bootstrap requests on port " + port); } int getNiFiCommandControlPort() { return this.ccPort; } + + + private static class Status { + private final Integer port; + private final String pid; + + private final Boolean respondingToPing; + private final Boolean processRunning; + + public Status(final Integer port, final String pid, final Boolean respondingToPing, final Boolean processRunning) { + this.port = port; + this.pid = pid; + this.respondingToPing = respondingToPing; + this.processRunning = processRunning; + } + + public String getPid() { + return pid; + } + + public Integer getPort() { + return port; + } + + public boolean isRespondingToPing() { + return Boolean.TRUE.equals(respondingToPing); + } + + public boolean isProcessRunning() { + return Boolean.TRUE.equals(processRunning); + } + } } diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/util/LimitingInputStream.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/util/LimitingInputStream.java new file mode 100644 index 0000000000..214934222c --- /dev/null +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/util/LimitingInputStream.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.bootstrap.util; + +import java.io.IOException; +import java.io.InputStream; + +public class LimitingInputStream extends InputStream { + + private final InputStream in; + private final long limit; + private long bytesRead = 0; + + public LimitingInputStream(final InputStream in, final long limit) { + this.in = in; + this.limit = limit; + } + + @Override + public int read() throws IOException { + if (bytesRead >= limit) { + return -1; + } + + final int val = in.read(); + if (val > -1) { + bytesRead++; + } + return val; + } + + @Override + public int read(final byte[] b) throws IOException { + if (bytesRead >= limit) { + return -1; + } + + final int maxToRead = (int) Math.min(b.length, limit - bytesRead); + + final int val = in.read(b, 0, maxToRead); + if (val > 0) { + bytesRead += val; + } + return val; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (bytesRead >= limit) { + return -1; + } + + final int maxToRead = (int) Math.min(len, limit - bytesRead); + + final int val = in.read(b, off, maxToRead); + if (val > 0) { + bytesRead += val; + } + return val; + } + + @Override + public long skip(final long n) throws IOException { + final long skipped = in.skip(Math.min(n, limit - bytesRead)); + bytesRead += skipped; + return skipped; + } + + @Override + public int available() throws IOException { + return in.available(); + } + + @Override + public void close() throws IOException { + in.close(); + } + + @Override + public void mark(int readlimit) { + in.mark(readlimit); + } + + @Override + public boolean markSupported() { + return in.markSupported(); + } + + @Override + public void reset() throws IOException { + in.reset(); + } +}