mirror of https://github.com/apache/nifi.git
Merge branch 'develop' of https://git-wip-us.apache.org/repos/asf/incubator-nifi into develop
This commit is contained in:
commit
c2bb1fd5ba
|
@ -92,7 +92,7 @@ public class VolatileContentRepository implements ContentRepository {
|
||||||
public static final String MAX_SIZE_PROPERTY = "nifi.volatile.content.repository.max.size";
|
public static final String MAX_SIZE_PROPERTY = "nifi.volatile.content.repository.max.size";
|
||||||
public static final String BLOCK_SIZE_PROPERTY = "nifi.volatile.content.repository.block.size";
|
public static final String BLOCK_SIZE_PROPERTY = "nifi.volatile.content.repository.block.size";
|
||||||
|
|
||||||
private final ScheduledExecutorService executor = new FlowEngine(3, "VolatileContentRepository Workers");
|
private final ScheduledExecutorService executor = new FlowEngine(3, "VolatileContentRepository Workers", true);
|
||||||
private final ConcurrentMap<ContentClaim, ContentBlock> claimMap = new ConcurrentHashMap<>(256);
|
private final ConcurrentMap<ContentClaim, ContentBlock> claimMap = new ConcurrentHashMap<>(256);
|
||||||
private final AtomicLong repoSize = new AtomicLong(0L);
|
private final AtomicLong repoSize = new AtomicLong(0L);
|
||||||
|
|
||||||
|
|
|
@ -15,19 +15,16 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
#
|
#
|
||||||
|
# chkconfig: 2345 20 80
|
||||||
|
# description: Apache NiFi is a dataflow system based on the principles of Flow-Based Programming.
|
||||||
|
#
|
||||||
|
|
||||||
# Script structure inspired from Apache Karaf and other Apache projects with similar startup approaches
|
# Script structure inspired from Apache Karaf and other Apache projects with similar startup approaches
|
||||||
|
|
||||||
DIRNAME=`dirname "$0"`
|
NIFI_HOME=`cd $(dirname "$0") && cd .. && pwd`
|
||||||
PROGNAME=`basename "$0"`
|
PROGNAME=`basename "$0"`
|
||||||
|
|
||||||
|
|
||||||
#
|
|
||||||
#Readlink is not available on all systems. Change variable to appropriate alternative as part of OS detection
|
|
||||||
#
|
|
||||||
|
|
||||||
READLINK="readlink"
|
|
||||||
|
|
||||||
warn() {
|
warn() {
|
||||||
echo "${PROGNAME}: $*"
|
echo "${PROGNAME}: $*"
|
||||||
}
|
}
|
||||||
|
@ -62,9 +59,6 @@ detectOS() {
|
||||||
export LDR_CNTRL=MAXDATA=0xB0000000@DSA
|
export LDR_CNTRL=MAXDATA=0xB0000000@DSA
|
||||||
echo $LDR_CNTRL
|
echo $LDR_CNTRL
|
||||||
fi
|
fi
|
||||||
if $darwin; then
|
|
||||||
READLINK="greadlink"
|
|
||||||
fi
|
|
||||||
}
|
}
|
||||||
|
|
||||||
unlimitFD() {
|
unlimitFD() {
|
||||||
|
@ -95,22 +89,6 @@ unlimitFD() {
|
||||||
fi
|
fi
|
||||||
}
|
}
|
||||||
|
|
||||||
locateHome() {
|
|
||||||
if [ "x$NIFI_HOME" != "x" ]; then
|
|
||||||
warn "Ignoring predefined value for NIFI_HOME"
|
|
||||||
fi
|
|
||||||
|
|
||||||
# In POSIX shells, CDPATH may cause cd to write to stdout
|
|
||||||
(unset CDPATH) >/dev/null 2>&1 && unset CDPATH
|
|
||||||
NIFI_HOME=$(dirname $($READLINK -f $0))/../
|
|
||||||
NIFI_HOME=$($READLINK -f $NIFI_HOME)
|
|
||||||
cd $NIFI_HOME
|
|
||||||
echo "Directory changed to NIFI_HOME of '$NIFI_HOME'"
|
|
||||||
if [ ! -d "$NIFI_HOME" ]; then
|
|
||||||
die "NIFI_HOME is not valid: $NIFI_HOME"
|
|
||||||
fi
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
locateJava() {
|
locateJava() {
|
||||||
|
@ -138,9 +116,6 @@ locateJava() {
|
||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
if [ "x$JAVA_HOME" = "x" ]; then
|
|
||||||
JAVA_HOME="$(dirname $(dirname $(pathCanonical "$JAVA")))"
|
|
||||||
fi
|
|
||||||
}
|
}
|
||||||
|
|
||||||
init() {
|
init() {
|
||||||
|
@ -150,13 +125,25 @@ init() {
|
||||||
# Unlimit the number of file descriptors if possible
|
# Unlimit the number of file descriptors if possible
|
||||||
unlimitFD
|
unlimitFD
|
||||||
|
|
||||||
# Locate the NiFi home directory
|
|
||||||
locateHome
|
|
||||||
|
|
||||||
# Locate the Java VM to execute
|
# Locate the Java VM to execute
|
||||||
locateJava
|
locateJava
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
install() {
|
||||||
|
SVC_NAME=nifi
|
||||||
|
if [ "x$2" != "x" ] ; then
|
||||||
|
SVC_NAME=$2
|
||||||
|
fi
|
||||||
|
|
||||||
|
SVC_FILE=/etc/init.d/$SVC_NAME
|
||||||
|
cp $0 $SVC_FILE
|
||||||
|
sed -i s:NIFI_HOME=.*:NIFI_HOME="$NIFI_HOME": $SVC_FILE
|
||||||
|
sed -i s:PROGNAME=.*:PROGNAME=$(basename "$0"): $SVC_FILE
|
||||||
|
echo Service $SVC_NAME installed
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
run() {
|
run() {
|
||||||
BOOTSTRAP_CONF="$NIFI_HOME/conf/bootstrap.conf";
|
BOOTSTRAP_CONF="$NIFI_HOME/conf/bootstrap.conf";
|
||||||
|
|
||||||
|
@ -172,7 +159,7 @@ run() {
|
||||||
echo "Bootstrap Config File: $BOOTSTRAP_CONF"
|
echo "Bootstrap Config File: $BOOTSTRAP_CONF"
|
||||||
echo
|
echo
|
||||||
|
|
||||||
exec "$JAVA" -cp "$NIFI_HOME"/lib/bootstrap/* -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.file="$BOOTSTRAP_CONF" org.apache.nifi.bootstrap.RunNiFi $1
|
exec "$JAVA" -cp "$NIFI_HOME"/lib/bootstrap/* -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.file="$BOOTSTRAP_CONF" org.apache.nifi.bootstrap.RunNiFi $@
|
||||||
}
|
}
|
||||||
|
|
||||||
main() {
|
main() {
|
||||||
|
@ -180,4 +167,15 @@ main() {
|
||||||
run "$@"
|
run "$@"
|
||||||
}
|
}
|
||||||
|
|
||||||
main "$@"
|
|
||||||
|
case "$1" in
|
||||||
|
install)
|
||||||
|
install "$@"
|
||||||
|
;;
|
||||||
|
start|stop|run|status)
|
||||||
|
main "$@"
|
||||||
|
;;
|
||||||
|
*)
|
||||||
|
echo "Usage nifi {start|stop|run|status|install}"
|
||||||
|
;;
|
||||||
|
esac
|
||||||
|
|
|
@ -13,4 +13,10 @@ java.arg.2=-Xms256m
|
||||||
java.arg.3=-Xmx512m
|
java.arg.3=-Xmx512m
|
||||||
|
|
||||||
# Enable Remote Debugging
|
# Enable Remote Debugging
|
||||||
#java.arg.2=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000
|
#java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000
|
||||||
|
|
||||||
|
# Java command to use when running NiFi
|
||||||
|
java=java
|
||||||
|
|
||||||
|
# Username to use when running NiFi. This value will be ignored on Windows.
|
||||||
|
run.as=
|
||||||
|
|
|
@ -27,9 +27,11 @@ import java.net.Socket;
|
||||||
import java.net.SocketTimeoutException;
|
import java.net.SocketTimeoutException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.UUID;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
|
import org.apache.nifi.util.LimitingInputStream;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -38,14 +40,16 @@ public class BootstrapListener {
|
||||||
|
|
||||||
private final NiFi nifi;
|
private final NiFi nifi;
|
||||||
private final int bootstrapPort;
|
private final int bootstrapPort;
|
||||||
|
private final String secretKey;
|
||||||
|
|
||||||
private volatile Listener listener;
|
private volatile Listener listener;
|
||||||
private volatile ServerSocket serverSocket;
|
private volatile ServerSocket serverSocket;
|
||||||
|
|
||||||
|
|
||||||
public BootstrapListener(final NiFi nifi, final int port) {
|
public BootstrapListener(final NiFi nifi, final int bootstrapPort) {
|
||||||
this.nifi = nifi;
|
this.nifi = nifi;
|
||||||
this.bootstrapPort = port;
|
this.bootstrapPort = bootstrapPort;
|
||||||
|
secretKey = UUID.randomUUID().toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start() throws IOException {
|
public void start() throws IOException {
|
||||||
|
@ -71,7 +75,7 @@ public class BootstrapListener {
|
||||||
socket.setSoTimeout(60000);
|
socket.setSoTimeout(60000);
|
||||||
|
|
||||||
final OutputStream out = socket.getOutputStream();
|
final OutputStream out = socket.getOutputStream();
|
||||||
out.write(("PORT " + localPort + "\n").getBytes(StandardCharsets.UTF_8));
|
out.write(("PORT " + localPort + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
|
||||||
out.flush();
|
out.flush();
|
||||||
|
|
||||||
logger.debug("Awaiting response from Bootstrap...");
|
logger.debug("Awaiting response from Bootstrap...");
|
||||||
|
@ -121,6 +125,7 @@ public class BootstrapListener {
|
||||||
try {
|
try {
|
||||||
final Socket socket;
|
final Socket socket;
|
||||||
try {
|
try {
|
||||||
|
logger.debug("Listening for Bootstrap Requests");
|
||||||
socket = serverSocket.accept();
|
socket = serverSocket.accept();
|
||||||
} catch (final SocketTimeoutException ste) {
|
} catch (final SocketTimeoutException ste) {
|
||||||
if ( stopped ) {
|
if ( stopped ) {
|
||||||
|
@ -136,6 +141,9 @@ public class BootstrapListener {
|
||||||
throw ioe;
|
throw ioe;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.debug("Received connection from Bootstrap");
|
||||||
|
socket.setSoTimeout(5000);
|
||||||
|
|
||||||
executor.submit(new Runnable() {
|
executor.submit(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
@ -184,27 +192,42 @@ public class BootstrapListener {
|
||||||
out.flush();
|
out.flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@SuppressWarnings("resource") // we don't want to close the stream, as the caller will do that
|
||||||
private BootstrapRequest readRequest(final InputStream in) throws IOException {
|
private BootstrapRequest readRequest(final InputStream in) throws IOException {
|
||||||
final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
|
// We want to ensure that we don't try to read data from an InputStream directly
|
||||||
|
// by a BufferedReader because any user on the system could open a socket and send
|
||||||
|
// a multi-gigabyte file without any new lines in order to crash the NiFi instance
|
||||||
|
// (or at least cause OutOfMemoryErrors, which can wreak havoc on the running instance).
|
||||||
|
// So we will limit the Input Stream to only 4 KB, which should be plenty for any request.
|
||||||
|
final LimitingInputStream limitingIn = new LimitingInputStream(in, 4096);
|
||||||
|
final BufferedReader reader = new BufferedReader(new InputStreamReader(limitingIn));
|
||||||
|
|
||||||
final String line = reader.readLine();
|
final String line = reader.readLine();
|
||||||
final String[] splits = line.split(" ");
|
final String[] splits = line.split(" ");
|
||||||
if ( splits.length < 0 ) {
|
if ( splits.length < 0 ) {
|
||||||
throw new IOException("Received invalid command from NiFi: " + line);
|
throw new IOException("Received invalid request from Bootstrap: " + line);
|
||||||
}
|
}
|
||||||
|
|
||||||
final String requestType = splits[0];
|
final String requestType = splits[0];
|
||||||
final String[] args;
|
final String[] args;
|
||||||
if ( splits.length == 1 ) {
|
if ( splits.length == 1 ) {
|
||||||
|
throw new IOException("Received invalid request from Bootstrap; request did not have a secret key; request type = " + requestType);
|
||||||
|
} else if ( splits.length == 2 ) {
|
||||||
args = new String[0];
|
args = new String[0];
|
||||||
} else {
|
} else {
|
||||||
args = Arrays.copyOfRange(splits, 1, splits.length);
|
args = Arrays.copyOfRange(splits, 2, splits.length);
|
||||||
|
}
|
||||||
|
|
||||||
|
final String requestKey = splits[1];
|
||||||
|
if ( !secretKey.equals(requestKey) ) {
|
||||||
|
throw new IOException("Received invalid Secret Key for request type " + requestType);
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
return new BootstrapRequest(requestType, args);
|
return new BootstrapRequest(requestType, args);
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
throw new IOException("Received invalid request from bootstrap; request type = " + requestType);
|
throw new IOException("Received invalid request from Bootstrap; request type = " + requestType);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -227,6 +250,7 @@ public class BootstrapListener {
|
||||||
return requestType;
|
return requestType;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unused")
|
||||||
public String[] getArgs() {
|
public String[] getArgs() {
|
||||||
return args;
|
return args;
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,7 +36,6 @@ import org.apache.nifi.nar.NarClassLoaders;
|
||||||
import org.apache.nifi.nar.NarUnpacker;
|
import org.apache.nifi.nar.NarUnpacker;
|
||||||
import org.apache.nifi.util.FileUtils;
|
import org.apache.nifi.util.FileUtils;
|
||||||
import org.apache.nifi.util.NiFiProperties;
|
import org.apache.nifi.util.NiFiProperties;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.slf4j.bridge.SLF4JBridgeHandler;
|
import org.slf4j.bridge.SLF4JBridgeHandler;
|
||||||
|
@ -61,7 +60,6 @@ public class NiFi {
|
||||||
|
|
||||||
// register the shutdown hook
|
// register the shutdown hook
|
||||||
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
|
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
// shutdown the jetty server
|
// shutdown the jetty server
|
||||||
|
|
|
@ -0,0 +1,107 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.util;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
|
||||||
|
public class LimitingInputStream extends InputStream {
|
||||||
|
|
||||||
|
private final InputStream in;
|
||||||
|
private final long limit;
|
||||||
|
private long bytesRead = 0;
|
||||||
|
|
||||||
|
public LimitingInputStream(final InputStream in, final long limit) {
|
||||||
|
this.in = in;
|
||||||
|
this.limit = limit;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int read() throws IOException {
|
||||||
|
if (bytesRead >= limit) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
final int val = in.read();
|
||||||
|
if (val > -1) {
|
||||||
|
bytesRead++;
|
||||||
|
}
|
||||||
|
return val;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int read(final byte[] b) throws IOException {
|
||||||
|
if (bytesRead >= limit) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
final int maxToRead = (int) Math.min(b.length, limit - bytesRead);
|
||||||
|
|
||||||
|
final int val = in.read(b, 0, maxToRead);
|
||||||
|
if (val > 0) {
|
||||||
|
bytesRead += val;
|
||||||
|
}
|
||||||
|
return val;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int read(byte[] b, int off, int len) throws IOException {
|
||||||
|
if (bytesRead >= limit) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
final int maxToRead = (int) Math.min(len, limit - bytesRead);
|
||||||
|
|
||||||
|
final int val = in.read(b, off, maxToRead);
|
||||||
|
if (val > 0) {
|
||||||
|
bytesRead += val;
|
||||||
|
}
|
||||||
|
return val;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long skip(final long n) throws IOException {
|
||||||
|
final long skipped = in.skip(Math.min(n, limit - bytesRead));
|
||||||
|
bytesRead += skipped;
|
||||||
|
return skipped;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int available() throws IOException {
|
||||||
|
return in.available();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
in.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void mark(int readlimit) {
|
||||||
|
in.mark(readlimit);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean markSupported() {
|
||||||
|
return in.markSupported();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reset() throws IOException {
|
||||||
|
in.reset();
|
||||||
|
}
|
||||||
|
}
|
|
@ -27,6 +27,7 @@ import java.util.Arrays;
|
||||||
|
|
||||||
import org.apache.nifi.bootstrap.exception.InvalidCommandException;
|
import org.apache.nifi.bootstrap.exception.InvalidCommandException;
|
||||||
|
|
||||||
|
|
||||||
public class BootstrapCodec {
|
public class BootstrapCodec {
|
||||||
private final RunNiFi runner;
|
private final RunNiFi runner;
|
||||||
private final BufferedReader reader;
|
private final BufferedReader reader;
|
||||||
|
@ -63,7 +64,7 @@ public class BootstrapCodec {
|
||||||
private void processRequest(final String cmd, final String[] args) throws InvalidCommandException, IOException {
|
private void processRequest(final String cmd, final String[] args) throws InvalidCommandException, IOException {
|
||||||
switch (cmd) {
|
switch (cmd) {
|
||||||
case "PORT": {
|
case "PORT": {
|
||||||
if ( args.length != 1 ) {
|
if ( args.length != 2 ) {
|
||||||
throw new InvalidCommandException();
|
throw new InvalidCommandException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -78,7 +79,9 @@ public class BootstrapCodec {
|
||||||
throw new InvalidCommandException("Invalid Port number; should be integer between 1 and 65535");
|
throw new InvalidCommandException("Invalid Port number; should be integer between 1 and 65535");
|
||||||
}
|
}
|
||||||
|
|
||||||
runner.setNiFiCommandControlPort(port);
|
final String secretKey = args[1];
|
||||||
|
|
||||||
|
runner.setNiFiCommandControlPort(port, secretKey);
|
||||||
writer.write("OK");
|
writer.write("OK");
|
||||||
writer.newLine();
|
writer.newLine();
|
||||||
writer.flush();
|
writer.flush();
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
package org.apache.nifi.bootstrap;
|
package org.apache.nifi.bootstrap;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.ServerSocket;
|
import java.net.ServerSocket;
|
||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
|
@ -24,6 +25,8 @@ import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.nifi.bootstrap.util.LimitingInputStream;
|
||||||
|
|
||||||
public class NiFiListener {
|
public class NiFiListener {
|
||||||
private ServerSocket serverSocket;
|
private ServerSocket serverSocket;
|
||||||
private volatile Listener listener;
|
private volatile Listener listener;
|
||||||
|
@ -92,17 +95,26 @@ public class NiFiListener {
|
||||||
throw ioe;
|
throw ioe;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
executor.submit(new Runnable() {
|
executor.submit(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
final BootstrapCodec codec = new BootstrapCodec(runner, socket.getInputStream(), socket.getOutputStream());
|
// we want to ensure that we don't try to read data from an InputStream directly
|
||||||
|
// by a BufferedReader because any user on the system could open a socket and send
|
||||||
|
// a multi-gigabyte file without any new lines in order to crash the Bootstrap,
|
||||||
|
// which in turn may cause the Shutdown Hook to shutdown NiFi.
|
||||||
|
// So we will limit the amount of data to read to 4 KB
|
||||||
|
final InputStream limitingIn = new LimitingInputStream(socket.getInputStream(), 4096);
|
||||||
|
final BootstrapCodec codec = new BootstrapCodec(runner, limitingIn, socket.getOutputStream());
|
||||||
codec.communicate();
|
codec.communicate();
|
||||||
socket.close();
|
|
||||||
} catch (final Throwable t) {
|
} catch (final Throwable t) {
|
||||||
System.out.println("Failed to communicate with NiFi due to " + t);
|
System.out.println("Failed to communicate with NiFi due to " + t);
|
||||||
t.printStackTrace();
|
t.printStackTrace();
|
||||||
|
} finally {
|
||||||
|
try {
|
||||||
|
socket.close();
|
||||||
|
} catch (final IOException ioe) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -26,19 +26,28 @@ import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
import java.io.Reader;
|
||||||
|
import java.lang.reflect.Field;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.attribute.PosixFilePermission;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.locks.Condition;
|
import java.util.concurrent.locks.Condition;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
import java.util.logging.ConsoleHandler;
|
||||||
|
import java.util.logging.Handler;
|
||||||
|
import java.util.logging.Level;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -60,6 +69,8 @@ public class RunNiFi {
|
||||||
public static final String GRACEFUL_SHUTDOWN_PROP = "graceful.shutdown.seconds";
|
public static final String GRACEFUL_SHUTDOWN_PROP = "graceful.shutdown.seconds";
|
||||||
public static final String DEFAULT_GRACEFUL_SHUTDOWN_VALUE = "20";
|
public static final String DEFAULT_GRACEFUL_SHUTDOWN_VALUE = "20";
|
||||||
|
|
||||||
|
public static final String RUN_AS_PROP = "run.as";
|
||||||
|
|
||||||
public static final int MAX_RESTART_ATTEMPTS = 5;
|
public static final int MAX_RESTART_ATTEMPTS = 5;
|
||||||
public static final int STARTUP_WAIT_SECONDS = 60;
|
public static final int STARTUP_WAIT_SECONDS = 60;
|
||||||
|
|
||||||
|
@ -68,20 +79,32 @@ public class RunNiFi {
|
||||||
|
|
||||||
private volatile boolean autoRestartNiFi = true;
|
private volatile boolean autoRestartNiFi = true;
|
||||||
private volatile int ccPort = -1;
|
private volatile int ccPort = -1;
|
||||||
|
private volatile long nifiPid = -1L;
|
||||||
|
|
||||||
private final Lock lock = new ReentrantLock();
|
private final Lock lock = new ReentrantLock();
|
||||||
private final Condition startupCondition = lock.newCondition();
|
private final Condition startupCondition = lock.newCondition();
|
||||||
|
|
||||||
private final File bootstrapConfigFile;
|
private final File bootstrapConfigFile;
|
||||||
|
|
||||||
public RunNiFi(final File bootstrapConfigFile) {
|
private final java.util.logging.Logger logger;
|
||||||
|
|
||||||
|
public RunNiFi(final File bootstrapConfigFile, final boolean verbose) {
|
||||||
this.bootstrapConfigFile = bootstrapConfigFile;
|
this.bootstrapConfigFile = bootstrapConfigFile;
|
||||||
|
logger = java.util.logging.Logger.getLogger("Bootstrap");
|
||||||
|
if ( verbose ) {
|
||||||
|
logger.info("Enabling Verbose Output");
|
||||||
|
|
||||||
|
logger.setLevel(Level.FINE);
|
||||||
|
final Handler handler = new ConsoleHandler();
|
||||||
|
handler.setLevel(Level.FINE);
|
||||||
|
logger.addHandler(handler);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void printUsage() {
|
private static void printUsage() {
|
||||||
System.out.println("Usage:");
|
System.out.println("Usage:");
|
||||||
System.out.println();
|
System.out.println();
|
||||||
System.out.println("java org.apache.nifi.bootstrap.RunNiFi <command>");
|
System.out.println("java org.apache.nifi.bootstrap.RunNiFi [<-verbose>] <command>");
|
||||||
System.out.println();
|
System.out.println();
|
||||||
System.out.println("Valid commands include:");
|
System.out.println("Valid commands include:");
|
||||||
System.out.println("");
|
System.out.println("");
|
||||||
|
@ -92,21 +115,32 @@ public class RunNiFi {
|
||||||
System.out.println();
|
System.out.println();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public static void main(final String[] args) throws IOException, InterruptedException {
|
public static void main(final String[] args) throws IOException, InterruptedException {
|
||||||
if ( args.length != 1 ) {
|
if ( args.length < 1 || args.length > 2 ) {
|
||||||
printUsage();
|
printUsage();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
switch (args[0].toLowerCase()) {
|
boolean verbose = false;
|
||||||
|
if ( args.length == 2 ) {
|
||||||
|
if ( args[0].equals("-verbose") ) {
|
||||||
|
verbose = true;
|
||||||
|
} else {
|
||||||
|
printUsage();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final String cmd = args.length == 1 ? args[0] : args[1];
|
||||||
|
|
||||||
|
switch (cmd.toLowerCase()) {
|
||||||
case "start":
|
case "start":
|
||||||
case "run":
|
case "run":
|
||||||
case "stop":
|
case "stop":
|
||||||
case "status":
|
case "status":
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
System.out.println("Invalid argument: " + args[0]);
|
|
||||||
System.out.println();
|
|
||||||
printUsage();
|
printUsage();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -128,9 +162,9 @@ public class RunNiFi {
|
||||||
|
|
||||||
final File configFile = new File(configFilename);
|
final File configFile = new File(configFilename);
|
||||||
|
|
||||||
final RunNiFi runNiFi = new RunNiFi(configFile);
|
final RunNiFi runNiFi = new RunNiFi(configFile, verbose);
|
||||||
|
|
||||||
switch (args[0].toLowerCase()) {
|
switch (cmd.toLowerCase()) {
|
||||||
case "start":
|
case "start":
|
||||||
runNiFi.start(false);
|
runNiFi.start(false);
|
||||||
break;
|
break;
|
||||||
|
@ -151,51 +185,211 @@ public class RunNiFi {
|
||||||
final File confDir = bootstrapConfigFile.getParentFile();
|
final File confDir = bootstrapConfigFile.getParentFile();
|
||||||
final File nifiHome = confDir.getParentFile();
|
final File nifiHome = confDir.getParentFile();
|
||||||
final File bin = new File(nifiHome, "bin");
|
final File bin = new File(nifiHome, "bin");
|
||||||
final File statusFile = new File(bin, "nifi.port");
|
final File statusFile = new File(bin, "nifi.pid");
|
||||||
|
|
||||||
|
logger.fine("Status File: " + statusFile);
|
||||||
|
|
||||||
return statusFile;
|
return statusFile;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Integer getCurrentPort() throws IOException {
|
private Properties loadProperties() throws IOException {
|
||||||
try {
|
final Properties props = new Properties();
|
||||||
final File statusFile = getStatusFile();
|
final File statusFile = getStatusFile();
|
||||||
final byte[] info = Files.readAllBytes(statusFile.toPath());
|
if ( statusFile == null || !statusFile.exists() ) {
|
||||||
final String text = new String(info);
|
logger.fine("No status file to load properties from");
|
||||||
|
return props;
|
||||||
|
}
|
||||||
|
|
||||||
final int port = Integer.parseInt(text);
|
try (final FileInputStream fis = new FileInputStream(getStatusFile())) {
|
||||||
|
props.load(fis);
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.fine("Properties: " + props);
|
||||||
|
return props;
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized void saveProperties(final Properties nifiProps) throws IOException {
|
||||||
|
final File statusFile = getStatusFile();
|
||||||
|
if ( statusFile.exists() && !statusFile.delete() ) {
|
||||||
|
logger.warning("Failed to delete " + statusFile);
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( !statusFile.createNewFile() ) {
|
||||||
|
throw new IOException("Failed to create file " + statusFile);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
final Set<PosixFilePermission> perms = new HashSet<>();
|
||||||
|
perms.add(PosixFilePermission.OWNER_READ);
|
||||||
|
perms.add(PosixFilePermission.OWNER_WRITE);
|
||||||
|
Files.setPosixFilePermissions(statusFile.toPath(), perms);
|
||||||
|
} catch (final Exception e) {
|
||||||
|
logger.warning("Failed to set permissions so that only the owner can read status file " + statusFile + "; this may allows others to have access to the key needed to communicate with NiFi. Permissions should be changed so that only the owner can read this file");
|
||||||
|
}
|
||||||
|
|
||||||
|
try (final FileOutputStream fos = new FileOutputStream(statusFile)) {
|
||||||
|
nifiProps.store(fos, null);
|
||||||
|
fos.getFD().sync();
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.fine("Saved Properties " + nifiProps + " to " + statusFile);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isPingSuccessful(final int port, final String secretKey) {
|
||||||
|
logger.fine("Pinging " + port);
|
||||||
|
|
||||||
try (final Socket socket = new Socket("localhost", port)) {
|
try (final Socket socket = new Socket("localhost", port)) {
|
||||||
final OutputStream out = socket.getOutputStream();
|
final OutputStream out = socket.getOutputStream();
|
||||||
out.write((PING_CMD + "\n").getBytes(StandardCharsets.UTF_8));
|
out.write((PING_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
|
||||||
out.flush();
|
out.flush();
|
||||||
|
|
||||||
|
logger.fine("Sent PING command");
|
||||||
|
socket.setSoTimeout(5000);
|
||||||
final InputStream in = socket.getInputStream();
|
final InputStream in = socket.getInputStream();
|
||||||
final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
|
final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
|
||||||
final String response = reader.readLine();
|
final String response = reader.readLine();
|
||||||
if ( response.equals(PING_CMD) ) {
|
logger.fine("PING response: " + response);
|
||||||
|
|
||||||
|
return PING_CMD.equals(response);
|
||||||
|
} catch (final IOException ioe) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Integer getCurrentPort() throws IOException {
|
||||||
|
final Properties props = loadProperties();
|
||||||
|
final String portVal = props.getProperty("port");
|
||||||
|
if ( portVal == null ) {
|
||||||
|
logger.fine("No Port found in status file");
|
||||||
|
return null;
|
||||||
|
} else {
|
||||||
|
logger.fine("Port defined in status file: " + portVal);
|
||||||
|
}
|
||||||
|
|
||||||
|
final int port = Integer.parseInt(portVal);
|
||||||
|
final boolean success = isPingSuccessful(port, props.getProperty("secret.key"));
|
||||||
|
if ( success ) {
|
||||||
|
logger.fine("Successful PING on port " + port);
|
||||||
return port;
|
return port;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final String pid = props.getProperty("pid");
|
||||||
|
logger.fine("PID in status file is " + pid);
|
||||||
|
if ( pid != null ) {
|
||||||
|
final boolean procRunning = isProcessRunning(pid);
|
||||||
|
if ( procRunning ) {
|
||||||
|
return port;
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private boolean isProcessRunning(final String pid) {
|
||||||
|
try {
|
||||||
|
// We use the "ps" command to check if the process is still running.
|
||||||
|
final ProcessBuilder builder = new ProcessBuilder();
|
||||||
|
|
||||||
|
builder.command("ps", "-p", pid, "--no-headers");
|
||||||
|
final Process proc = builder.start();
|
||||||
|
|
||||||
|
// Read how many lines are output by the 'ps' command
|
||||||
|
int lineCount = 0;
|
||||||
|
String line;
|
||||||
|
try (final InputStream in = proc.getInputStream();
|
||||||
|
final Reader streamReader = new InputStreamReader(in);
|
||||||
|
final BufferedReader reader = new BufferedReader(streamReader)) {
|
||||||
|
|
||||||
|
while ((line = reader.readLine()) != null) {
|
||||||
|
if ( !line.trim().isEmpty() ) {
|
||||||
|
lineCount++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If anything was output, the process is running.
|
||||||
|
final boolean running = lineCount > 0;
|
||||||
|
if ( running ) {
|
||||||
|
logger.fine("Process with PID " + pid + " is running");
|
||||||
|
} else {
|
||||||
|
logger.fine("Process with PID " + pid + " is not running");
|
||||||
|
}
|
||||||
|
|
||||||
|
return running;
|
||||||
} catch (final IOException ioe) {
|
} catch (final IOException ioe) {
|
||||||
System.out.println("Found NiFi instance info at " + statusFile + " indicating that NiFi is running and listening to port " + port + " but unable to communicate with NiFi on that port. The process may have died or may be hung.");
|
System.err.println("Failed to determine if Process " + pid + " is running; assuming that it is not");
|
||||||
throw ioe;
|
return false;
|
||||||
}
|
}
|
||||||
} catch (final Exception e) {
|
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return null;
|
|
||||||
|
private Status getStatus() {
|
||||||
|
final Properties props;
|
||||||
|
try {
|
||||||
|
props = loadProperties();
|
||||||
|
} catch (final IOException ioe) {
|
||||||
|
return new Status(null, null, false, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ( props == null ) {
|
||||||
|
return new Status(null, null, false, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
final String portValue = props.getProperty("port");
|
||||||
|
final String pid = props.getProperty("pid");
|
||||||
|
final String secretKey = props.getProperty("secret.key");
|
||||||
|
|
||||||
|
if ( portValue == null && pid == null ) {
|
||||||
|
return new Status(null, null, false, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
Integer port = null;
|
||||||
|
boolean pingSuccess = false;
|
||||||
|
if ( portValue != null ) {
|
||||||
|
try {
|
||||||
|
port = Integer.parseInt(portValue);
|
||||||
|
pingSuccess = isPingSuccessful(port, secretKey);
|
||||||
|
} catch (final NumberFormatException nfe) {
|
||||||
|
return new Status(null, null, false, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( pingSuccess ) {
|
||||||
|
return new Status(port, pid, true, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
final boolean alive = (pid == null) ? false : isProcessRunning(pid);
|
||||||
|
return new Status(port, pid, pingSuccess, alive);
|
||||||
|
}
|
||||||
|
|
||||||
public void status() throws IOException {
|
public void status() throws IOException {
|
||||||
final Integer port = getCurrentPort();
|
final Status status = getStatus();
|
||||||
if ( port == null ) {
|
if ( status.isRespondingToPing() ) {
|
||||||
System.out.println("Apache NiFi does not appear to be running");
|
logger.info("Apache NiFi is currently running, listening to Bootstrap on port " + status.getPort() +
|
||||||
} else {
|
", PID=" + (status.getPid() == null ? "unknkown" : status.getPid()));
|
||||||
System.out.println("Apache NiFi is currently running, listening on port " + port);
|
|
||||||
}
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ( status.isProcessRunning() ) {
|
||||||
|
logger.info("Apache NiFi is running at PID " + status.getPid() + " but is not responding to ping requests");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( status.getPort() == null ) {
|
||||||
|
logger.info("Apache NiFi is not running");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( status.getPid() == null ) {
|
||||||
|
logger.info("Apache NiFi is not responding to Ping requests. The process may have died or may be hung");
|
||||||
|
} else {
|
||||||
|
logger.info("Apache NiFi is not running");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public void stop() throws IOException {
|
public void stop() throws IOException {
|
||||||
final Integer port = getCurrentPort();
|
final Integer port = getCurrentPort();
|
||||||
|
@ -204,35 +398,112 @@ public class RunNiFi {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final Properties nifiProps = loadProperties();
|
||||||
|
final String secretKey = nifiProps.getProperty("secret.key");
|
||||||
|
|
||||||
try (final Socket socket = new Socket()) {
|
try (final Socket socket = new Socket()) {
|
||||||
|
logger.fine("Connecting to NiFi instance");
|
||||||
socket.setSoTimeout(60000);
|
socket.setSoTimeout(60000);
|
||||||
socket.connect(new InetSocketAddress("localhost", port));
|
socket.connect(new InetSocketAddress("localhost", port));
|
||||||
|
logger.fine("Established connection to NiFi instance.");
|
||||||
socket.setSoTimeout(60000);
|
socket.setSoTimeout(60000);
|
||||||
|
|
||||||
|
logger.fine("Sending SHUTDOWN Command to port " + port);
|
||||||
final OutputStream out = socket.getOutputStream();
|
final OutputStream out = socket.getOutputStream();
|
||||||
out.write((SHUTDOWN_CMD + "\n").getBytes(StandardCharsets.UTF_8));
|
out.write((SHUTDOWN_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
|
||||||
out.flush();
|
out.flush();
|
||||||
|
|
||||||
final InputStream in = socket.getInputStream();
|
final InputStream in = socket.getInputStream();
|
||||||
final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
|
final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
|
||||||
final String response = reader.readLine();
|
final String response = reader.readLine();
|
||||||
|
|
||||||
|
logger.fine("Received response to SHUTDOWN command: " + response);
|
||||||
|
|
||||||
if ( SHUTDOWN_CMD.equals(response) ) {
|
if ( SHUTDOWN_CMD.equals(response) ) {
|
||||||
System.out.println("Apache NiFi has accepted the Shutdown Command and is shutting down now");
|
logger.info("Apache NiFi has accepted the Shutdown Command and is shutting down now");
|
||||||
|
|
||||||
|
final String pid = nifiProps.getProperty("pid");
|
||||||
|
if ( pid != null ) {
|
||||||
|
final Properties bootstrapProperties = new Properties();
|
||||||
|
try (final FileInputStream fis = new FileInputStream(bootstrapConfigFile)) {
|
||||||
|
bootstrapProperties.load(fis);
|
||||||
|
}
|
||||||
|
|
||||||
|
String gracefulShutdown = bootstrapProperties.getProperty(GRACEFUL_SHUTDOWN_PROP, DEFAULT_GRACEFUL_SHUTDOWN_VALUE);
|
||||||
|
int gracefulShutdownSeconds;
|
||||||
|
try {
|
||||||
|
gracefulShutdownSeconds = Integer.parseInt(gracefulShutdown);
|
||||||
|
} catch (final NumberFormatException nfe) {
|
||||||
|
gracefulShutdownSeconds = Integer.parseInt(DEFAULT_GRACEFUL_SHUTDOWN_VALUE);
|
||||||
|
}
|
||||||
|
|
||||||
|
final long startWait = System.nanoTime();
|
||||||
|
while ( isProcessRunning(pid) ) {
|
||||||
|
logger.info("Waiting for Apache NiFi to finish shutting down...");
|
||||||
|
final long waitNanos = System.nanoTime() - startWait;
|
||||||
|
final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos);
|
||||||
|
if ( waitSeconds >= gracefulShutdownSeconds && gracefulShutdownSeconds > 0 ) {
|
||||||
|
if ( isProcessRunning(pid) ) {
|
||||||
|
logger.warning("NiFi has not finished shutting down after " + gracefulShutdownSeconds + " seconds. Killing process.");
|
||||||
|
try {
|
||||||
|
killProcessTree(pid);
|
||||||
|
} catch (final IOException ioe) {
|
||||||
|
logger.severe("Failed to kill Process with PID " + pid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
Thread.sleep(2000L);
|
||||||
|
} catch (final InterruptedException ie) {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info("NiFi has finished shutting down.");
|
||||||
|
}
|
||||||
|
|
||||||
final File statusFile = getStatusFile();
|
final File statusFile = getStatusFile();
|
||||||
if ( !statusFile.delete() ) {
|
if ( !statusFile.delete() ) {
|
||||||
System.err.println("Failed to delete status file " + statusFile + "; this file should be cleaned up manually");
|
logger.severe("Failed to delete status file " + statusFile + "; this file should be cleaned up manually");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
System.err.println("When sending SHUTDOWN command to NiFi, got unexpected response " + response);
|
logger.severe("When sending SHUTDOWN command to NiFi, got unexpected response " + response);
|
||||||
}
|
}
|
||||||
} catch (final IOException ioe) {
|
} catch (final IOException ioe) {
|
||||||
System.err.println("Failed to communicate with Apache NiFi");
|
logger.severe("Failed to send shutdown command to port " + port + " due to " + ioe);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private static List<String> getChildProcesses(final String ppid) throws IOException {
|
||||||
|
final Process proc = Runtime.getRuntime().exec(new String[] {"ps", "-o", "pid", "--no-headers", "--ppid", ppid});
|
||||||
|
final List<String> childPids = new ArrayList<>();
|
||||||
|
try (final InputStream in = proc.getInputStream();
|
||||||
|
final BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
|
||||||
|
|
||||||
|
String line;
|
||||||
|
while ((line = reader.readLine()) != null) {
|
||||||
|
childPids.add(line.trim());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return childPids;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void killProcessTree(final String pid) throws IOException {
|
||||||
|
logger.fine("Killing Process Tree for PID " + pid);
|
||||||
|
|
||||||
|
final List<String> children = getChildProcesses(pid);
|
||||||
|
logger.fine("Children of PID " + pid + ": " + children);
|
||||||
|
|
||||||
|
for ( final String childPid : children ) {
|
||||||
|
killProcessTree(childPid);
|
||||||
|
}
|
||||||
|
|
||||||
|
Runtime.getRuntime().exec(new String[] {"kill", "-9", pid});
|
||||||
|
}
|
||||||
|
|
||||||
public static boolean isAlive(final Process process) {
|
public static boolean isAlive(final Process process) {
|
||||||
try {
|
try {
|
||||||
process.exitValue();
|
process.exitValue();
|
||||||
|
@ -246,7 +517,7 @@ public class RunNiFi {
|
||||||
public void start(final boolean monitor) throws IOException, InterruptedException {
|
public void start(final boolean monitor) throws IOException, InterruptedException {
|
||||||
final Integer port = getCurrentPort();
|
final Integer port = getCurrentPort();
|
||||||
if ( port != null ) {
|
if ( port != null ) {
|
||||||
System.out.println("Apache NiFi is already running, listening on port " + port);
|
System.out.println("Apache NiFi is already running, listening to Bootstrap on port " + port);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -344,7 +615,20 @@ public class RunNiFi {
|
||||||
final NiFiListener listener = new NiFiListener();
|
final NiFiListener listener = new NiFiListener();
|
||||||
final int listenPort = listener.start(this);
|
final int listenPort = listener.start(this);
|
||||||
|
|
||||||
|
String runAs = isWindows() ? null : props.get(RUN_AS_PROP);
|
||||||
|
if ( runAs != null ) {
|
||||||
|
runAs = runAs.trim();
|
||||||
|
if ( runAs.isEmpty() ) {
|
||||||
|
runAs = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
final List<String> cmd = new ArrayList<>();
|
final List<String> cmd = new ArrayList<>();
|
||||||
|
if ( runAs != null ) {
|
||||||
|
cmd.add("sudo");
|
||||||
|
cmd.add("-u");
|
||||||
|
cmd.add(runAs);
|
||||||
|
}
|
||||||
cmd.add(javaCmd);
|
cmd.add(javaCmd);
|
||||||
cmd.add("-classpath");
|
cmd.add("-classpath");
|
||||||
cmd.add(classPath);
|
cmd.add(classPath);
|
||||||
|
@ -361,9 +645,9 @@ public class RunNiFi {
|
||||||
cmdBuilder.append(s).append(" ");
|
cmdBuilder.append(s).append(" ");
|
||||||
}
|
}
|
||||||
|
|
||||||
System.out.println("Starting Apache NiFi...");
|
logger.info("Starting Apache NiFi...");
|
||||||
System.out.println("Working Directory: " + workingDir.getAbsolutePath());
|
logger.info("Working Directory: " + workingDir.getAbsolutePath());
|
||||||
System.out.println("Command: " + cmdBuilder.toString());
|
logger.info("Command: " + cmdBuilder.toString());
|
||||||
|
|
||||||
if ( monitor ) {
|
if ( monitor ) {
|
||||||
String gracefulShutdown = props.get(GRACEFUL_SHUTDOWN_PROP);
|
String gracefulShutdown = props.get(GRACEFUL_SHUTDOWN_PROP);
|
||||||
|
@ -383,6 +667,13 @@ public class RunNiFi {
|
||||||
}
|
}
|
||||||
|
|
||||||
Process process = builder.start();
|
Process process = builder.start();
|
||||||
|
Long pid = getPid(process);
|
||||||
|
if ( pid != null ) {
|
||||||
|
nifiPid = pid;
|
||||||
|
final Properties nifiProps = new Properties();
|
||||||
|
nifiProps.setProperty("pid", String.valueOf(nifiPid));
|
||||||
|
saveProperties(nifiProps);
|
||||||
|
}
|
||||||
|
|
||||||
ShutdownHook shutdownHook = new ShutdownHook(process, this, gracefulShutdownSeconds);
|
ShutdownHook shutdownHook = new ShutdownHook(process, this, gracefulShutdownSeconds);
|
||||||
final Runtime runtime = Runtime.getRuntime();
|
final Runtime runtime = Runtime.getRuntime();
|
||||||
|
@ -404,18 +695,26 @@ public class RunNiFi {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (autoRestartNiFi) {
|
if (autoRestartNiFi) {
|
||||||
System.out.println("Apache NiFi appears to have died. Restarting...");
|
logger.warning("Apache NiFi appears to have died. Restarting...");
|
||||||
process = builder.start();
|
process = builder.start();
|
||||||
|
|
||||||
|
pid = getPid(process);
|
||||||
|
if ( pid != null ) {
|
||||||
|
nifiPid = pid;
|
||||||
|
final Properties nifiProps = new Properties();
|
||||||
|
nifiProps.setProperty("pid", String.valueOf(nifiPid));
|
||||||
|
saveProperties(nifiProps);
|
||||||
|
}
|
||||||
|
|
||||||
shutdownHook = new ShutdownHook(process, this, gracefulShutdownSeconds);
|
shutdownHook = new ShutdownHook(process, this, gracefulShutdownSeconds);
|
||||||
runtime.addShutdownHook(shutdownHook);
|
runtime.addShutdownHook(shutdownHook);
|
||||||
|
|
||||||
final boolean started = waitForStart();
|
final boolean started = waitForStart();
|
||||||
|
|
||||||
if ( started ) {
|
if ( started ) {
|
||||||
System.out.println("Successfully started Apache NiFi");
|
logger.info("Successfully started Apache NiFi" + (pid == null ? "" : " with PID " + pid));
|
||||||
} else {
|
} else {
|
||||||
System.err.println("Apache NiFi does not appear to have started");
|
logger.severe("Apache NiFi does not appear to have started");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return;
|
return;
|
||||||
|
@ -423,13 +722,22 @@ public class RunNiFi {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
builder.start();
|
final Process process = builder.start();
|
||||||
|
final Long pid = getPid(process);
|
||||||
|
|
||||||
|
if ( pid != null ) {
|
||||||
|
nifiPid = pid;
|
||||||
|
final Properties nifiProps = new Properties();
|
||||||
|
nifiProps.setProperty("pid", String.valueOf(nifiPid));
|
||||||
|
saveProperties(nifiProps);
|
||||||
|
}
|
||||||
|
|
||||||
boolean started = waitForStart();
|
boolean started = waitForStart();
|
||||||
|
|
||||||
if ( started ) {
|
if ( started ) {
|
||||||
System.out.println("Successfully started Apache NiFi");
|
logger.info("Successfully started Apache NiFi" + (pid == null ? "" : " with PID " + pid));
|
||||||
} else {
|
} else {
|
||||||
System.err.println("Apache NiFi does not appear to have started");
|
logger.severe("Apache NiFi does not appear to have started");
|
||||||
}
|
}
|
||||||
|
|
||||||
listener.stop();
|
listener.stop();
|
||||||
|
@ -437,6 +745,30 @@ public class RunNiFi {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private Long getPid(final Process process) {
|
||||||
|
try {
|
||||||
|
final Class<?> procClass = process.getClass();
|
||||||
|
final Field pidField = procClass.getDeclaredField("pid");
|
||||||
|
pidField.setAccessible(true);
|
||||||
|
final Object pidObject = pidField.get(process);
|
||||||
|
|
||||||
|
logger.fine("PID Object = " + pidObject);
|
||||||
|
|
||||||
|
if ( pidObject instanceof Number ) {
|
||||||
|
return ((Number) pidObject).longValue();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
} catch (final IllegalAccessException | NoSuchFieldException nsfe) {
|
||||||
|
logger.fine("Could not find PID for child process due to " + nsfe);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isWindows() {
|
||||||
|
final String osName = System.getProperty("os.name");
|
||||||
|
return osName != null && osName.toLowerCase().contains("win");
|
||||||
|
}
|
||||||
|
|
||||||
private boolean waitForStart() {
|
private boolean waitForStart() {
|
||||||
lock.lock();
|
lock.lock();
|
||||||
try {
|
try {
|
||||||
|
@ -478,21 +810,59 @@ public class RunNiFi {
|
||||||
this.autoRestartNiFi = restart;
|
this.autoRestartNiFi = restart;
|
||||||
}
|
}
|
||||||
|
|
||||||
void setNiFiCommandControlPort(final int port) {
|
void setNiFiCommandControlPort(final int port, final String secretKey) {
|
||||||
this.ccPort = port;
|
this.ccPort = port;
|
||||||
|
|
||||||
final File statusFile = getStatusFile();
|
final File statusFile = getStatusFile();
|
||||||
try (final FileOutputStream fos = new FileOutputStream(statusFile)) {
|
|
||||||
fos.write(String.valueOf(port).getBytes(StandardCharsets.UTF_8));
|
final Properties nifiProps = new Properties();
|
||||||
fos.getFD().sync();
|
if ( nifiPid != -1 ) {
|
||||||
|
nifiProps.setProperty("pid", String.valueOf(nifiPid));
|
||||||
|
}
|
||||||
|
nifiProps.setProperty("port", String.valueOf(ccPort));
|
||||||
|
nifiProps.setProperty("secret.key", secretKey);
|
||||||
|
|
||||||
|
try {
|
||||||
|
saveProperties(nifiProps);
|
||||||
} catch (final IOException ioe) {
|
} catch (final IOException ioe) {
|
||||||
System.err.println("Apache NiFi has started but failed to persist NiFi Port information to " + statusFile.getAbsolutePath() + " due to " + ioe);
|
logger.warning("Apache NiFi has started but failed to persist NiFi Port information to " + statusFile.getAbsolutePath() + " due to " + ioe);
|
||||||
}
|
}
|
||||||
|
|
||||||
System.out.println("Apache NiFi now running and listening for requests on port " + port);
|
logger.info("Apache NiFi now running and listening for Bootstrap requests on port " + port);
|
||||||
}
|
}
|
||||||
|
|
||||||
int getNiFiCommandControlPort() {
|
int getNiFiCommandControlPort() {
|
||||||
return this.ccPort;
|
return this.ccPort;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private static class Status {
|
||||||
|
private final Integer port;
|
||||||
|
private final String pid;
|
||||||
|
|
||||||
|
private final Boolean respondingToPing;
|
||||||
|
private final Boolean processRunning;
|
||||||
|
|
||||||
|
public Status(final Integer port, final String pid, final Boolean respondingToPing, final Boolean processRunning) {
|
||||||
|
this.port = port;
|
||||||
|
this.pid = pid;
|
||||||
|
this.respondingToPing = respondingToPing;
|
||||||
|
this.processRunning = processRunning;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getPid() {
|
||||||
|
return pid;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getPort() {
|
||||||
|
return port;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isRespondingToPing() {
|
||||||
|
return Boolean.TRUE.equals(respondingToPing);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isProcessRunning() {
|
||||||
|
return Boolean.TRUE.equals(processRunning);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,107 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.bootstrap.util;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
|
||||||
|
public class LimitingInputStream extends InputStream {
|
||||||
|
|
||||||
|
private final InputStream in;
|
||||||
|
private final long limit;
|
||||||
|
private long bytesRead = 0;
|
||||||
|
|
||||||
|
public LimitingInputStream(final InputStream in, final long limit) {
|
||||||
|
this.in = in;
|
||||||
|
this.limit = limit;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int read() throws IOException {
|
||||||
|
if (bytesRead >= limit) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
final int val = in.read();
|
||||||
|
if (val > -1) {
|
||||||
|
bytesRead++;
|
||||||
|
}
|
||||||
|
return val;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int read(final byte[] b) throws IOException {
|
||||||
|
if (bytesRead >= limit) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
final int maxToRead = (int) Math.min(b.length, limit - bytesRead);
|
||||||
|
|
||||||
|
final int val = in.read(b, 0, maxToRead);
|
||||||
|
if (val > 0) {
|
||||||
|
bytesRead += val;
|
||||||
|
}
|
||||||
|
return val;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int read(byte[] b, int off, int len) throws IOException {
|
||||||
|
if (bytesRead >= limit) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
final int maxToRead = (int) Math.min(len, limit - bytesRead);
|
||||||
|
|
||||||
|
final int val = in.read(b, off, maxToRead);
|
||||||
|
if (val > 0) {
|
||||||
|
bytesRead += val;
|
||||||
|
}
|
||||||
|
return val;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long skip(final long n) throws IOException {
|
||||||
|
final long skipped = in.skip(Math.min(n, limit - bytesRead));
|
||||||
|
bytesRead += skipped;
|
||||||
|
return skipped;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int available() throws IOException {
|
||||||
|
return in.available();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
in.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void mark(int readlimit) {
|
||||||
|
in.mark(readlimit);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean markSupported() {
|
||||||
|
return in.markSupported();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reset() throws IOException {
|
||||||
|
in.reset();
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue