This commit is contained in:
Mark Payne 2014-12-09 12:18:35 -05:00
parent cb63c66602
commit 646570490c
8 changed files with 825 additions and 19 deletions

View File

@ -0,0 +1,229 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BootstrapListener {
private static final Logger logger = LoggerFactory.getLogger(BootstrapListener.class);
private final NiFi nifi;
private final int bootstrapPort;
private Listener listener;
private ServerSocket serverSocket;
public BootstrapListener(final NiFi nifi, final int port) {
this.nifi = nifi;
this.bootstrapPort = port;
}
public void start() throws IOException {
logger.debug("Starting Bootstrap Listener to communicate with Bootstrap Port {}", bootstrapPort);
serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress("localhost", 0));
final int localPort = serverSocket.getLocalPort();
logger.info("Started Bootstrap Listener, Listening for incoming requests on port {}", localPort);
listener = new Listener(serverSocket);
final Thread listenThread = new Thread(listener);
listenThread.setName("Listen to Bootstrap");
listenThread.start();
logger.debug("Notifying Bootstrap that local port is {}", localPort);
try (final Socket socket = new Socket()) {
socket.setSoTimeout(60000);
socket.connect(new InetSocketAddress("localhost", bootstrapPort));
socket.setSoTimeout(60000);
final OutputStream out = socket.getOutputStream();
out.write(("PORT " + localPort + "\n").getBytes(StandardCharsets.UTF_8));
out.flush();
logger.debug("Awaiting response from Bootstrap...");
final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
final String response = reader.readLine();
if ("OK".equals(response)) {
logger.info("Successfully initiated communication with Bootstrap");
} else {
logger.error("Failed to communicate with Bootstrap. Bootstrap may be unable to issue or receive commands from NiFi");
}
}
}
public void stop() {
if (listener != null) {
listener.stop();
}
}
private class Listener implements Runnable {
private final ServerSocket serverSocket;
private final ExecutorService executor;
private volatile boolean stopped = false;
public Listener(final ServerSocket serverSocket) {
this.serverSocket = serverSocket;
this.executor = Executors.newFixedThreadPool(2);
}
public void stop() {
stopped = true;
executor.shutdownNow();
try {
serverSocket.close();
} catch (final IOException ioe) {
// nothing to really do here. we could log this, but it would just become
// confusing in the logs, as we're shutting down and there's no real benefit
}
}
@Override
public void run() {
while (!serverSocket.isClosed()) {
try {
if ( stopped ) {
return;
}
final Socket socket;
try {
socket = serverSocket.accept();
} catch (final IOException ioe) {
if ( stopped ) {
return;
}
throw ioe;
}
executor.submit(new Runnable() {
@Override
public void run() {
try {
final BootstrapRequest request = readRequest(socket.getInputStream());
final BootstrapRequest.RequestType requestType = request.getRequestType();
switch (requestType) {
case PING:
logger.debug("Received PING request from Bootstrap; responding");
echoPing(socket.getOutputStream());
logger.debug("Responded to PING request from Bootstrap");
break;
case SHUTDOWN:
logger.info("Received SHUTDOWN request from Bootstrap");
echoShutdown(socket.getOutputStream());
nifi.shutdownHook();
return;
}
} catch (final Throwable t) {
logger.error("Failed to process request from Bootstrap due to " + t.toString(), t);
} finally {
try {
socket.close();
} catch (final IOException ioe) {
logger.warn("Failed to close socket to Bootstrap due to {}", ioe.toString());
}
}
}
});
} catch (final Throwable t) {
logger.error("Failed to process request from Bootstrap due to " + t.toString(), t);
}
}
}
}
private void echoPing(final OutputStream out) throws IOException {
out.write("PING\n".getBytes(StandardCharsets.UTF_8));
out.flush();
}
private void echoShutdown(final OutputStream out) throws IOException {
out.write("SHUTDOWN\n".getBytes(StandardCharsets.UTF_8));
out.flush();
}
private BootstrapRequest readRequest(final InputStream in) throws IOException {
final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
final String line = reader.readLine();
final String[] splits = line.split(" ");
if ( splits.length < 0 ) {
throw new IOException("Received invalid command from NiFi: " + line);
}
final String requestType = splits[0];
final String[] args;
if ( splits.length == 1 ) {
args = new String[0];
} else {
args = Arrays.copyOfRange(splits, 1, splits.length);
}
try {
return new BootstrapRequest(requestType, args);
} catch (final Exception e) {
throw new IOException("Received invalid request from bootstrap; request type = " + requestType);
}
}
private static class BootstrapRequest {
public static enum RequestType {
SHUTDOWN,
PING;
}
private final RequestType requestType;
private final String[] args;
public BootstrapRequest(final String request, final String[] args) {
this.requestType = RequestType.valueOf(request);
this.args = args;
}
public RequestType getRequestType() {
return requestType;
}
public String[] getArgs() {
return args;
}
}
}

View File

@ -45,6 +45,9 @@ public class NiFi {
private static final Logger logger = LoggerFactory.getLogger(NiFi.class); private static final Logger logger = LoggerFactory.getLogger(NiFi.class);
private final NiFiServer nifiServer; private final NiFiServer nifiServer;
private final BootstrapListener bootstrapListener;
public static final String BOOTSTRAP_PORT_PROPERTY = "nifi.bootstrap.listen.port";
public NiFi(final NiFiProperties properties) throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException { public NiFi(final NiFiProperties properties) throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() { Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@ -65,6 +68,25 @@ public class NiFi {
} }
})); }));
final String bootstrapPort = System.getProperty(BOOTSTRAP_PORT_PROPERTY);
if ( bootstrapPort != null ) {
try {
final int port = Integer.parseInt(bootstrapPort);
if (port < 1 || port > 65535) {
throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535");
}
bootstrapListener = new BootstrapListener(this, port);
bootstrapListener.start();
} catch (final NumberFormatException nfe) {
throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535");
}
} else {
logger.info("NiFi started without Bootstrap Port information provided; will not listen for requests from Bootstrap");
bootstrapListener = null;
}
// delete the web working dir - if the application does not start successfully // delete the web working dir - if the application does not start successfully
// the web app directories might be in an invalid state. when this happens // the web app directories might be in an invalid state. when this happens
// jetty will not attempt to re-extract the war into the directory. by removing // jetty will not attempt to re-extract the war into the directory. by removing
@ -115,6 +137,9 @@ public class NiFi {
if (nifiServer != null) { if (nifiServer != null) {
nifiServer.stop(); nifiServer.stop();
} }
if (bootstrapListener != null) {
bootstrapListener.stop();
}
logger.info("Jetty web server shutdown completed (nicely or otherwise)."); logger.info("Jetty web server shutdown completed (nicely or otherwise).");
} catch (final Throwable t) { } catch (final Throwable t) {
logger.warn("Problem occured ensuring Jetty web server was properly terminated due to " + t); logger.warn("Problem occured ensuring Jetty web server was properly terminated due to " + t);

View File

@ -1,5 +1,18 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<parent> <parent>

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.bootstrap;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.Arrays;
import org.apache.nifi.bootstrap.exception.InvalidCommandException;
public class BootstrapCodec {
private final RunNiFi runner;
private final BufferedReader reader;
private final BufferedWriter writer;
public BootstrapCodec(final RunNiFi runner, final InputStream in, final OutputStream out) {
this.runner = runner;
this.reader = new BufferedReader(new InputStreamReader(in));
this.writer = new BufferedWriter(new OutputStreamWriter(out));
}
public void communicate() throws IOException {
final String line = reader.readLine();
final String[] splits = line.split(" ");
if ( splits.length < 0 ) {
throw new IOException("Received invalid command from NiFi: " + line);
}
final String cmd = splits[0];
final String[] args;
if ( splits.length == 1 ) {
args = new String[0];
} else {
args = Arrays.copyOfRange(splits, 1, splits.length);
}
try {
processRequest(cmd, args);
} catch (final InvalidCommandException ice) {
throw new IOException("Received invalid command from NiFi: " + line + " : " + ice.getMessage() == null ? "" : "Details: " + ice.toString());
}
}
private void processRequest(final String cmd, final String[] args) throws InvalidCommandException, IOException {
switch (cmd) {
case "PORT": {
if ( args.length != 1 ) {
throw new InvalidCommandException();
}
final int port;
try {
port = Integer.parseInt( args[0] );
} catch (final NumberFormatException nfe) {
throw new InvalidCommandException("Invalid Port number; should be integer between 1 and 65535");
}
if ( port < 1 || port > 65535 ) {
throw new InvalidCommandException("Invalid Port number; should be integer between 1 and 65535");
}
runner.setNiFiCommandControlPort(port);
writer.write("OK");
writer.newLine();
writer.flush();
}
break;
}
}
}

View File

@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.bootstrap;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class NiFiListener {
private ServerSocket serverSocket;
private volatile Listener listener;
int start(final RunNiFi runner) throws IOException {
serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress("localhost", 0));
final int localPort = serverSocket.getLocalPort();
listener = new Listener(serverSocket, runner);
final Thread listenThread = new Thread(listener);
listenThread.setName("Listen to NiFi");
listenThread.start();
return localPort;
}
public void stop() throws IOException {
final Listener listener = this.listener;
if ( listener == null ) {
return;
}
listener.stop();
}
private class Listener implements Runnable {
private final ServerSocket serverSocket;
private final ExecutorService executor;
private final RunNiFi runner;
private volatile boolean stopped = false;
public Listener(final ServerSocket serverSocket, final RunNiFi runner) {
this.serverSocket = serverSocket;
this.executor = Executors.newFixedThreadPool(2);
this.runner = runner;
}
public void stop() throws IOException {
stopped = true;
executor.shutdown();
try {
executor.awaitTermination(3, TimeUnit.SECONDS);
} catch (final InterruptedException ie) {
}
serverSocket.close();
}
@Override
public void run() {
while (!serverSocket.isClosed()) {
try {
if ( stopped ) {
return;
}
final Socket socket;
try {
socket = serverSocket.accept();
} catch (final IOException ioe) {
if ( stopped ) {
return;
}
throw ioe;
}
executor.submit(new Runnable() {
@Override
public void run() {
try {
final BootstrapCodec codec = new BootstrapCodec(runner, socket.getInputStream(), socket.getOutputStream());
codec.communicate();
socket.close();
} catch (final Throwable t) {
System.out.println("Failed to communicate with NiFi due to " + t);
t.printStackTrace();
}
}
});
} catch (final Throwable t) {
System.err.println("Failed to receive information from NiFi due to " + t);
t.printStackTrace();
}
}
}
}
}

