Merge branch 'bootstrap' into develop

This commit is contained in:
Mark Payne 2014-12-10 11:08:55 -05:00
commit cce36335e1
20 changed files with 1137 additions and 223 deletions

View File

@ -119,6 +119,11 @@
<artifactId>nifi-runtime</artifactId> <artifactId>nifi-runtime</artifactId>
<version>${framework.version}</version> <version>${framework.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-bootstrap</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-resources</artifactId> <artifactId>nifi-resources</artifactId>

View File

@ -1045,6 +1045,21 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
if (flowFileSwapManager != null) { if (flowFileSwapManager != null) {
flowFileSwapManager.shutdown(); flowFileSwapManager.shutdown();
} }
if ( processScheduler != null ) {
processScheduler.shutdown();
}
if ( provenanceEventRepository != null ) {
try {
provenanceEventRepository.close();
} catch (final IOException ioe) {
LOG.warn("There was a problem shutting down the Provenance Repository: " + ioe.toString());
if ( LOG.isDebugEnabled() ) {
LOG.warn("", ioe);
}
}
}
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }

View File

@ -92,7 +92,7 @@ public class FileSystemRepository implements ContentRepository {
private final List<String> containerNames; private final List<String> containerNames;
private final AtomicLong index; private final AtomicLong index;
private final ScheduledExecutorService executor = new FlowEngine(4, "FileSystemRepository Workers"); private final ScheduledExecutorService executor = new FlowEngine(4, "FileSystemRepository Workers", true);
private final ConcurrentMap<String, BlockingQueue<ContentClaim>> reclaimable = new ConcurrentHashMap<>(); private final ConcurrentMap<String, BlockingQueue<ContentClaim>> reclaimable = new ConcurrentHashMap<>();
private final Map<String, ContainerState> containerStateMap = new HashMap<>(); private final Map<String, ContainerState> containerStateMap = new HashMap<>();
@ -209,7 +209,7 @@ public class FileSystemRepository implements ContentRepository {
} }
} }
containerCleanupExecutor = new FlowEngine(containers.size(), "Cleanup FileSystemRepository Container"); containerCleanupExecutor = new FlowEngine(containers.size(), "Cleanup FileSystemRepository Container", true);
for (final Map.Entry<String, Path> containerEntry : containers.entrySet()) { for (final Map.Entry<String, Path> containerEntry : containers.entrySet()) {
final String containerName = containerEntry.getKey(); final String containerName = containerEntry.getKey();
final Path containerPath = containerEntry.getValue(); final Path containerPath = containerEntry.getValue();

View File

@ -90,6 +90,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
private final long checkpointDelayMillis; private final long checkpointDelayMillis;
private final Path flowFileRepositoryPath; private final Path flowFileRepositoryPath;
private final int numPartitions; private final int numPartitions;
private final ScheduledExecutorService checkpointExecutor;
// effectively final // effectively final
private WriteAheadRepository<RepositoryRecord> wal; private WriteAheadRepository<RepositoryRecord> wal;
@ -128,6 +129,8 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
flowFileRepositoryPath = properties.getFlowFileRepositoryPath(); flowFileRepositoryPath = properties.getFlowFileRepositoryPath();
numPartitions = properties.getFlowFileRepositoryPartitions(); numPartitions = properties.getFlowFileRepositoryPartitions();
checkpointDelayMillis = FormatUtils.getTimeDuration(properties.getFlowFileRepositoryCheckpointInterval(), TimeUnit.MILLISECONDS); checkpointDelayMillis = FormatUtils.getTimeDuration(properties.getFlowFileRepositoryCheckpointInterval(), TimeUnit.MILLISECONDS);
checkpointExecutor = Executors.newSingleThreadScheduledExecutor();
} }
@Override @Override
@ -150,6 +153,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
checkpointFuture.cancel(false); checkpointFuture.cancel(false);
} }
checkpointExecutor.shutdown();
wal.shutdown(); wal.shutdown();
} }
@ -363,8 +367,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
} }
}; };
final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); checkpointFuture = checkpointExecutor.scheduleWithFixedDelay(checkpointRunnable, checkpointDelayMillis, checkpointDelayMillis, TimeUnit.MILLISECONDS);
checkpointFuture = executorService.scheduleWithFixedDelay(checkpointRunnable, checkpointDelayMillis, checkpointDelayMillis, TimeUnit.MILLISECONDS);
return maxId; return maxId;
} }

View File

