From 646570490c530d0c076c9bd9b7d1170946a9dae8 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 9 Dec 2014 12:18:35 -0500 Subject: [PATCH] NIFI-145 --- .../org/apache/nifi/BootstrapListener.java | 229 +++++++++++++++ .../src/main/java/org/apache/nifi/NiFi.java | 25 ++ nifi-bootstrap/pom.xml | 17 +- .../apache/nifi/bootstrap/BootstrapCodec.java | 89 ++++++ .../apache/nifi/bootstrap/NiFiListener.java | 116 ++++++++ .../org/apache/nifi/bootstrap/RunNiFi.java | 272 ++++++++++++++++-- .../apache/nifi/bootstrap/ShutdownHook.java | 59 +++- .../exception/InvalidCommandException.java | 37 +++ 8 files changed, 825 insertions(+), 19 deletions(-) create mode 100644 nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java create mode 100644 nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java create mode 100644 nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java create mode 100644 nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/exception/InvalidCommandException.java diff --git a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java new file mode 100644 index 0000000000..3bcbeb35ee --- /dev/null +++ b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BootstrapListener { + private static final Logger logger = LoggerFactory.getLogger(BootstrapListener.class); + + private final NiFi nifi; + private final int bootstrapPort; + + private Listener listener; + private ServerSocket serverSocket; + + + public BootstrapListener(final NiFi nifi, final int port) { + this.nifi = nifi; + this.bootstrapPort = port; + } + + public void start() throws IOException { + logger.debug("Starting Bootstrap Listener to communicate with Bootstrap Port {}", bootstrapPort); + + serverSocket = new ServerSocket(); + serverSocket.bind(new InetSocketAddress("localhost", 0)); + + final int localPort = serverSocket.getLocalPort(); + logger.info("Started Bootstrap Listener, Listening for incoming requests on port {}", localPort); + + listener = new Listener(serverSocket); + final Thread listenThread = new Thread(listener); + listenThread.setName("Listen to Bootstrap"); + listenThread.start(); + + logger.debug("Notifying Bootstrap that local port is {}", localPort); + try (final Socket socket = new Socket()) { + socket.setSoTimeout(60000); + socket.connect(new InetSocketAddress("localhost", bootstrapPort)); + socket.setSoTimeout(60000); + + final OutputStream out = socket.getOutputStream(); + out.write(("PORT " + localPort + "\n").getBytes(StandardCharsets.UTF_8)); + out.flush(); + + logger.debug("Awaiting response from Bootstrap..."); + final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); + final String response = reader.readLine(); + if ("OK".equals(response)) { + logger.info("Successfully initiated communication with Bootstrap"); + } else { + logger.error("Failed to communicate with Bootstrap. Bootstrap may be unable to issue or receive commands from NiFi"); + } + } + } + + + public void stop() { + if (listener != null) { + listener.stop(); + } + } + + private class Listener implements Runnable { + private final ServerSocket serverSocket; + private final ExecutorService executor; + private volatile boolean stopped = false; + + public Listener(final ServerSocket serverSocket) { + this.serverSocket = serverSocket; + this.executor = Executors.newFixedThreadPool(2); + } + + public void stop() { + stopped = true; + + executor.shutdownNow(); + + try { + serverSocket.close(); + } catch (final IOException ioe) { + // nothing to really do here. we could log this, but it would just become + // confusing in the logs, as we're shutting down and there's no real benefit + } + } + + @Override + public void run() { + while (!serverSocket.isClosed()) { + try { + if ( stopped ) { + return; + } + + final Socket socket; + try { + socket = serverSocket.accept(); + } catch (final IOException ioe) { + if ( stopped ) { + return; + } + + throw ioe; + } + + executor.submit(new Runnable() { + @Override + public void run() { + try { + final BootstrapRequest request = readRequest(socket.getInputStream()); + final BootstrapRequest.RequestType requestType = request.getRequestType(); + + switch (requestType) { + case PING: + logger.debug("Received PING request from Bootstrap; responding"); + echoPing(socket.getOutputStream()); + logger.debug("Responded to PING request from Bootstrap"); + break; + case SHUTDOWN: + logger.info("Received SHUTDOWN request from Bootstrap"); + echoShutdown(socket.getOutputStream()); + nifi.shutdownHook(); + return; + } + } catch (final Throwable t) { + logger.error("Failed to process request from Bootstrap due to " + t.toString(), t); + } finally { + try { + socket.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close socket to Bootstrap due to {}", ioe.toString()); + } + } + } + }); + } catch (final Throwable t) { + logger.error("Failed to process request from Bootstrap due to " + t.toString(), t); + } + } + } + } + + + private void echoPing(final OutputStream out) throws IOException { + out.write("PING\n".getBytes(StandardCharsets.UTF_8)); + out.flush(); + } + + private void echoShutdown(final OutputStream out) throws IOException { + out.write("SHUTDOWN\n".getBytes(StandardCharsets.UTF_8)); + out.flush(); + } + + private BootstrapRequest readRequest(final InputStream in) throws IOException { + final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + + final String line = reader.readLine(); + final String[] splits = line.split(" "); + if ( splits.length < 0 ) { + throw new IOException("Received invalid command from NiFi: " + line); + } + + final String requestType = splits[0]; + final String[] args; + if ( splits.length == 1 ) { + args = new String[0]; + } else { + args = Arrays.copyOfRange(splits, 1, splits.length); + } + + try { + return new BootstrapRequest(requestType, args); + } catch (final Exception e) { + throw new IOException("Received invalid request from bootstrap; request type = " + requestType); + } + } + + + private static class BootstrapRequest { + public static enum RequestType { + SHUTDOWN, + PING; + } + + private final RequestType requestType; + private final String[] args; + + public BootstrapRequest(final String request, final String[] args) { + this.requestType = RequestType.valueOf(request); + this.args = args; + } + + public RequestType getRequestType() { + return requestType; + } + + public String[] getArgs() { + return args; + } + } +} diff --git a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/NiFi.java b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/NiFi.java index 5fd1a138be..bf50a218d8 100644 --- a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/NiFi.java +++ b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/NiFi.java @@ -45,6 +45,9 @@ public class NiFi { private static final Logger logger = LoggerFactory.getLogger(NiFi.class); private final NiFiServer nifiServer; + private final BootstrapListener bootstrapListener; + + public static final String BOOTSTRAP_PORT_PROPERTY = "nifi.bootstrap.listen.port"; public NiFi(final NiFiProperties properties) throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException { Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() { @@ -65,6 +68,25 @@ public class NiFi { } })); + final String bootstrapPort = System.getProperty(BOOTSTRAP_PORT_PROPERTY); + if ( bootstrapPort != null ) { + try { + final int port = Integer.parseInt(bootstrapPort); + + if (port < 1 || port > 65535) { + throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535"); + } + + bootstrapListener = new BootstrapListener(this, port); + bootstrapListener.start(); + } catch (final NumberFormatException nfe) { + throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535"); + } + } else { + logger.info("NiFi started without Bootstrap Port information provided; will not listen for requests from Bootstrap"); + bootstrapListener = null; + } + // delete the web working dir - if the application does not start successfully // the web app directories might be in an invalid state. when this happens // jetty will not attempt to re-extract the war into the directory. by removing @@ -115,6 +137,9 @@ public class NiFi { if (nifiServer != null) { nifiServer.stop(); } + if (bootstrapListener != null) { + bootstrapListener.stop(); + } logger.info("Jetty web server shutdown completed (nicely or otherwise)."); } catch (final Throwable t) { logger.warn("Problem occured ensuring Jetty web server was properly terminated due to " + t); diff --git a/nifi-bootstrap/pom.xml b/nifi-bootstrap/pom.xml index b620c84a78..a992018d94 100644 --- a/nifi-bootstrap/pom.xml +++ b/nifi-bootstrap/pom.xml @@ -1,5 +1,18 @@ - + + 4.0.0 diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java new file mode 100644 index 0000000000..8138c02076 --- /dev/null +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.bootstrap; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.Arrays; + +import org.apache.nifi.bootstrap.exception.InvalidCommandException; + +public class BootstrapCodec { + private final RunNiFi runner; + private final BufferedReader reader; + private final BufferedWriter writer; + + public BootstrapCodec(final RunNiFi runner, final InputStream in, final OutputStream out) { + this.runner = runner; + this.reader = new BufferedReader(new InputStreamReader(in)); + this.writer = new BufferedWriter(new OutputStreamWriter(out)); + } + + public void communicate() throws IOException { + final String line = reader.readLine(); + final String[] splits = line.split(" "); + if ( splits.length < 0 ) { + throw new IOException("Received invalid command from NiFi: " + line); + } + + final String cmd = splits[0]; + final String[] args; + if ( splits.length == 1 ) { + args = new String[0]; + } else { + args = Arrays.copyOfRange(splits, 1, splits.length); + } + + try { + processRequest(cmd, args); + } catch (final InvalidCommandException ice) { + throw new IOException("Received invalid command from NiFi: " + line + " : " + ice.getMessage() == null ? "" : "Details: " + ice.toString()); + } + } + + private void processRequest(final String cmd, final String[] args) throws InvalidCommandException, IOException { + switch (cmd) { + case "PORT": { + if ( args.length != 1 ) { + throw new InvalidCommandException(); + } + + final int port; + try { + port = Integer.parseInt( args[0] ); + } catch (final NumberFormatException nfe) { + throw new InvalidCommandException("Invalid Port number; should be integer between 1 and 65535"); + } + + if ( port < 1 || port > 65535 ) { + throw new InvalidCommandException("Invalid Port number; should be integer between 1 and 65535"); + } + + runner.setNiFiCommandControlPort(port); + writer.write("OK"); + writer.newLine(); + writer.flush(); + } + break; + } + } +} diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java new file mode 100644 index 0000000000..c83135199f --- /dev/null +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.bootstrap; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +public class NiFiListener { + private ServerSocket serverSocket; + private volatile Listener listener; + + int start(final RunNiFi runner) throws IOException { + serverSocket = new ServerSocket(); + serverSocket.bind(new InetSocketAddress("localhost", 0)); + + final int localPort = serverSocket.getLocalPort(); + listener = new Listener(serverSocket, runner); + final Thread listenThread = new Thread(listener); + listenThread.setName("Listen to NiFi"); + listenThread.start(); + return localPort; + } + + public void stop() throws IOException { + final Listener listener = this.listener; + if ( listener == null ) { + return; + } + + listener.stop(); + } + + private class Listener implements Runnable { + private final ServerSocket serverSocket; + private final ExecutorService executor; + private final RunNiFi runner; + private volatile boolean stopped = false; + + public Listener(final ServerSocket serverSocket, final RunNiFi runner) { + this.serverSocket = serverSocket; + this.executor = Executors.newFixedThreadPool(2); + this.runner = runner; + } + + public void stop() throws IOException { + stopped = true; + + executor.shutdown(); + try { + executor.awaitTermination(3, TimeUnit.SECONDS); + } catch (final InterruptedException ie) { + } + + serverSocket.close(); + } + + @Override + public void run() { + while (!serverSocket.isClosed()) { + try { + if ( stopped ) { + return; + } + + final Socket socket; + try { + socket = serverSocket.accept(); + } catch (final IOException ioe) { + if ( stopped ) { + return; + } + + throw ioe; + } + + + executor.submit(new Runnable() { + @Override + public void run() { + try { + final BootstrapCodec codec = new BootstrapCodec(runner, socket.getInputStream(), socket.getOutputStream()); + codec.communicate(); + socket.close(); + } catch (final Throwable t) { + System.out.println("Failed to communicate with NiFi due to " + t); + t.printStackTrace(); + } + } + }); + } catch (final Throwable t) { + System.err.println("Failed to receive information from NiFi due to " + t); + t.printStackTrace(); + } + } + } + } +} diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java index afa1f4713e..ea3e5661bf 100644 --- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java @@ -1,16 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.bootstrap; +import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; +import java.io.FileOutputStream; import java.io.FilenameFilter; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** @@ -18,7 +47,6 @@ import java.util.Properties; * * This class looks for the bootstrap.conf file by looking in the following places (in order): *
    - *
  1. First argument to the program
  2. *
  3. Java System Property named {@code org.apache.nifi.bootstrap.config.file}
  4. *
  5. ${NIFI_HOME}/./conf/bootstrap.conf, where ${NIFI_HOME} references an environment variable {@code NIFI_HOME}
  6. *
  7. ./conf/bootstrap.conf, where {@code .} represents the working directory. @@ -29,12 +57,57 @@ import java.util.Properties; public class RunNiFi { public static final String DEFAULT_CONFIG_FILE = "./conf/boostrap.conf"; public static final String DEFAULT_NIFI_PROPS_FILE = "./conf/nifi.properties"; - - @SuppressWarnings({ "rawtypes", "unchecked" }) - public static void main(final String[] args) throws IOException, InterruptedException { - final ProcessBuilder builder = new ProcessBuilder(); - String configFilename = (args.length > 0) ? args[0] : System.getProperty("org.apache.nifi.boostrap.config.file"); + public static final int MAX_RESTART_ATTEMPTS = 5; + public static final int STARTUP_WAIT_SECONDS = 60; + + public static final String SHUTDOWN_CMD = "SHUTDOWN"; + public static final String PING_CMD = "PING"; + + private volatile boolean autoRestartNiFi = true; + private volatile int ccPort = -1; + + private final Lock lock = new ReentrantLock(); + private final Condition startupCondition = lock.newCondition(); + + private final File bootstrapConfigFile; + + public RunNiFi(final File bootstrapConfigFile) { + this.bootstrapConfigFile = bootstrapConfigFile; + } + + private static void printUsage() { + System.out.println("Usage:"); + System.out.println(); + System.out.println("java org.apache.nifi.bootstrap.RunNiFi "); + System.out.println(); + System.out.println("Valid commands include:"); + System.out.println(""); + System.out.println("Start : Start a new instance of Apache NiFi"); + System.out.println("Stop : Stop a running instance of Apache NiFi"); + System.out.println("Status : Determine if there is a running instance of Apache NiFi"); + System.out.println(); + } + + public static void main(final String[] args) throws IOException, InterruptedException { + if ( args.length != 1 ) { + printUsage(); + return; + } + + switch (args[0].toLowerCase()) { + case "start": + case "stop": + case "status": + break; + default: + System.out.println("Invalid argument: " + args[0]); + System.out.println(); + printUsage(); + return; + } + + String configFilename = System.getProperty("org.apache.nifi.boostrap.config.file"); if ( configFilename == null ) { final String nifiHome = System.getenv("NIFI_HOME"); @@ -50,12 +123,122 @@ public class RunNiFi { } final File configFile = new File(configFilename); - if ( !configFile.exists() ) { + + final RunNiFi runNiFi = new RunNiFi(configFile); + + switch (args[0].toLowerCase()) { + case "start": + runNiFi.start(); + break; + case "stop": + runNiFi.stop(); + break; + case "status": + runNiFi.status(); + break; + } + } + + + public File getStatusFile() { + final File rootDir = bootstrapConfigFile.getParentFile(); + final File statusFile = new File(rootDir, "nifi.port"); + return statusFile; + } + + private Integer getCurrentPort() throws IOException { + try { + final File statusFile = getStatusFile(); + final byte[] info = Files.readAllBytes(statusFile.toPath()); + final String text = new String(info); + + final int port = Integer.parseInt(text); + + try (final Socket socket = new Socket("localhost", port)) { + final OutputStream out = socket.getOutputStream(); + out.write((PING_CMD + "\n").getBytes(StandardCharsets.UTF_8)); + out.flush(); + + final InputStream in = socket.getInputStream(); + final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + final String response = reader.readLine(); + if ( response.equals(PING_CMD) ) { + return port; + } + } catch (final IOException ioe) { + System.out.println("Found NiFi instance info at " + statusFile + " but information appears to be stale. Removing file."); + if ( !statusFile.delete() ) { + System.err.println("Unable to remove status file"); + } + + throw ioe; + } + } catch (final Exception e) { + return null; + } + + return null; + } + + + public void status() throws IOException { + final Integer port = getCurrentPort(); + if ( port == null ) { + System.out.println("Apache NiFi does not appear to be running"); + } else { + System.out.println("Apache NiFi is currently running, listening on port " + port); + } + return; + } + + + public void stop() throws IOException { + final Integer port = getCurrentPort(); + if ( port == null ) { + System.out.println("Apache NiFi is not currently running"); + return; + } + + try (final Socket socket = new Socket()) { + socket.setSoTimeout(60000); + socket.connect(new InetSocketAddress("localhost", port)); + socket.setSoTimeout(60000); + + final OutputStream out = socket.getOutputStream(); + out.write((SHUTDOWN_CMD + "\n").getBytes(StandardCharsets.UTF_8)); + out.flush(); + + final InputStream in = socket.getInputStream(); + final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + final String response = reader.readLine(); + if ( SHUTDOWN_CMD.equals(response) ) { + System.out.println("Apache NiFi has accepted the Shutdown Command and is shutting down now"); + } else { + System.err.println("When sending SHUTDOWN command to NiFi, got unexpected response " + response); + } + } catch (final IOException ioe) { + System.err.println("Failed to communicate with Apache NiFi"); + return; + } + } + + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public void start() throws IOException, InterruptedException { + final Integer port = getCurrentPort(); + if ( port != null ) { + System.out.println("Apache NiFi is already running, listening on port " + port); + return; + } + + final ProcessBuilder builder = new ProcessBuilder(); + + if ( !bootstrapConfigFile.exists() ) { throw new FileNotFoundException(DEFAULT_CONFIG_FILE); } final Properties properties = new Properties(); - try (final FileInputStream fis = new FileInputStream(configFile)) { + try (final FileInputStream fis = new FileInputStream(bootstrapConfigFile)) { properties.load(fis); } @@ -136,32 +319,67 @@ public class RunNiFi { javaCmd = "java"; } + final NiFiListener listener = new NiFiListener(); + final int listenPort = listener.start(this); + final List cmd = new ArrayList<>(); cmd.add(javaCmd); cmd.add("-classpath"); cmd.add(classPath); cmd.addAll(javaAdditionalArgs); cmd.add("-Dnifi.properties.file.path=" + nifiPropsFilename); + cmd.add("-Dnifi.bootstrap.listen.port=" + listenPort); cmd.add("org.apache.nifi.NiFi"); - builder.command(cmd).inheritIO(); + builder.command(cmd); final StringBuilder cmdBuilder = new StringBuilder(); for ( final String s : cmd ) { cmdBuilder.append(s).append(" "); } + System.out.println("Starting Apache NiFi..."); System.out.println("Working Directory: " + workingDir.getAbsolutePath()); System.out.println("Command: " + cmdBuilder.toString()); - - final Process proc = builder.start(); - Runtime.getRuntime().addShutdownHook(new ShutdownHook(proc)); - final int statusCode = proc.waitFor(); - System.out.println("Apache NiFi exited with Status Code " + statusCode); + + builder.start(); + boolean started = waitForStart(); + + if ( started ) { + System.out.println("Successfully started Apache NiFi"); + } else { + System.err.println("Apache NiFi does not appear to have started"); + } + + listener.stop(); } - private static File getFile(final String filename, final File workingDir) { + private boolean waitForStart() { + lock.lock(); + try { + final long startTime = System.nanoTime(); + + while ( ccPort < 1 ) { + try { + startupCondition.await(1, TimeUnit.SECONDS); + } catch (final InterruptedException ie) { + return false; + } + + final long waitNanos = System.nanoTime() - startTime; + final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos); + if (waitSeconds > STARTUP_WAIT_SECONDS) { + return false; + } + } + } finally { + lock.unlock(); + } + return true; + } + + private File getFile(final String filename, final File workingDir) { File libDir = new File(filename); if ( !libDir.isAbsolute() ) { libDir = new File(workingDir, filename); @@ -170,7 +388,29 @@ public class RunNiFi { return libDir; } - private static String replaceNull(final String value, final String replacement) { + private String replaceNull(final String value, final String replacement) { return (value == null) ? replacement : value; } + + void setAutoRestartNiFi(final boolean restart) { + this.autoRestartNiFi = restart; + } + + void setNiFiCommandControlPort(final int port) { + this.ccPort = port; + + final File statusFile = getStatusFile(); + try (final FileOutputStream fos = new FileOutputStream(statusFile)) { + fos.write(String.valueOf(port).getBytes(StandardCharsets.UTF_8)); + fos.getFD().sync(); + } catch (final IOException ioe) { + System.err.println("Apache NiFi has started but failed to persist NiFi Port information to " + statusFile.getAbsolutePath() + " due to " + ioe); + } + + System.out.println("Apache NiFi now running and listening for requests on port " + port); + } + + int getNiFiCommandControlPort() { + return this.ccPort; + } } diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java index 55e1f457d2..142d984548 100644 --- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java @@ -1,14 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.bootstrap; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; + public class ShutdownHook extends Thread { private final Process nifiProcess; + private final RunNiFi runner; - public ShutdownHook(final Process nifiProcess) { + public static final int WAIT_SECONDS = 10; + + public ShutdownHook(final Process nifiProcess, final RunNiFi runner) { this.nifiProcess = nifiProcess; + this.runner = runner; } @Override public void run() { + runner.setAutoRestartNiFi(false); + final int ccPort = runner.getNiFiCommandControlPort(); + if ( ccPort > 0 ) { + System.out.println("Initiating Shutdown of NiFi..."); + + try { + final Socket socket = new Socket("localhost", ccPort); + final OutputStream out = socket.getOutputStream(); + out.write("SHUTDOWN\n".getBytes(StandardCharsets.UTF_8)); + out.flush(); + + socket.close(); + } catch (final IOException ioe) { + System.out.println("Failed to Shutdown NiFi due to " + ioe); + } + } + + try { + nifiProcess.waitFor(WAIT_SECONDS, TimeUnit.SECONDS); + } catch (final InterruptedException ie) { + } + + if ( nifiProcess.isAlive() ) { + System.out.println("NiFi has not finished shutting down after " + WAIT_SECONDS + " seconds. Killing process."); + } nifiProcess.destroy(); + + final File statusFile = runner.getStatusFile(); + if ( !statusFile.delete() ) { + System.err.println("Failed to delete status file " + statusFile.getAbsolutePath()); + } } } diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/exception/InvalidCommandException.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/exception/InvalidCommandException.java new file mode 100644 index 0000000000..962aa1c39d --- /dev/null +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/exception/InvalidCommandException.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.bootstrap.exception; + +public class InvalidCommandException extends Exception { + private static final long serialVersionUID = 1L; + + public InvalidCommandException() { + super(); + } + + public InvalidCommandException(final String message) { + super(message); + } + + public InvalidCommandException(final Throwable t) { + super(t); + } + + public InvalidCommandException(final String message, final Throwable t) { + super(message, t); + } +}