View File

@ -1,16 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.bootstrap; package org.apache.nifi.bootstrap;
import java.io.BufferedReader;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FilenameFilter; import java.io.FilenameFilter;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/** /**
@ -18,7 +47,6 @@ import java.util.Properties;
* *
* This class looks for the bootstrap.conf file by looking in the following places (in order): * This class looks for the bootstrap.conf file by looking in the following places (in order):
* <ol> * <ol>
* <li>First argument to the program</li>
* <li>Java System Property named {@code org.apache.nifi.bootstrap.config.file}</li> * <li>Java System Property named {@code org.apache.nifi.bootstrap.config.file}</li>
* <li>${NIFI_HOME}/./conf/bootstrap.conf, where ${NIFI_HOME} references an environment variable {@code NIFI_HOME}</li> * <li>${NIFI_HOME}/./conf/bootstrap.conf, where ${NIFI_HOME} references an environment variable {@code NIFI_HOME}</li>
* <li>./conf/bootstrap.conf, where {@code .} represents the working directory. * <li>./conf/bootstrap.conf, where {@code .} represents the working directory.
@ -30,11 +58,56 @@ public class RunNiFi {
public static final String DEFAULT_CONFIG_FILE = "./conf/boostrap.conf"; public static final String DEFAULT_CONFIG_FILE = "./conf/boostrap.conf";
public static final String DEFAULT_NIFI_PROPS_FILE = "./conf/nifi.properties"; public static final String DEFAULT_NIFI_PROPS_FILE = "./conf/nifi.properties";
@SuppressWarnings({ "rawtypes", "unchecked" }) public static final int MAX_RESTART_ATTEMPTS = 5;
public static void main(final String[] args) throws IOException, InterruptedException { public static final int STARTUP_WAIT_SECONDS = 60;
final ProcessBuilder builder = new ProcessBuilder();
String configFilename = (args.length > 0) ? args[0] : System.getProperty("org.apache.nifi.boostrap.config.file"); public static final String SHUTDOWN_CMD = "SHUTDOWN";
public static final String PING_CMD = "PING";
private volatile boolean autoRestartNiFi = true;
private volatile int ccPort = -1;
private final Lock lock = new ReentrantLock();
private final Condition startupCondition = lock.newCondition();
private final File bootstrapConfigFile;
public RunNiFi(final File bootstrapConfigFile) {
this.bootstrapConfigFile = bootstrapConfigFile;
}
private static void printUsage() {
System.out.println("Usage:");
System.out.println();
System.out.println("java org.apache.nifi.bootstrap.RunNiFi <command>");
System.out.println();
System.out.println("Valid commands include:");
System.out.println("");
System.out.println("Start : Start a new instance of Apache NiFi");
System.out.println("Stop : Stop a running instance of Apache NiFi");
System.out.println("Status : Determine if there is a running instance of Apache NiFi");
System.out.println();
}
public static void main(final String[] args) throws IOException, InterruptedException {
if ( args.length != 1 ) {
printUsage();
return;
}
switch (args[0].toLowerCase()) {
case "start":
case "stop":
case "status":
break;
default:
System.out.println("Invalid argument: " + args[0]);
System.out.println();
printUsage();
return;
}
String configFilename = System.getProperty("org.apache.nifi.boostrap.config.file");
if ( configFilename == null ) { if ( configFilename == null ) {
final String nifiHome = System.getenv("NIFI_HOME"); final String nifiHome = System.getenv("NIFI_HOME");
@ -50,12 +123,122 @@ public class RunNiFi {
} }
final File configFile = new File(configFilename); final File configFile = new File(configFilename);
if ( !configFile.exists() ) {
final RunNiFi runNiFi = new RunNiFi(configFile);
switch (args[0].toLowerCase()) {
case "start":
runNiFi.start();
break;
case "stop":
runNiFi.stop();
break;
case "status":
runNiFi.status();
break;
}
}
public File getStatusFile() {
final File rootDir = bootstrapConfigFile.getParentFile();
final File statusFile = new File(rootDir, "nifi.port");
return statusFile;
}
private Integer getCurrentPort() throws IOException {
try {
final File statusFile = getStatusFile();
final byte[] info = Files.readAllBytes(statusFile.toPath());
final String text = new String(info);
final int port = Integer.parseInt(text);
try (final Socket socket = new Socket("localhost", port)) {
final OutputStream out = socket.getOutputStream();
out.write((PING_CMD + "\n").getBytes(StandardCharsets.UTF_8));
out.flush();
final InputStream in = socket.getInputStream();
final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
final String response = reader.readLine();
if ( response.equals(PING_CMD) ) {
return port;
}
} catch (final IOException ioe) {
System.out.println("Found NiFi instance info at " + statusFile + " but information appears to be stale. Removing file.");
if ( !statusFile.delete() ) {
System.err.println("Unable to remove status file");
}
throw ioe;
}
} catch (final Exception e) {
return null;
}
return null;
}
public void status() throws IOException {
final Integer port = getCurrentPort();
if ( port == null ) {
System.out.println("Apache NiFi does not appear to be running");
} else {
System.out.println("Apache NiFi is currently running, listening on port " + port);
}
return;
}
public void stop() throws IOException {
final Integer port = getCurrentPort();
if ( port == null ) {
System.out.println("Apache NiFi is not currently running");
return;
}
try (final Socket socket = new Socket()) {
socket.setSoTimeout(60000);
socket.connect(new InetSocketAddress("localhost", port));
socket.setSoTimeout(60000);
final OutputStream out = socket.getOutputStream();
out.write((SHUTDOWN_CMD + "\n").getBytes(StandardCharsets.UTF_8));
out.flush();
final InputStream in = socket.getInputStream();
final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
final String response = reader.readLine();
if ( SHUTDOWN_CMD.equals(response) ) {
System.out.println("Apache NiFi has accepted the Shutdown Command and is shutting down now");
} else {
System.err.println("When sending SHUTDOWN command to NiFi, got unexpected response " + response);
}
} catch (final IOException ioe) {
System.err.println("Failed to communicate with Apache NiFi");
return;
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public void start() throws IOException, InterruptedException {
final Integer port = getCurrentPort();
if ( port != null ) {
System.out.println("Apache NiFi is already running, listening on port " + port);
return;
}
final ProcessBuilder builder = new ProcessBuilder();
if ( !bootstrapConfigFile.exists() ) {
throw new FileNotFoundException(DEFAULT_CONFIG_FILE); throw new FileNotFoundException(DEFAULT_CONFIG_FILE);
} }
final Properties properties = new Properties(); final Properties properties = new Properties();
try (final FileInputStream fis = new FileInputStream(configFile)) { try (final FileInputStream fis = new FileInputStream(bootstrapConfigFile)) {
properties.load(fis); properties.load(fis);
} }
@ -136,32 +319,67 @@ public class RunNiFi {
javaCmd = "java"; javaCmd = "java";
} }
final NiFiListener listener = new NiFiListener();
final int listenPort = listener.start(this);
final List<String> cmd = new ArrayList<>(); final List<String> cmd = new ArrayList<>();
cmd.add(javaCmd); cmd.add(javaCmd);
cmd.add("-classpath"); cmd.add("-classpath");
cmd.add(classPath); cmd.add(classPath);
cmd.addAll(javaAdditionalArgs); cmd.addAll(javaAdditionalArgs);
cmd.add("-Dnifi.properties.file.path=" + nifiPropsFilename); cmd.add("-Dnifi.properties.file.path=" + nifiPropsFilename);
cmd.add("-Dnifi.bootstrap.listen.port=" + listenPort);
cmd.add("org.apache.nifi.NiFi"); cmd.add("org.apache.nifi.NiFi");
builder.command(cmd).inheritIO(); builder.command(cmd);
final StringBuilder cmdBuilder = new StringBuilder(); final StringBuilder cmdBuilder = new StringBuilder();
for ( final String s : cmd ) { for ( final String s : cmd ) {
cmdBuilder.append(s).append(" "); cmdBuilder.append(s).append(" ");
} }
System.out.println("Starting Apache NiFi..."); System.out.println("Starting Apache NiFi...");
System.out.println("Working Directory: " + workingDir.getAbsolutePath()); System.out.println("Working Directory: " + workingDir.getAbsolutePath());
System.out.println("Command: " + cmdBuilder.toString()); System.out.println("Command: " + cmdBuilder.toString());
final Process proc = builder.start(); builder.start();
Runtime.getRuntime().addShutdownHook(new ShutdownHook(proc)); boolean started = waitForStart();
final int statusCode = proc.waitFor();
System.out.println("Apache NiFi exited with Status Code " + statusCode); if ( started ) {
System.out.println("Successfully started Apache NiFi");
} else {
System.err.println("Apache NiFi does not appear to have started");
}
listener.stop();
} }
private static File getFile(final String filename, final File workingDir) { private boolean waitForStart() {
lock.lock();
try {
final long startTime = System.nanoTime();
while ( ccPort < 1 ) {
try {
startupCondition.await(1, TimeUnit.SECONDS);
} catch (final InterruptedException ie) {
return false;
}
final long waitNanos = System.nanoTime() - startTime;
final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos);
if (waitSeconds > STARTUP_WAIT_SECONDS) {
return false;
}
}
} finally {
lock.unlock();
}
return true;
}
private File getFile(final String filename, final File workingDir) {
File libDir = new File(filename); File libDir = new File(filename);
if ( !libDir.isAbsolute() ) { if ( !libDir.isAbsolute() ) {
libDir = new File(workingDir, filename); libDir = new File(workingDir, filename);
@ -170,7 +388,29 @@ public class RunNiFi {
return libDir; return libDir;
} }
private static String replaceNull(final String value, final String replacement) { private String replaceNull(final String value, final String replacement) {
return (value == null) ? replacement : value; return (value == null) ? replacement : value;
} }
void setAutoRestartNiFi(final boolean restart) {
this.autoRestartNiFi = restart;
}
void setNiFiCommandControlPort(final int port) {
this.ccPort = port;
final File statusFile = getStatusFile();
try (final FileOutputStream fos = new FileOutputStream(statusFile)) {
fos.write(String.valueOf(port).getBytes(StandardCharsets.UTF_8));
fos.getFD().sync();
} catch (final IOException ioe) {
System.err.println("Apache NiFi has started but failed to persist NiFi Port information to " + statusFile.getAbsolutePath() + " due to " + ioe);
}
System.out.println("Apache NiFi now running and listening for requests on port " + port);
}
int getNiFiCommandControlPort() {
return this.ccPort;
}
} }

View File

@ -1,14 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.bootstrap; package org.apache.nifi.bootstrap;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
public class ShutdownHook extends Thread { public class ShutdownHook extends Thread {
private final Process nifiProcess; private final Process nifiProcess;
private final RunNiFi runner;
public ShutdownHook(final Process nifiProcess) { public static final int WAIT_SECONDS = 10;
public ShutdownHook(final Process nifiProcess, final RunNiFi runner) {
this.nifiProcess = nifiProcess; this.nifiProcess = nifiProcess;
this.runner = runner;
} }
@Override @Override
public void run() { public void run() {
runner.setAutoRestartNiFi(false);
final int ccPort = runner.getNiFiCommandControlPort();
if ( ccPort > 0 ) {
System.out.println("Initiating Shutdown of NiFi...");
try {
final Socket socket = new Socket("localhost", ccPort);
final OutputStream out = socket.getOutputStream();
out.write("SHUTDOWN\n".getBytes(StandardCharsets.UTF_8));
out.flush();
socket.close();
} catch (final IOException ioe) {
System.out.println("Failed to Shutdown NiFi due to " + ioe);
}
}
try {
nifiProcess.waitFor(WAIT_SECONDS, TimeUnit.SECONDS);
} catch (final InterruptedException ie) {
}
if ( nifiProcess.isAlive() ) {
System.out.println("NiFi has not finished shutting down after " + WAIT_SECONDS + " seconds. Killing process.");
}
nifiProcess.destroy(); nifiProcess.destroy();
final File statusFile = runner.getStatusFile();
if ( !statusFile.delete() ) {
System.err.println("Failed to delete status file " + statusFile.getAbsolutePath());
}
} }
} }

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.bootstrap.exception;
public class InvalidCommandException extends Exception {
private static final long serialVersionUID = 1L;
public InvalidCommandException() {
super();
}
public InvalidCommandException(final String message) {
super(message);
}
public InvalidCommandException(final Throwable t) {
super(t);
}
public InvalidCommandException(final String message, final Throwable t) {
super(message, t);
}
}