@ -138,6 +138,9 @@ public final class StandardProcessScheduler implements ProcessScheduler {
LOG.error("", t); LOG.error("", t);
} }
} }
frameworkTaskExecutor.shutdown();
componentLifeCycleThreadPool.shutdown();
} }
public void schedule(final ReportingTaskNode taskNode) { public void schedule(final ReportingTaskNode taskNode) {

View File

@ -42,6 +42,18 @@ public final class FlowEngine extends ScheduledThreadPoolExecutor {
* @param threadNamePrefix * @param threadNamePrefix
*/ */
public FlowEngine(int corePoolSize, final String threadNamePrefix) { public FlowEngine(int corePoolSize, final String threadNamePrefix) {
this(corePoolSize, threadNamePrefix, false);
}
/**
* Creates a new instance of FlowEngine
*
* @param corePoolSize the maximum number of threads available to tasks
* running in the engine.
* @param threadNamePrefix
* @param deamon if true, the thread pool will be populated with daemon threads, otherwise the threads will not be marked as daemon.
*/
public FlowEngine(int corePoolSize, final String threadNamePrefix, final boolean daemon) {
super(corePoolSize); super(corePoolSize);
final AtomicInteger threadIndex = new AtomicInteger(0); final AtomicInteger threadIndex = new AtomicInteger(0);
@ -50,6 +62,9 @@ public final class FlowEngine extends ScheduledThreadPoolExecutor {
@Override @Override
public Thread newThread(final Runnable r) { public Thread newThread(final Runnable r) {
final Thread t = defaultThreadFactory.newThread(r); final Thread t = defaultThreadFactory.newThread(r);
if ( daemon ) {
t.setDaemon(true);
}
t.setName(threadNamePrefix + " Thread-" + threadIndex.incrementAndGet()); t.setName(threadNamePrefix + " Thread-" + threadIndex.incrementAndGet());
return t; return t;
} }

View File

@ -0,0 +1,32 @@
rem
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
rem
@echo off
rem Use JAVA_HOME if it's set; otherwise, just use java
IF "%JAVA_HOME%"=="" (SET JAVA_EXE=java) ELSE (SET JAVA_EXE=%JAVA_HOME%\bin\java.exe)
SET LIB_DIR=%~dp0..\lib
SET CONF_DIR=%~dp0..\conf
SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf
SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE%
SET JAVA_PARAMS=-cp %LIB_DIR%\nifi-bootstrap*.jar -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi
SET BOOTSTRAP_ACTION=status
cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION%

View File

@ -21,34 +21,6 @@
DIRNAME=`dirname "$0"` DIRNAME=`dirname "$0"`
PROGNAME=`basename "$0"` PROGNAME=`basename "$0"`
#
# Sourcing environment settings for NIFI similar to tomcats setenv
#
NIFI_SCRIPT="nifi.sh"
export NIFI_SCRIPT
if [ -f "$DIRNAME/setenv.sh" ]; then
. "$DIRNAME/setenv.sh"
fi
#
# Check/Set up some easily accessible MIN/MAX params for JVM mem usage
#
if [ "x$JAVA_MIN_MEM" = "x" ]; then
JAVA_MIN_MEM=512M
export JAVA_MIN_MEM
fi
if [ "x$JAVA_MAX_MEM" = "x" ]; then
JAVA_MAX_MEM=512M
export JAVA_MAX_MEM
fi
if [ "x$JAVA_PERMSIZE" = "x" ]; then
JAVA_PERMSIZE=128M
export JAVA_PERMSIZE
fi
if [ "x$JAVA_MAX_PERMSIZE" = "x" ]; then
JAVA_MAX_PERMSIZE=128M
export JAVA_MAX_PERMSIZE
fi
# #
#Readlink is not available on all systems. Change variable to appropriate alternative as part of OS detection #Readlink is not available on all systems. Change variable to appropriate alternative as part of OS detection
@ -140,58 +112,6 @@ locateHome() {
} }
locateBase() {
if [ "x$NIFI_BASE" != "x" ]; then
if [ ! -d "$NIFI_BASE" ]; then
die "NIFI_BASE is not valid: $NIFI_BASE"
fi
else
NIFI_BASE=$NIFI_HOME
fi
}
locateConf() {
if [ "x$NIFI_CONF" != "x" ]; then
if [ ! -d "$NIFI_CONF" ]; then
die "NIFI_CONF is not valid: $NIFI_CONF"
fi
else
NIFI_CONF=$NIFI_BASE/conf
fi
}
setupNativePath() {
# Support for loading native libraries
LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:$NIFI_BASE/lib:$NIFI_HOME/lib"
# For Cygwin, set PATH from LD_LIBRARY_PATH
if $cygwin; then
LD_LIBRARY_PATH=`cygpath --path --windows "$LD_LIBRARY_PATH"`
PATH="$PATH;$LD_LIBRARY_PATH"
export PATH
fi
export LD_LIBRARY_PATH
}
pathCanonical() {
local dst="${1}"
while [ -h "${dst}" ] ; do
ls=`ls -ld "${dst}"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
dst="$link"
else
dst="`dirname "${dst}"`/$link"
fi
done
local bas=`basename "${dst}"`
local dir=`dirname "${dst}"`
if [ "$bas" != "$dir" ]; then
dst="`pathCanonical "$dir"`/$bas"
fi
echo "${dst}" | sed -e 's#//#/#g' -e 's#/./#/#g' -e 's#/[^/]*/../#/#g'
}
locateJava() { locateJava() {
# Setup the Java Virtual Machine # Setup the Java Virtual Machine
@ -223,82 +143,6 @@ locateJava() {
fi fi
} }
detectJVM() {
#echo "`$JAVA -version`"
# This service should call `java -version`,
# read stdout, and look for hints
if $JAVA -version 2>&1 | grep "^IBM" ; then
JVM_VENDOR="IBM"
# on OS/400, java -version does not contain IBM explicitly
elif $os400; then
JVM_VENDOR="IBM"
else
JVM_VENDOR="SUN"
fi
# echo "JVM vendor is $JVM_VENDOR"
}
setupDebugOptions() {
if [ "x$JAVA_OPTS" = "x" ]; then
JAVA_OPTS="$DEFAULT_JAVA_OPTS"
fi
export JAVA_OPTS
if [ "x$EXTRA_JAVA_OPTS" != "x" ]; then
JAVA_OPTS="$JAVA_OPTS $EXTRA_JAVA_OPTS"
fi
# Set Debug options if enabled
if [ "x$NIFI_DEBUG" != "x" ]; then
# Use the defaults if JAVA_DEBUG_OPTS was not set
if [ "x$JAVA_DEBUG_OPTS" = "x" ]; then
JAVA_DEBUG_OPTS="$DEFAULT_JAVA_DEBUG_OPTS"
fi
JAVA_OPTS="$JAVA_DEBUG_OPTS $JAVA_OPTS"
warn "Enabling Java debug options: $JAVA_DEBUG_OPTS"
fi
}
setupDefaults() {
DEFAULT_JAVA_OPTS="-Xms$JAVA_MIN_MEM -Xmx$JAVA_MAX_MEM -XX:PermSize=$JAVA_PERMSIZE -XX:MaxPermSize=$JAVA_MAX_PERMSIZE"
#Set the JVM_VENDOR specific JVM flags
if [ "$JVM_VENDOR" = "SUN" ]; then
#
# Check some easily accessible MIN/MAX params for JVM mem usage
#
if [ "x$JAVA_PERM_MEM" != "x" ]; then
DEFAULT_JAVA_OPTS="$DEFAULT_JAVA_OPTS -XX:PermSize=$JAVA_PERM_MEM"
fi
if [ "x$JAVA_MAX_PERM_MEM" != "x" ]; then
DEFAULT_JAVA_OPTS="$DEFAULT_JAVA_OPTS -XX:MaxPermSize=$JAVA_MAX_PERM_MEM"
fi
DEFAULT_JAVA_OPTS="-server $DEFAULT_JAVA_OPTS -Dcom.sun.management.jmxremote"
elif [ "$JVM_VENDOR" = "IBM" ]; then
if $os400; then
DEFAULT_JAVA_OPTS="$DEFAULT_JAVA_OPTS"
elif $aix; then
DEFAULT_JAVA_OPTS="-Xverify:none -Xdump:heap -Xlp $DEFAULT_JAVA_OPTS"
else
DEFAULT_JAVA_OPTS="-Xverify:none $DEFAULT_JAVA_OPTS"
fi
fi
DEFAULT_JAVA_OPTS="$DEFAULT_JAVA_OPTS -Djava.net.preferIPv4Stack=true -Dsun.net.http.allowRestrictedHeaders=true -Djava.protocol.handler.pkgs=sun.net.www.protocol -Dorg.apache.jasper.compiler.disablejsr199=true -XX:ReservedCodeCacheSize=128m -XX:+UseCodeCacheFlushing"
# Setup classpath
CLASSPATH="$NIFI_HOME"/conf
for f in "$NIFI_HOME"/lib/*
do
CLASSPATH="${CLASSPATH}":"${f}"
done
DEFAULT_JAVA_DEBUG_OPTS="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"
}
init() { init() {
# Determine if there is special OS handling we must perform # Determine if there is special OS handling we must perform
detectOS detectOS
@ -309,49 +153,28 @@ init() {
# Locate the NiFi home directory # Locate the NiFi home directory
locateHome locateHome
# Locate the NiFi base directory
locateBase
# Locate the NiFi conf directory
locateConf
# Setup the native library path
setupNativePath
# Locate the Java VM to execute # Locate the Java VM to execute
locateJava locateJava
# Determine the JVM vendor
detectJVM
# Setup default options
setupDefaults
# Install debug options
setupDebugOptions
} }
run() { run() {
BOOTSTRAP_CONF="$NIFI_HOME/conf/bootstrap.conf";
if $cygwin; then if $cygwin; then
NIFI_HOME=`cygpath --path --windows "$NIFI_HOME"` NIFI_HOME=`cygpath --path --windows "$NIFI_HOME"`
NIFI_BASE=`cygpath --path --windows "$NIFI_BASE"` BOOTSTRAP_CONF=`cygpath --path --windows "$BOOTSTRAP_CONF"`
NIFI_CONF=`cygpath --path --windows "$NIFI_CONF"`
CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
fi fi
# export CLASSPATH to the java process. Could also pass in via -cp
export CLASSPATH
echo echo
echo "Classpath: $CLASSPATH" echo "Classpath: $CLASSPATH"
echo echo
echo "Java home: $JAVA_HOME" echo "Java home: $JAVA_HOME"
echo "NiFi home: $NIFI_HOME" echo "NiFi home: $NIFI_HOME"
echo "Java Options: $JAVA_OPTS"
echo echo
echo "Launching NiFi. See logs..." echo "Bootstrap Config File: $BOOTSTRAP_CONF"
exec "$JAVA" -Dapp=nifi $JAVA_OPTS -Dnifi.properties.file.path="$NIFI_HOME"/conf/nifi.properties org.apache.nifi.NiFi echo
exec "$JAVA" -cp "$NIFI_HOME"/lib/nifi-bootstrap*.jar -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.file="$BOOTSTRAP_CONF" org.apache.nifi.bootstrap.RunNiFi $1
} }
main() { main() {

View File

@ -0,0 +1,32 @@
rem
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
rem
@echo off
rem Use JAVA_HOME if it's set; otherwise, just use java
IF "%JAVA_HOME%"=="" (SET JAVA_EXE=java) ELSE (SET JAVA_EXE=%JAVA_HOME%\bin\java.exe)
SET LIB_DIR=%~dp0..\lib
SET CONF_DIR=%~dp0..\conf
SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf
SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE%
SET JAVA_PARAMS=-cp %LIB_DIR%\nifi-bootstrap*.jar -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi
SET BOOTSTRAP_ACTION=run
cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION%

View File

@ -0,0 +1,32 @@
rem
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
rem
@echo off
rem Use JAVA_HOME if it's set; otherwise, just use java
IF "%JAVA_HOME%"=="" (SET JAVA_EXE=java) ELSE (SET JAVA_EXE=%JAVA_HOME%\bin\java.exe)
SET LIB_DIR=%~dp0..\lib
SET CONF_DIR=%~dp0..\conf
SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf
SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE%
SET JAVA_PARAMS=-cp %LIB_DIR%\nifi-bootstrap*.jar -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi
SET BOOTSTRAP_ACTION=start
cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION%

View File

@ -0,0 +1,32 @@
rem
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
rem
@echo off
rem Use JAVA_HOME if it's set; otherwise, just use java
IF "%JAVA_HOME%"=="" (SET JAVA_EXE=java) ELSE (SET JAVA_EXE=%JAVA_HOME%\bin\java.exe)
SET LIB_DIR=%~dp0..\lib
SET CONF_DIR=%~dp0..\conf
SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf
SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE%
SET JAVA_PARAMS=-cp %LIB_DIR%\nifi-bootstrap*.jar -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi
SET BOOTSTRAP_ACTION=stop
cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION%

View File

@ -0,0 +1,16 @@
# Configure where NiFi's lib and conf directories live
lib.dir=./lib
conf.dir=./conf
# How long to wait after telling NiFi to shutdown before explicitly killing the Process
graceful.shutdown.seconds=20
# Disable JSR 199 so that we can use JSP's without running a JDK
java.arg.1=-Dorg.apache.jasper.compiler.disablejsr199=true
# JVM memory settings
java.arg.2=-Xms256m
java.arg.3=-Xmx512m
# Enable Remote Debugging
#java.arg.2=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000

View File

@ -0,0 +1,234 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BootstrapListener {
private static final Logger logger = LoggerFactory.getLogger(BootstrapListener.class);
private final NiFi nifi;
private final int bootstrapPort;
private volatile Listener listener;
private volatile ServerSocket serverSocket;
public BootstrapListener(final NiFi nifi, final int port) {
this.nifi = nifi;
this.bootstrapPort = port;
}
public void start() throws IOException {
logger.debug("Starting Bootstrap Listener to communicate with Bootstrap Port {}", bootstrapPort);
serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress("localhost", 0));
serverSocket.setSoTimeout(2000);
final int localPort = serverSocket.getLocalPort();
logger.info("Started Bootstrap Listener, Listening for incoming requests on port {}", localPort);
listener = new Listener(serverSocket);
final Thread listenThread = new Thread(listener);
listenThread.setDaemon(true);
listenThread.setName("Listen to Bootstrap");
listenThread.start();
logger.debug("Notifying Bootstrap that local port is {}", localPort);
try (final Socket socket = new Socket()) {
socket.setSoTimeout(60000);
socket.connect(new InetSocketAddress("localhost", bootstrapPort));
socket.setSoTimeout(60000);
final OutputStream out = socket.getOutputStream();
out.write(("PORT " + localPort + "\n").getBytes(StandardCharsets.UTF_8));
out.flush();
logger.debug("Awaiting response from Bootstrap...");
final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
final String response = reader.readLine();
if ("OK".equals(response)) {
logger.info("Successfully initiated communication with Bootstrap");
} else {
logger.error("Failed to communicate with Bootstrap. Bootstrap may be unable to issue or receive commands from NiFi");
}
}
}
public void stop() {
if (listener != null) {
listener.stop();
}
}
private class Listener implements Runnable {
private final ServerSocket serverSocket;
private final ExecutorService executor;
private volatile boolean stopped = false;
public Listener(final ServerSocket serverSocket) {
this.serverSocket = serverSocket;
this.executor = Executors.newFixedThreadPool(2);
}
public void stop() {
stopped = true;
executor.shutdownNow();
try {
serverSocket.close();
} catch (final IOException ioe) {
// nothing to really do here. we could log this, but it would just become
// confusing in the logs, as we're shutting down and there's no real benefit
}
}
@Override
public void run() {
while (!stopped) {
try {
final Socket socket;
try {
socket = serverSocket.accept();
} catch (final SocketTimeoutException ste) {
if ( stopped ) {
return;
}
continue;
} catch (final IOException ioe) {
if ( stopped ) {
return;
}
throw ioe;
}
executor.submit(new Runnable() {
@Override
public void run() {
try {
final BootstrapRequest request = readRequest(socket.getInputStream());
final BootstrapRequest.RequestType requestType = request.getRequestType();
switch (requestType) {
case PING:
logger.debug("Received PING request from Bootstrap; responding");
echoPing(socket.getOutputStream());
logger.debug("Responded to PING request from Bootstrap");
break;
case SHUTDOWN:
logger.info("Received SHUTDOWN request from Bootstrap");
echoShutdown(socket.getOutputStream());
nifi.shutdownHook();
return;
}
} catch (final Throwable t) {
logger.error("Failed to process request from Bootstrap due to " + t.toString(), t);
} finally {
try {
socket.close();
} catch (final IOException ioe) {
logger.warn("Failed to close socket to Bootstrap due to {}", ioe.toString());
}
}
}
});
} catch (final Throwable t) {
logger.error("Failed to process request from Bootstrap due to " + t.toString(), t);
}
}
}
}
private void echoPing(final OutputStream out) throws IOException {
out.write("PING\n".getBytes(StandardCharsets.UTF_8));
out.flush();
}
private void echoShutdown(final OutputStream out) throws IOException {
out.write("SHUTDOWN\n".getBytes(StandardCharsets.UTF_8));
out.flush();
}
private BootstrapRequest readRequest(final InputStream in) throws IOException {
final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
final String line = reader.readLine();
final String[] splits = line.split(" ");
if ( splits.length < 0 ) {
throw new IOException("Received invalid command from NiFi: " + line);
}
final String requestType = splits[0];
final String[] args;
if ( splits.length == 1 ) {
args = new String[0];
} else {
args = Arrays.copyOfRange(splits, 1, splits.length);
}
try {
return new BootstrapRequest(requestType, args);
} catch (final Exception e) {
throw new IOException("Received invalid request from bootstrap; request type = " + requestType);
}
}
private static class BootstrapRequest {
public static enum RequestType {
SHUTDOWN,
PING;
}
private final RequestType requestType;
private final String[] args;
public BootstrapRequest(final String request, final String[] args) {
this.requestType = RequestType.valueOf(request);
this.args = args;
}
public RequestType getRequestType() {
return requestType;
}
public String[] getArgs() {
return args;
}
}
}

View File

@ -45,6 +45,10 @@ public class NiFi {
private static final Logger logger = LoggerFactory.getLogger(NiFi.class); private static final Logger logger = LoggerFactory.getLogger(NiFi.class);
private final NiFiServer nifiServer; private final NiFiServer nifiServer;
private final BootstrapListener bootstrapListener;
public static final String BOOTSTRAP_PORT_PROPERTY = "nifi.bootstrap.listen.port";
private volatile boolean shutdown = false;
public NiFi(final NiFiProperties properties) throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException { public NiFi(final NiFiProperties properties) throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() { Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@ -65,6 +69,25 @@ public class NiFi {
} }
})); }));
final String bootstrapPort = System.getProperty(BOOTSTRAP_PORT_PROPERTY);
if ( bootstrapPort != null ) {
try {
final int port = Integer.parseInt(bootstrapPort);
if (port < 1 || port > 65535) {
throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535");
}
bootstrapListener = new BootstrapListener(this, port);
bootstrapListener.start();
} catch (final NumberFormatException nfe) {
throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535");
}
} else {
logger.info("NiFi started without Bootstrap Port information provided; will not listen for requests from Bootstrap");
bootstrapListener = null;
}
// delete the web working dir - if the application does not start successfully // delete the web working dir - if the application does not start successfully
// the web app directories might be in an invalid state. when this happens // the web app directories might be in an invalid state. when this happens
// jetty will not attempt to re-extract the war into the directory. by removing // jetty will not attempt to re-extract the war into the directory. by removing
@ -104,17 +127,28 @@ public class NiFi {
final long startTime = System.nanoTime(); final long startTime = System.nanoTime();
nifiServer = (NiFiServer) jettyConstructor.newInstance(properties); nifiServer = (NiFiServer) jettyConstructor.newInstance(properties);
nifiServer.setExtensionMapping(extensionMapping); nifiServer.setExtensionMapping(extensionMapping);
if ( shutdown ) {
logger.info("NiFi has been shutdown via NiFi Bootstrap. Will not start Controller");
} else {
nifiServer.start(); nifiServer.start();
final long endTime = System.nanoTime(); final long endTime = System.nanoTime();
logger.info("Controller initialization took " + (endTime - startTime) + " nanoseconds."); logger.info("Controller initialization took " + (endTime - startTime) + " nanoseconds.");
} }
}
protected void shutdownHook() { protected void shutdownHook() {
try { try {
this.shutdown = true;
logger.info("Initiating shutdown of Jetty web server..."); logger.info("Initiating shutdown of Jetty web server...");
if (nifiServer != null) { if (nifiServer != null) {
nifiServer.stop(); nifiServer.stop();
} }
if (bootstrapListener != null) {
bootstrapListener.stop();
}
logger.info("Jetty web server shutdown completed (nicely or otherwise)."); logger.info("Jetty web server shutdown completed (nicely or otherwise).");
} catch (final Throwable t) { } catch (final Throwable t) {
logger.warn("Problem occured ensuring Jetty web server was properly terminated due to " + t); logger.warn("Problem occured ensuring Jetty web server was properly terminated due to " + t);

View File

@ -1,5 +1,18 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<parent> <parent>

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.bootstrap;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.Arrays;
import org.apache.nifi.bootstrap.exception.InvalidCommandException;
public class BootstrapCodec {
private final RunNiFi runner;
private final BufferedReader reader;
private final BufferedWriter writer;
public BootstrapCodec(final RunNiFi runner, final InputStream in, final OutputStream out) {
this.runner = runner;
this.reader = new BufferedReader(new InputStreamReader(in));
this.writer = new BufferedWriter(new OutputStreamWriter(out));
}
public void communicate() throws IOException {
final String line = reader.readLine();
final String[] splits = line.split(" ");
if ( splits.length < 0 ) {
throw new IOException("Received invalid command from NiFi: " + line);
}
final String cmd = splits[0];
final String[] args;
if ( splits.length == 1 ) {
args = new String[0];
} else {
args = Arrays.copyOfRange(splits, 1, splits.length);
}
try {
processRequest(cmd, args);
} catch (final InvalidCommandException ice) {
throw new IOException("Received invalid command from NiFi: " + line + " : " + ice.getMessage() == null ? "" : "Details: " + ice.toString());
}
}
private void processRequest(final String cmd, final String[] args) throws InvalidCommandException, IOException {
switch (cmd) {
case "PORT": {
if ( args.length != 1 ) {
throw new InvalidCommandException();
}
final int port;
try {
port = Integer.parseInt( args[0] );
} catch (final NumberFormatException nfe) {
throw new InvalidCommandException("Invalid Port number; should be integer between 1 and 65535");
}
if ( port < 1 || port > 65535 ) {
throw new InvalidCommandException("Invalid Port number; should be integer between 1 and 65535");
}
runner.setNiFiCommandControlPort(port);
writer.write("OK");
writer.newLine();
writer.flush();
}
break;
}
}
}

View File

@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.bootstrap;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class NiFiListener {
private ServerSocket serverSocket;
private volatile Listener listener;
int start(final RunNiFi runner) throws IOException {
serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress("localhost", 0));
final int localPort = serverSocket.getLocalPort();
listener = new Listener(serverSocket, runner);
final Thread listenThread = new Thread(listener);
listenThread.setName("Listen to NiFi");
listenThread.start();
return localPort;
}
public void stop() throws IOException {
final Listener listener = this.listener;
if ( listener == null ) {
return;
}
listener.stop();
}
private class Listener implements Runnable {
private final ServerSocket serverSocket;
private final ExecutorService executor;
private final RunNiFi runner;
private volatile boolean stopped = false;
public Listener(final ServerSocket serverSocket, final RunNiFi runner) {
this.serverSocket = serverSocket;
this.executor = Executors.newFixedThreadPool(2);
this.runner = runner;
}
public void stop() throws IOException {
stopped = true;
executor.shutdown();
try {
executor.awaitTermination(3, TimeUnit.SECONDS);
} catch (final InterruptedException ie) {
}
serverSocket.close();
}
@Override
public void run() {
while (!serverSocket.isClosed()) {
try {
if ( stopped ) {
return;
}
final Socket socket;
try {
socket = serverSocket.accept();
} catch (final IOException ioe) {
if ( stopped ) {
return;
}
throw ioe;
}
executor.submit(new Runnable() {
@Override
public void run() {
try {
final BootstrapCodec codec = new BootstrapCodec(runner, socket.getInputStream(), socket.getOutputStream());
codec.communicate();
socket.close();
} catch (final Throwable t) {
System.out.println("Failed to communicate with NiFi due to " + t);
t.printStackTrace();
}
}
});
} catch (final Throwable t) {
System.err.println("Failed to receive information from NiFi due to " + t);
t.printStackTrace();
}
}
}
}
}

View File

@ -1,16 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.bootstrap; package org.apache.nifi.bootstrap;
import java.io.BufferedReader;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FilenameFilter; import java.io.FilenameFilter;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Path; import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/** /**
@ -18,7 +46,6 @@ import java.util.Properties;
* *
* This class looks for the bootstrap.conf file by looking in the following places (in order): * This class looks for the bootstrap.conf file by looking in the following places (in order):
* <ol> * <ol>
* <li>First argument to the program</li>
* <li>Java System Property named {@code org.apache.nifi.bootstrap.config.file}</li> * <li>Java System Property named {@code org.apache.nifi.bootstrap.config.file}</li>
* <li>${NIFI_HOME}/./conf/bootstrap.conf, where ${NIFI_HOME} references an environment variable {@code NIFI_HOME}</li> * <li>${NIFI_HOME}/./conf/bootstrap.conf, where ${NIFI_HOME} references an environment variable {@code NIFI_HOME}</li>
* <li>./conf/bootstrap.conf, where {@code .} represents the working directory. * <li>./conf/bootstrap.conf, where {@code .} represents the working directory.
@ -27,14 +54,64 @@ import java.util.Properties;
* If the {@code bootstrap.conf} file cannot be found, throws a {@code FileNotFoundException]. * If the {@code bootstrap.conf} file cannot be found, throws a {@code FileNotFoundException].
*/ */
public class RunNiFi { public class RunNiFi {
public static final String DEFAULT_CONFIG_FILE = "./conf/boostrap.conf"; public static final String DEFAULT_CONFIG_FILE = "./conf/bootstrap.conf";
public static final String DEFAULT_NIFI_PROPS_FILE = "./conf/nifi.properties"; public static final String DEFAULT_NIFI_PROPS_FILE = "./conf/nifi.properties";
@SuppressWarnings({ "rawtypes", "unchecked" }) public static final String GRACEFUL_SHUTDOWN_PROP = "graceful.shutdown.seconds";
public static void main(final String[] args) throws IOException, InterruptedException { public static final String DEFAULT_GRACEFUL_SHUTDOWN_VALUE = "20";
final ProcessBuilder builder = new ProcessBuilder();
String configFilename = (args.length > 0) ? args[0] : System.getProperty("org.apache.nifi.boostrap.config.file"); public static final int MAX_RESTART_ATTEMPTS = 5;
public static final int STARTUP_WAIT_SECONDS = 60;
public static final String SHUTDOWN_CMD = "SHUTDOWN";
public static final String PING_CMD = "PING";
private volatile boolean autoRestartNiFi = true;
private volatile int ccPort = -1;
private final Lock lock = new ReentrantLock();
private final Condition startupCondition = lock.newCondition();
private final File bootstrapConfigFile;
public RunNiFi(final File bootstrapConfigFile) {
this.bootstrapConfigFile = bootstrapConfigFile;
}
private static void printUsage() {
System.out.println("Usage:");
System.out.println();
System.out.println("java org.apache.nifi.bootstrap.RunNiFi <command>");
System.out.println();
System.out.println("Valid commands include:");
System.out.println("");
System.out.println("Start : Start a new instance of Apache NiFi");
System.out.println("Stop : Stop a running instance of Apache NiFi");
System.out.println("Status : Determine if there is a running instance of Apache NiFi");
System.out.println("Run : Start a new instance of Apache NiFi and monitor the Process, restarting if the instance dies");
System.out.println();
}
public static void main(final String[] args) throws IOException, InterruptedException {
if ( args.length != 1 ) {
printUsage();
return;
}
switch (args[0].toLowerCase()) {
case "start":
case "run":
case "stop":
case "status":
break;
default:
System.out.println("Invalid argument: " + args[0]);
System.out.println();
printUsage();
return;
}
String configFilename = System.getProperty("org.apache.nifi.bootstrap.config.file");
if ( configFilename == null ) { if ( configFilename == null ) {
final String nifiHome = System.getenv("NIFI_HOME"); final String nifiHome = System.getenv("NIFI_HOME");
@ -50,12 +127,137 @@ public class RunNiFi {
} }
final File configFile = new File(configFilename); final File configFile = new File(configFilename);
if ( !configFile.exists() ) {
throw new FileNotFoundException(DEFAULT_CONFIG_FILE); final RunNiFi runNiFi = new RunNiFi(configFile);
switch (args[0].toLowerCase()) {
case "start":
runNiFi.start(false);
break;
case "run":
runNiFi.start(true);
break;
case "stop":
runNiFi.stop();
break;
case "status":
runNiFi.status();
break;
}
}
public File getStatusFile() {
final File confDir = bootstrapConfigFile.getParentFile();
final File nifiHome = confDir.getParentFile();
final File bin = new File(nifiHome, "bin");
final File statusFile = new File(bin, "nifi.port");
return statusFile;
}
private Integer getCurrentPort() throws IOException {
try {
final File statusFile = getStatusFile();
final byte[] info = Files.readAllBytes(statusFile.toPath());
final String text = new String(info);
final int port = Integer.parseInt(text);
try (final Socket socket = new Socket("localhost", port)) {
final OutputStream out = socket.getOutputStream();
out.write((PING_CMD + "\n").getBytes(StandardCharsets.UTF_8));
out.flush();
final InputStream in = socket.getInputStream();
final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
final String response = reader.readLine();
if ( response.equals(PING_CMD) ) {
return port;
}
} catch (final IOException ioe) {
System.out.println("Found NiFi instance info at " + statusFile + " indicating that NiFi is running and listening to port " + port + " but unable to communicate with NiFi on that port. The process may have died or may be hung.");
throw ioe;
}
} catch (final Exception e) {
return null;
}
return null;
}
public void status() throws IOException {
final Integer port = getCurrentPort();
if ( port == null ) {
System.out.println("Apache NiFi does not appear to be running");
} else {
System.out.println("Apache NiFi is currently running, listening on port " + port);
}
return;
}
public void stop() throws IOException {
final Integer port = getCurrentPort();
if ( port == null ) {
System.out.println("Apache NiFi is not currently running");
return;
}
try (final Socket socket = new Socket()) {
socket.setSoTimeout(60000);
socket.connect(new InetSocketAddress("localhost", port));
socket.setSoTimeout(60000);
final OutputStream out = socket.getOutputStream();
out.write((SHUTDOWN_CMD + "\n").getBytes(StandardCharsets.UTF_8));
out.flush();
final InputStream in = socket.getInputStream();
final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
final String response = reader.readLine();
if ( SHUTDOWN_CMD.equals(response) ) {
System.out.println("Apache NiFi has accepted the Shutdown Command and is shutting down now");
final File statusFile = getStatusFile();
if ( !statusFile.delete() ) {
System.err.println("Failed to delete status file " + statusFile + "; this file should be cleaned up manually");
}
} else {
System.err.println("When sending SHUTDOWN command to NiFi, got unexpected response " + response);
}
} catch (final IOException ioe) {
System.err.println("Failed to communicate with Apache NiFi");
return;
}
}
public static boolean isAlive(final Process process) {
try {
process.exitValue();
return false;
} catch (final IllegalStateException | IllegalThreadStateException itse) {
return true;
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public void start(final boolean monitor) throws IOException, InterruptedException {
final Integer port = getCurrentPort();
if ( port != null ) {
System.out.println("Apache NiFi is already running, listening on port " + port);
return;
}
final ProcessBuilder builder = new ProcessBuilder();
if ( !bootstrapConfigFile.exists() ) {
throw new FileNotFoundException(bootstrapConfigFile.getAbsolutePath());
} }
final Properties properties = new Properties(); final Properties properties = new Properties();
try (final FileInputStream fis = new FileInputStream(configFile)) { try (final FileInputStream fis = new FileInputStream(bootstrapConfigFile)) {
properties.load(fis); properties.load(fis);
} }
@ -67,7 +269,13 @@ public class RunNiFi {
builder.directory(new File(specifiedWorkingDir)); builder.directory(new File(specifiedWorkingDir));
} }
final File workingDir = builder.directory(); final File bootstrapConfigAbsoluteFile = bootstrapConfigFile.getAbsoluteFile();
final File binDir = bootstrapConfigAbsoluteFile.getParentFile();
final File workingDir = binDir.getParentFile();
if ( specifiedWorkingDir == null ) {
builder.directory(workingDir);
}
final String libFilename = replaceNull(props.get("lib.dir"), "./lib").trim(); final String libFilename = replaceNull(props.get("lib.dir"), "./lib").trim();
File libDir = getFile(libFilename, workingDir); File libDir = getFile(libFilename, workingDir);
@ -112,13 +320,10 @@ public class RunNiFi {
throw new RuntimeException("Could not find conf directory at " + confDir.getAbsolutePath()); throw new RuntimeException("Could not find conf directory at " + confDir.getAbsolutePath());
} }
final Path workingDirPath = workingDir.toPath();
final List<String> cpFiles = new ArrayList<>(confFiles.length + libFiles.length); final List<String> cpFiles = new ArrayList<>(confFiles.length + libFiles.length);
cpFiles.add(confDir.getAbsolutePath()); cpFiles.add(confDir.getAbsolutePath());
for ( final File file : libFiles ) { for ( final File file : libFiles ) {
final Path path = workingDirPath.relativize(file.toPath()); cpFiles.add(file.getAbsolutePath());
final String cpPath = path.toString();
cpFiles.add(cpPath);
} }
final StringBuilder classPathBuilder = new StringBuilder(); final StringBuilder classPathBuilder = new StringBuilder();
@ -136,41 +341,154 @@ public class RunNiFi {
javaCmd = "java"; javaCmd = "java";
} }
final NiFiListener listener = new NiFiListener();
final int listenPort = listener.start(this);
final List<String> cmd = new ArrayList<>(); final List<String> cmd = new ArrayList<>();
cmd.add(javaCmd); cmd.add(javaCmd);
cmd.add("-classpath"); cmd.add("-classpath");
cmd.add(classPath); cmd.add(classPath);
cmd.addAll(javaAdditionalArgs); cmd.addAll(javaAdditionalArgs);
cmd.add("-Dnifi.properties.file.path=" + nifiPropsFilename); cmd.add("-Dnifi.properties.file.path=" + nifiPropsFilename);
cmd.add("-Dnifi.bootstrap.listen.port=" + listenPort);
cmd.add("-Dapp=NiFi");
cmd.add("org.apache.nifi.NiFi"); cmd.add("org.apache.nifi.NiFi");
builder.command(cmd).inheritIO(); builder.command(cmd);
final StringBuilder cmdBuilder = new StringBuilder(); final StringBuilder cmdBuilder = new StringBuilder();
for ( final String s : cmd ) { for ( final String s : cmd ) {
cmdBuilder.append(s).append(" "); cmdBuilder.append(s).append(" ");
} }
System.out.println("Starting Apache NiFi..."); System.out.println("Starting Apache NiFi...");
System.out.println("Working Directory: " + workingDir.getAbsolutePath()); System.out.println("Working Directory: " + workingDir.getAbsolutePath());
System.out.println("Command: " + cmdBuilder.toString()); System.out.println("Command: " + cmdBuilder.toString());
final Process proc = builder.start(); if ( monitor ) {
Runtime.getRuntime().addShutdownHook(new ShutdownHook(proc)); String gracefulShutdown = props.get(GRACEFUL_SHUTDOWN_PROP);
final int statusCode = proc.waitFor(); if ( gracefulShutdown == null ) {
System.out.println("Apache NiFi exited with Status Code " + statusCode); gracefulShutdown = DEFAULT_GRACEFUL_SHUTDOWN_VALUE;
}
final int gracefulShutdownSeconds;
try {
gracefulShutdownSeconds = Integer.parseInt(gracefulShutdown);
} catch (final NumberFormatException nfe) {
throw new NumberFormatException("The '" + GRACEFUL_SHUTDOWN_PROP + "' property in Bootstrap Config File " + bootstrapConfigAbsoluteFile.getAbsolutePath() + " has an invalid value. Must be a non-negative integer");
}
if ( gracefulShutdownSeconds < 0 ) {
throw new NumberFormatException("The '" + GRACEFUL_SHUTDOWN_PROP + "' property in Bootstrap Config File " + bootstrapConfigAbsoluteFile.getAbsolutePath() + " has an invalid value. Must be a non-negative integer");
}
Process process = builder.start();
ShutdownHook shutdownHook = new ShutdownHook(process, this, gracefulShutdownSeconds);
final Runtime runtime = Runtime.getRuntime();
runtime.addShutdownHook(shutdownHook);
while (true) {
final boolean alive = isAlive(process);
if ( alive ) {
try {
Thread.sleep(1000L);
} catch (final InterruptedException ie) {
}
} else {
runtime.removeShutdownHook(shutdownHook);
if (autoRestartNiFi) {
System.out.println("Apache NiFi appears to have died. Restarting...");
process = builder.start();
shutdownHook = new ShutdownHook(process, this, gracefulShutdownSeconds);
runtime.addShutdownHook(shutdownHook);
final boolean started = waitForStart();
if ( started ) {
System.out.println("Successfully started Apache NiFi");
} else {
System.err.println("Apache NiFi does not appear to have started");
}
} else {
return;
}
}
}
} else {
builder.start();
boolean started = waitForStart();
if ( started ) {
System.out.println("Successfully started Apache NiFi");
} else {
System.err.println("Apache NiFi does not appear to have started");
}
listener.stop();
}
} }
private static File getFile(final String filename, final File workingDir) { private boolean waitForStart() {
File libDir = new File(filename); lock.lock();
if ( !libDir.isAbsolute() ) { try {
libDir = new File(workingDir, filename); final long startTime = System.nanoTime();
while ( ccPort < 1 ) {
try {
startupCondition.await(1, TimeUnit.SECONDS);
} catch (final InterruptedException ie) {
return false;
} }
return libDir; final long waitNanos = System.nanoTime() - startTime;
final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos);
if (waitSeconds > STARTUP_WAIT_SECONDS) {
return false;
}
}
} finally {
lock.unlock();
}
return true;
} }
private static String replaceNull(final String value, final String replacement) { private File getFile(final String filename, final File workingDir) {
File file = new File(filename);
if ( !file.isAbsolute() ) {
file = new File(workingDir, filename);
}
return file;
}
private String replaceNull(final String value, final String replacement) {
return (value == null) ? replacement : value; return (value == null) ? replacement : value;
} }
void setAutoRestartNiFi(final boolean restart) {
this.autoRestartNiFi = restart;
}
void setNiFiCommandControlPort(final int port) {
this.ccPort = port;
final File statusFile = getStatusFile();
try (final FileOutputStream fos = new FileOutputStream(statusFile)) {
fos.write(String.valueOf(port).getBytes(StandardCharsets.UTF_8));
fos.getFD().sync();
} catch (final IOException ioe) {
System.err.println("Apache NiFi has started but failed to persist NiFi Port information to " + statusFile.getAbsolutePath() + " due to " + ioe);
}
System.out.println("Apache NiFi now running and listening for requests on port " + port);
}
int getNiFiCommandControlPort() {
return this.ccPort;
}
} }

View File

@ -1,14 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.bootstrap; package org.apache.nifi.bootstrap;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
public class ShutdownHook extends Thread { public class ShutdownHook extends Thread {
private final Process nifiProcess; private final Process nifiProcess;
private final RunNiFi runner;
private final int gracefulShutdownSeconds;
public ShutdownHook(final Process nifiProcess) { public ShutdownHook(final Process nifiProcess, final RunNiFi runner, final int gracefulShutdownSeconds) {
this.nifiProcess = nifiProcess; this.nifiProcess = nifiProcess;
this.runner = runner;
this.gracefulShutdownSeconds = gracefulShutdownSeconds;
} }
@Override @Override
public void run() { public void run() {
runner.setAutoRestartNiFi(false);
final int ccPort = runner.getNiFiCommandControlPort();
if ( ccPort > 0 ) {
System.out.println("Initiating Shutdown of NiFi...");
try {
final Socket socket = new Socket("localhost", ccPort);
final OutputStream out = socket.getOutputStream();
out.write("SHUTDOWN\n".getBytes(StandardCharsets.UTF_8));
out.flush();
socket.close();
} catch (final IOException ioe) {
System.out.println("Failed to Shutdown NiFi due to " + ioe);
}
}
System.out.println("Waiting for Apache NiFi to finish shutting down...");
final long startWait = System.nanoTime();
while ( RunNiFi.isAlive(nifiProcess) ) {
final long waitNanos = System.nanoTime() - startWait;
final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos);
if ( waitSeconds >= gracefulShutdownSeconds && gracefulShutdownSeconds > 0 ) {
if ( RunNiFi.isAlive(nifiProcess) ) {
System.out.println("NiFi has not finished shutting down after " + gracefulShutdownSeconds + " seconds. Killing process.");
nifiProcess.destroy(); nifiProcess.destroy();
} }
break;
} else {
try {
Thread.sleep(1000L);
} catch (final InterruptedException ie) {}
}
}
final File statusFile = runner.getStatusFile();
if ( !statusFile.delete() ) {
System.err.println("Failed to delete status file " + statusFile.getAbsolutePath() + "; this file should be cleaned up manually");
}
}
} }

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.bootstrap.exception;
public class InvalidCommandException extends Exception {
private static final long serialVersionUID = 1L;
public InvalidCommandException() {
super();
}
public InvalidCommandException(final String message) {
super(message);
}
public InvalidCommandException(final Throwable t) {
super(t);
}
public InvalidCommandException(final String message, final Throwable t) {
super(message, t);
}
}