diff --git a/assemblies/nifi/pom.xml b/assemblies/nifi/pom.xml index ec06a2327d..556b8fed70 100644 --- a/assemblies/nifi/pom.xml +++ b/assemblies/nifi/pom.xml @@ -119,6 +119,11 @@ nifi-runtime ${framework.version} + + org.apache.nifi + nifi-bootstrap + 0.0.1-SNAPSHOT + org.apache.nifi nifi-resources diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java index a0a07f2126..20c50b5763 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -1045,6 +1045,21 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H if (flowFileSwapManager != null) { flowFileSwapManager.shutdown(); } + + if ( processScheduler != null ) { + processScheduler.shutdown(); + } + + if ( provenanceEventRepository != null ) { + try { + provenanceEventRepository.close(); + } catch (final IOException ioe) { + LOG.warn("There was a problem shutting down the Provenance Repository: " + ioe.toString()); + if ( LOG.isDebugEnabled() ) { + LOG.warn("", ioe); + } + } + } } finally { writeLock.unlock(); } diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java index 8be9c62144..ba74295122 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java @@ -92,7 +92,7 @@ public class FileSystemRepository implements ContentRepository { private final List containerNames; private final AtomicLong index; - private final ScheduledExecutorService executor = new FlowEngine(4, "FileSystemRepository Workers"); + private final ScheduledExecutorService executor = new FlowEngine(4, "FileSystemRepository Workers", true); private final ConcurrentMap> reclaimable = new ConcurrentHashMap<>(); private final Map containerStateMap = new HashMap<>(); @@ -209,7 +209,7 @@ public class FileSystemRepository implements ContentRepository { } } - containerCleanupExecutor = new FlowEngine(containers.size(), "Cleanup FileSystemRepository Container"); + containerCleanupExecutor = new FlowEngine(containers.size(), "Cleanup FileSystemRepository Container", true); for (final Map.Entry containerEntry : containers.entrySet()) { final String containerName = containerEntry.getKey(); final Path containerPath = containerEntry.getValue(); diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java index ab3a6a1fab..292c25815a 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java @@ -90,6 +90,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis private final long checkpointDelayMillis; private final Path flowFileRepositoryPath; private final int numPartitions; + private final ScheduledExecutorService checkpointExecutor; // effectively final private WriteAheadRepository wal; @@ -128,6 +129,8 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis flowFileRepositoryPath = properties.getFlowFileRepositoryPath(); numPartitions = properties.getFlowFileRepositoryPartitions(); checkpointDelayMillis = FormatUtils.getTimeDuration(properties.getFlowFileRepositoryCheckpointInterval(), TimeUnit.MILLISECONDS); + + checkpointExecutor = Executors.newSingleThreadScheduledExecutor(); } @Override @@ -150,6 +153,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis checkpointFuture.cancel(false); } + checkpointExecutor.shutdown(); wal.shutdown(); } @@ -363,8 +367,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis } }; - final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); - checkpointFuture = executorService.scheduleWithFixedDelay(checkpointRunnable, checkpointDelayMillis, checkpointDelayMillis, TimeUnit.MILLISECONDS); + checkpointFuture = checkpointExecutor.scheduleWithFixedDelay(checkpointRunnable, checkpointDelayMillis, checkpointDelayMillis, TimeUnit.MILLISECONDS); return maxId; } diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 0d5055a436..7fc65f9426 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -138,6 +138,9 @@ public final class StandardProcessScheduler implements ProcessScheduler { LOG.error("", t); } } + + frameworkTaskExecutor.shutdown(); + componentLifeCycleThreadPool.shutdown(); } public void schedule(final ReportingTaskNode taskNode) { diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/engine/FlowEngine.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/engine/FlowEngine.java index 2430f56ddf..76e8e3e1cd 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/engine/FlowEngine.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/engine/FlowEngine.java @@ -42,6 +42,18 @@ public final class FlowEngine extends ScheduledThreadPoolExecutor { * @param threadNamePrefix */ public FlowEngine(int corePoolSize, final String threadNamePrefix) { + this(corePoolSize, threadNamePrefix, false); + } + + /** + * Creates a new instance of FlowEngine + * + * @param corePoolSize the maximum number of threads available to tasks + * running in the engine. + * @param threadNamePrefix + * @param deamon if true, the thread pool will be populated with daemon threads, otherwise the threads will not be marked as daemon. + */ + public FlowEngine(int corePoolSize, final String threadNamePrefix, final boolean daemon) { super(corePoolSize); final AtomicInteger threadIndex = new AtomicInteger(0); @@ -50,6 +62,9 @@ public final class FlowEngine extends ScheduledThreadPoolExecutor { @Override public Thread newThread(final Runnable r) { final Thread t = defaultThreadFactory.newThread(r); + if ( daemon ) { + t.setDaemon(true); + } t.setName(threadNamePrefix + " Thread-" + threadIndex.incrementAndGet()); return t; } diff --git a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi-status.bat b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi-status.bat new file mode 100644 index 0000000000..ed9c5163ff --- /dev/null +++ b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi-status.bat @@ -0,0 +1,32 @@ +rem +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at +rem +rem http://www.apache.org/licenses/LICENSE-2.0 +rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. +rem + +@echo off + +rem Use JAVA_HOME if it's set; otherwise, just use java +IF "%JAVA_HOME%"=="" (SET JAVA_EXE=java) ELSE (SET JAVA_EXE=%JAVA_HOME%\bin\java.exe) + +SET LIB_DIR=%~dp0..\lib +SET CONF_DIR=%~dp0..\conf + +SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf +SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE% + +SET JAVA_PARAMS=-cp %LIB_DIR%\nifi-bootstrap*.jar -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi +SET BOOTSTRAP_ACTION=status + +cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION% diff --git a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh index 6fedfb74e3..ad90d5b35b 100644 --- a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh +++ b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh @@ -21,34 +21,6 @@ DIRNAME=`dirname "$0"` PROGNAME=`basename "$0"` -# -# Sourcing environment settings for NIFI similar to tomcats setenv -# -NIFI_SCRIPT="nifi.sh" -export NIFI_SCRIPT -if [ -f "$DIRNAME/setenv.sh" ]; then - . "$DIRNAME/setenv.sh" -fi - -# -# Check/Set up some easily accessible MIN/MAX params for JVM mem usage -# -if [ "x$JAVA_MIN_MEM" = "x" ]; then - JAVA_MIN_MEM=512M - export JAVA_MIN_MEM -fi -if [ "x$JAVA_MAX_MEM" = "x" ]; then - JAVA_MAX_MEM=512M - export JAVA_MAX_MEM -fi -if [ "x$JAVA_PERMSIZE" = "x" ]; then - JAVA_PERMSIZE=128M - export JAVA_PERMSIZE -fi -if [ "x$JAVA_MAX_PERMSIZE" = "x" ]; then - JAVA_MAX_PERMSIZE=128M - export JAVA_MAX_PERMSIZE -fi # #Readlink is not available on all systems. Change variable to appropriate alternative as part of OS detection @@ -140,58 +112,6 @@ locateHome() { } -locateBase() { - if [ "x$NIFI_BASE" != "x" ]; then - if [ ! -d "$NIFI_BASE" ]; then - die "NIFI_BASE is not valid: $NIFI_BASE" - fi - else - NIFI_BASE=$NIFI_HOME - fi -} - - -locateConf() { - if [ "x$NIFI_CONF" != "x" ]; then - if [ ! -d "$NIFI_CONF" ]; then - die "NIFI_CONF is not valid: $NIFI_CONF" - fi - else - NIFI_CONF=$NIFI_BASE/conf - fi -} - -setupNativePath() { - # Support for loading native libraries - LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:$NIFI_BASE/lib:$NIFI_HOME/lib" - - # For Cygwin, set PATH from LD_LIBRARY_PATH - if $cygwin; then - LD_LIBRARY_PATH=`cygpath --path --windows "$LD_LIBRARY_PATH"` - PATH="$PATH;$LD_LIBRARY_PATH" - export PATH - fi - export LD_LIBRARY_PATH -} - -pathCanonical() { - local dst="${1}" - while [ -h "${dst}" ] ; do - ls=`ls -ld "${dst}"` - link=`expr "$ls" : '.*-> \(.*\)$'` - if expr "$link" : '/.*' > /dev/null; then - dst="$link" - else - dst="`dirname "${dst}"`/$link" - fi - done - local bas=`basename "${dst}"` - local dir=`dirname "${dst}"` - if [ "$bas" != "$dir" ]; then - dst="`pathCanonical "$dir"`/$bas" - fi - echo "${dst}" | sed -e 's#//#/#g' -e 's#/./#/#g' -e 's#/[^/]*/../#/#g' -} locateJava() { # Setup the Java Virtual Machine @@ -223,82 +143,6 @@ locateJava() { fi } -detectJVM() { - #echo "`$JAVA -version`" - # This service should call `java -version`, - # read stdout, and look for hints - if $JAVA -version 2>&1 | grep "^IBM" ; then - JVM_VENDOR="IBM" - # on OS/400, java -version does not contain IBM explicitly - elif $os400; then - JVM_VENDOR="IBM" - else - JVM_VENDOR="SUN" - fi - # echo "JVM vendor is $JVM_VENDOR" -} - -setupDebugOptions() { - if [ "x$JAVA_OPTS" = "x" ]; then - JAVA_OPTS="$DEFAULT_JAVA_OPTS" - fi - export JAVA_OPTS - - if [ "x$EXTRA_JAVA_OPTS" != "x" ]; then - JAVA_OPTS="$JAVA_OPTS $EXTRA_JAVA_OPTS" - fi - - # Set Debug options if enabled - if [ "x$NIFI_DEBUG" != "x" ]; then - # Use the defaults if JAVA_DEBUG_OPTS was not set - if [ "x$JAVA_DEBUG_OPTS" = "x" ]; then - JAVA_DEBUG_OPTS="$DEFAULT_JAVA_DEBUG_OPTS" - fi - - JAVA_OPTS="$JAVA_DEBUG_OPTS $JAVA_OPTS" - warn "Enabling Java debug options: $JAVA_DEBUG_OPTS" - fi -} - -setupDefaults() { - DEFAULT_JAVA_OPTS="-Xms$JAVA_MIN_MEM -Xmx$JAVA_MAX_MEM -XX:PermSize=$JAVA_PERMSIZE -XX:MaxPermSize=$JAVA_MAX_PERMSIZE" - - #Set the JVM_VENDOR specific JVM flags - if [ "$JVM_VENDOR" = "SUN" ]; then - # - # Check some easily accessible MIN/MAX params for JVM mem usage - # - if [ "x$JAVA_PERM_MEM" != "x" ]; then - DEFAULT_JAVA_OPTS="$DEFAULT_JAVA_OPTS -XX:PermSize=$JAVA_PERM_MEM" - fi - if [ "x$JAVA_MAX_PERM_MEM" != "x" ]; then - DEFAULT_JAVA_OPTS="$DEFAULT_JAVA_OPTS -XX:MaxPermSize=$JAVA_MAX_PERM_MEM" - fi - DEFAULT_JAVA_OPTS="-server $DEFAULT_JAVA_OPTS -Dcom.sun.management.jmxremote" - elif [ "$JVM_VENDOR" = "IBM" ]; then - if $os400; then - DEFAULT_JAVA_OPTS="$DEFAULT_JAVA_OPTS" - elif $aix; then - DEFAULT_JAVA_OPTS="-Xverify:none -Xdump:heap -Xlp $DEFAULT_JAVA_OPTS" - else - DEFAULT_JAVA_OPTS="-Xverify:none $DEFAULT_JAVA_OPTS" - fi - fi - - DEFAULT_JAVA_OPTS="$DEFAULT_JAVA_OPTS -Djava.net.preferIPv4Stack=true -Dsun.net.http.allowRestrictedHeaders=true -Djava.protocol.handler.pkgs=sun.net.www.protocol -Dorg.apache.jasper.compiler.disablejsr199=true -XX:ReservedCodeCacheSize=128m -XX:+UseCodeCacheFlushing" - - # Setup classpath - CLASSPATH="$NIFI_HOME"/conf - for f in "$NIFI_HOME"/lib/* - do - CLASSPATH="${CLASSPATH}":"${f}" - done - - - DEFAULT_JAVA_DEBUG_OPTS="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005" - -} - init() { # Determine if there is special OS handling we must perform detectOS @@ -309,49 +153,28 @@ init() { # Locate the NiFi home directory locateHome - # Locate the NiFi base directory - locateBase - - # Locate the NiFi conf directory - locateConf - - # Setup the native library path - setupNativePath - # Locate the Java VM to execute locateJava - - # Determine the JVM vendor - detectJVM - - # Setup default options - setupDefaults - - # Install debug options - setupDebugOptions - } run() { - + BOOTSTRAP_CONF="$NIFI_HOME/conf/bootstrap.conf"; + if $cygwin; then NIFI_HOME=`cygpath --path --windows "$NIFI_HOME"` - NIFI_BASE=`cygpath --path --windows "$NIFI_BASE"` - NIFI_CONF=`cygpath --path --windows "$NIFI_CONF"` - CLASSPATH=`cygpath --path --windows "$CLASSPATH"` + BOOTSTRAP_CONF=`cygpath --path --windows "$BOOTSTRAP_CONF"` fi - # export CLASSPATH to the java process. Could also pass in via -cp - export CLASSPATH + echo echo "Classpath: $CLASSPATH" echo echo "Java home: $JAVA_HOME" echo "NiFi home: $NIFI_HOME" - echo "Java Options: $JAVA_OPTS" echo - echo "Launching NiFi. See logs..." - exec "$JAVA" -Dapp=nifi $JAVA_OPTS -Dnifi.properties.file.path="$NIFI_HOME"/conf/nifi.properties org.apache.nifi.NiFi - + echo "Bootstrap Config File: $BOOTSTRAP_CONF" + echo + + exec "$JAVA" -cp "$NIFI_HOME"/lib/nifi-bootstrap*.jar -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.file="$BOOTSTRAP_CONF" org.apache.nifi.bootstrap.RunNiFi $1 } main() { diff --git a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/run-nifi.bat b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/run-nifi.bat new file mode 100644 index 0000000000..fdff815a0a --- /dev/null +++ b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/run-nifi.bat @@ -0,0 +1,32 @@ +rem +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at +rem +rem http://www.apache.org/licenses/LICENSE-2.0 +rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. +rem + +@echo off + +rem Use JAVA_HOME if it's set; otherwise, just use java +IF "%JAVA_HOME%"=="" (SET JAVA_EXE=java) ELSE (SET JAVA_EXE=%JAVA_HOME%\bin\java.exe) + +SET LIB_DIR=%~dp0..\lib +SET CONF_DIR=%~dp0..\conf + +SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf +SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE% + +SET JAVA_PARAMS=-cp %LIB_DIR%\nifi-bootstrap*.jar -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi +SET BOOTSTRAP_ACTION=run + +cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION% diff --git a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/start-nifi.bat b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/start-nifi.bat new file mode 100644 index 0000000000..ba4739a998 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/start-nifi.bat @@ -0,0 +1,32 @@ +rem +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at +rem +rem http://www.apache.org/licenses/LICENSE-2.0 +rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. +rem + +@echo off + +rem Use JAVA_HOME if it's set; otherwise, just use java +IF "%JAVA_HOME%"=="" (SET JAVA_EXE=java) ELSE (SET JAVA_EXE=%JAVA_HOME%\bin\java.exe) + +SET LIB_DIR=%~dp0..\lib +SET CONF_DIR=%~dp0..\conf + +SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf +SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE% + +SET JAVA_PARAMS=-cp %LIB_DIR%\nifi-bootstrap*.jar -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi +SET BOOTSTRAP_ACTION=start + +cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION% diff --git a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/stop-nifi.bat b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/stop-nifi.bat new file mode 100644 index 0000000000..828be6ec85 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/stop-nifi.bat @@ -0,0 +1,32 @@ +rem +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at +rem +rem http://www.apache.org/licenses/LICENSE-2.0 +rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. +rem + +@echo off + +rem Use JAVA_HOME if it's set; otherwise, just use java +IF "%JAVA_HOME%"=="" (SET JAVA_EXE=java) ELSE (SET JAVA_EXE=%JAVA_HOME%\bin\java.exe) + +SET LIB_DIR=%~dp0..\lib +SET CONF_DIR=%~dp0..\conf + +SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf +SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE% + +SET JAVA_PARAMS=-cp %LIB_DIR%\nifi-bootstrap*.jar -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi +SET BOOTSTRAP_ACTION=stop + +cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION% diff --git a/nar-bundles/framework-bundle/framework/resources/src/main/resources/conf/bootstrap.conf b/nar-bundles/framework-bundle/framework/resources/src/main/resources/conf/bootstrap.conf new file mode 100644 index 0000000000..c45d8f8280 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/resources/src/main/resources/conf/bootstrap.conf @@ -0,0 +1,16 @@ +# Configure where NiFi's lib and conf directories live +lib.dir=./lib +conf.dir=./conf + +# How long to wait after telling NiFi to shutdown before explicitly killing the Process +graceful.shutdown.seconds=20 + +# Disable JSR 199 so that we can use JSP's without running a JDK +java.arg.1=-Dorg.apache.jasper.compiler.disablejsr199=true + +# JVM memory settings +java.arg.2=-Xms256m +java.arg.3=-Xmx512m + +# Enable Remote Debugging +#java.arg.2=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000 \ No newline at end of file diff --git a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java new file mode 100644 index 0000000000..31f336cb83 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BootstrapListener { + private static final Logger logger = LoggerFactory.getLogger(BootstrapListener.class); + + private final NiFi nifi; + private final int bootstrapPort; + + private volatile Listener listener; + private volatile ServerSocket serverSocket; + + + public BootstrapListener(final NiFi nifi, final int port) { + this.nifi = nifi; + this.bootstrapPort = port; + } + + public void start() throws IOException { + logger.debug("Starting Bootstrap Listener to communicate with Bootstrap Port {}", bootstrapPort); + + serverSocket = new ServerSocket(); + serverSocket.bind(new InetSocketAddress("localhost", 0)); + serverSocket.setSoTimeout(2000); + + final int localPort = serverSocket.getLocalPort(); + logger.info("Started Bootstrap Listener, Listening for incoming requests on port {}", localPort); + + listener = new Listener(serverSocket); + final Thread listenThread = new Thread(listener); + listenThread.setDaemon(true); + listenThread.setName("Listen to Bootstrap"); + listenThread.start(); + + logger.debug("Notifying Bootstrap that local port is {}", localPort); + try (final Socket socket = new Socket()) { + socket.setSoTimeout(60000); + socket.connect(new InetSocketAddress("localhost", bootstrapPort)); + socket.setSoTimeout(60000); + + final OutputStream out = socket.getOutputStream(); + out.write(("PORT " + localPort + "\n").getBytes(StandardCharsets.UTF_8)); + out.flush(); + + logger.debug("Awaiting response from Bootstrap..."); + final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); + final String response = reader.readLine(); + if ("OK".equals(response)) { + logger.info("Successfully initiated communication with Bootstrap"); + } else { + logger.error("Failed to communicate with Bootstrap. Bootstrap may be unable to issue or receive commands from NiFi"); + } + } + } + + + public void stop() { + if (listener != null) { + listener.stop(); + } + } + + private class Listener implements Runnable { + private final ServerSocket serverSocket; + private final ExecutorService executor; + private volatile boolean stopped = false; + + public Listener(final ServerSocket serverSocket) { + this.serverSocket = serverSocket; + this.executor = Executors.newFixedThreadPool(2); + } + + public void stop() { + stopped = true; + + executor.shutdownNow(); + + try { + serverSocket.close(); + } catch (final IOException ioe) { + // nothing to really do here. we could log this, but it would just become + // confusing in the logs, as we're shutting down and there's no real benefit + } + } + + @Override + public void run() { + while (!stopped) { + try { + final Socket socket; + try { + socket = serverSocket.accept(); + } catch (final SocketTimeoutException ste) { + if ( stopped ) { + return; + } + + continue; + } catch (final IOException ioe) { + if ( stopped ) { + return; + } + + throw ioe; + } + + executor.submit(new Runnable() { + @Override + public void run() { + try { + final BootstrapRequest request = readRequest(socket.getInputStream()); + final BootstrapRequest.RequestType requestType = request.getRequestType(); + + switch (requestType) { + case PING: + logger.debug("Received PING request from Bootstrap; responding"); + echoPing(socket.getOutputStream()); + logger.debug("Responded to PING request from Bootstrap"); + break; + case SHUTDOWN: + logger.info("Received SHUTDOWN request from Bootstrap"); + echoShutdown(socket.getOutputStream()); + nifi.shutdownHook(); + return; + } + } catch (final Throwable t) { + logger.error("Failed to process request from Bootstrap due to " + t.toString(), t); + } finally { + try { + socket.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close socket to Bootstrap due to {}", ioe.toString()); + } + } + } + }); + } catch (final Throwable t) { + logger.error("Failed to process request from Bootstrap due to " + t.toString(), t); + } + } + } + } + + + private void echoPing(final OutputStream out) throws IOException { + out.write("PING\n".getBytes(StandardCharsets.UTF_8)); + out.flush(); + } + + private void echoShutdown(final OutputStream out) throws IOException { + out.write("SHUTDOWN\n".getBytes(StandardCharsets.UTF_8)); + out.flush(); + } + + private BootstrapRequest readRequest(final InputStream in) throws IOException { + final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + + final String line = reader.readLine(); + final String[] splits = line.split(" "); + if ( splits.length < 0 ) { + throw new IOException("Received invalid command from NiFi: " + line); + } + + final String requestType = splits[0]; + final String[] args; + if ( splits.length == 1 ) { + args = new String[0]; + } else { + args = Arrays.copyOfRange(splits, 1, splits.length); + } + + try { + return new BootstrapRequest(requestType, args); + } catch (final Exception e) { + throw new IOException("Received invalid request from bootstrap; request type = " + requestType); + } + } + + + private static class BootstrapRequest { + public static enum RequestType { + SHUTDOWN, + PING; + } + + private final RequestType requestType; + private final String[] args; + + public BootstrapRequest(final String request, final String[] args) { + this.requestType = RequestType.valueOf(request); + this.args = args; + } + + public RequestType getRequestType() { + return requestType; + } + + public String[] getArgs() { + return args; + } + } +} diff --git a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/NiFi.java b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/NiFi.java index 5fd1a138be..13cd4d691b 100644 --- a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/NiFi.java +++ b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/NiFi.java @@ -45,6 +45,10 @@ public class NiFi { private static final Logger logger = LoggerFactory.getLogger(NiFi.class); private final NiFiServer nifiServer; + private final BootstrapListener bootstrapListener; + + public static final String BOOTSTRAP_PORT_PROPERTY = "nifi.bootstrap.listen.port"; + private volatile boolean shutdown = false; public NiFi(final NiFiProperties properties) throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException { Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() { @@ -65,6 +69,25 @@ public class NiFi { } })); + final String bootstrapPort = System.getProperty(BOOTSTRAP_PORT_PROPERTY); + if ( bootstrapPort != null ) { + try { + final int port = Integer.parseInt(bootstrapPort); + + if (port < 1 || port > 65535) { + throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535"); + } + + bootstrapListener = new BootstrapListener(this, port); + bootstrapListener.start(); + } catch (final NumberFormatException nfe) { + throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535"); + } + } else { + logger.info("NiFi started without Bootstrap Port information provided; will not listen for requests from Bootstrap"); + bootstrapListener = null; + } + // delete the web working dir - if the application does not start successfully // the web app directories might be in an invalid state. when this happens // jetty will not attempt to re-extract the war into the directory. by removing @@ -104,17 +127,28 @@ public class NiFi { final long startTime = System.nanoTime(); nifiServer = (NiFiServer) jettyConstructor.newInstance(properties); nifiServer.setExtensionMapping(extensionMapping); - nifiServer.start(); - final long endTime = System.nanoTime(); - logger.info("Controller initialization took " + (endTime - startTime) + " nanoseconds."); + + if ( shutdown ) { + logger.info("NiFi has been shutdown via NiFi Bootstrap. Will not start Controller"); + } else { + nifiServer.start(); + + final long endTime = System.nanoTime(); + logger.info("Controller initialization took " + (endTime - startTime) + " nanoseconds."); + } } protected void shutdownHook() { try { + this.shutdown = true; + logger.info("Initiating shutdown of Jetty web server..."); if (nifiServer != null) { nifiServer.stop(); } + if (bootstrapListener != null) { + bootstrapListener.stop(); + } logger.info("Jetty web server shutdown completed (nicely or otherwise)."); } catch (final Throwable t) { logger.warn("Problem occured ensuring Jetty web server was properly terminated due to " + t); diff --git a/nifi-bootstrap/pom.xml b/nifi-bootstrap/pom.xml index b620c84a78..a992018d94 100644 --- a/nifi-bootstrap/pom.xml +++ b/nifi-bootstrap/pom.xml @@ -1,5 +1,18 @@ - + + 4.0.0 diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java new file mode 100644 index 0000000000..8138c02076 --- /dev/null +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.bootstrap; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.Arrays; + +import org.apache.nifi.bootstrap.exception.InvalidCommandException; + +public class BootstrapCodec { + private final RunNiFi runner; + private final BufferedReader reader; + private final BufferedWriter writer; + + public BootstrapCodec(final RunNiFi runner, final InputStream in, final OutputStream out) { + this.runner = runner; + this.reader = new BufferedReader(new InputStreamReader(in)); + this.writer = new BufferedWriter(new OutputStreamWriter(out)); + } + + public void communicate() throws IOException { + final String line = reader.readLine(); + final String[] splits = line.split(" "); + if ( splits.length < 0 ) { + throw new IOException("Received invalid command from NiFi: " + line); + } + + final String cmd = splits[0]; + final String[] args; + if ( splits.length == 1 ) { + args = new String[0]; + } else { + args = Arrays.copyOfRange(splits, 1, splits.length); + } + + try { + processRequest(cmd, args); + } catch (final InvalidCommandException ice) { + throw new IOException("Received invalid command from NiFi: " + line + " : " + ice.getMessage() == null ? "" : "Details: " + ice.toString()); + } + } + + private void processRequest(final String cmd, final String[] args) throws InvalidCommandException, IOException { + switch (cmd) { + case "PORT": { + if ( args.length != 1 ) { + throw new InvalidCommandException(); + } + + final int port; + try { + port = Integer.parseInt( args[0] ); + } catch (final NumberFormatException nfe) { + throw new InvalidCommandException("Invalid Port number; should be integer between 1 and 65535"); + } + + if ( port < 1 || port > 65535 ) { + throw new InvalidCommandException("Invalid Port number; should be integer between 1 and 65535"); + } + + runner.setNiFiCommandControlPort(port); + writer.write("OK"); + writer.newLine(); + writer.flush(); + } + break; + } + } +} diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java new file mode 100644 index 0000000000..c83135199f --- /dev/null +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.bootstrap; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +public class NiFiListener { + private ServerSocket serverSocket; + private volatile Listener listener; + + int start(final RunNiFi runner) throws IOException { + serverSocket = new ServerSocket(); + serverSocket.bind(new InetSocketAddress("localhost", 0)); + + final int localPort = serverSocket.getLocalPort(); + listener = new Listener(serverSocket, runner); + final Thread listenThread = new Thread(listener); + listenThread.setName("Listen to NiFi"); + listenThread.start(); + return localPort; + } + + public void stop() throws IOException { + final Listener listener = this.listener; + if ( listener == null ) { + return; + } + + listener.stop(); + } + + private class Listener implements Runnable { + private final ServerSocket serverSocket; + private final ExecutorService executor; + private final RunNiFi runner; + private volatile boolean stopped = false; + + public Listener(final ServerSocket serverSocket, final RunNiFi runner) { + this.serverSocket = serverSocket; + this.executor = Executors.newFixedThreadPool(2); + this.runner = runner; + } + + public void stop() throws IOException { + stopped = true; + + executor.shutdown(); + try { + executor.awaitTermination(3, TimeUnit.SECONDS); + } catch (final InterruptedException ie) { + } + + serverSocket.close(); + } + + @Override + public void run() { + while (!serverSocket.isClosed()) { + try { + if ( stopped ) { + return; + } + + final Socket socket; + try { + socket = serverSocket.accept(); + } catch (final IOException ioe) { + if ( stopped ) { + return; + } + + throw ioe; + } + + + executor.submit(new Runnable() { + @Override + public void run() { + try { + final BootstrapCodec codec = new BootstrapCodec(runner, socket.getInputStream(), socket.getOutputStream()); + codec.communicate(); + socket.close(); + } catch (final Throwable t) { + System.out.println("Failed to communicate with NiFi due to " + t); + t.printStackTrace(); + } + } + }); + } catch (final Throwable t) { + System.err.println("Failed to receive information from NiFi due to " + t); + t.printStackTrace(); + } + } + } + } +} diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java index afa1f4713e..af783a1bc1 100644 --- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java @@ -1,16 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.bootstrap; +import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; +import java.io.FileOutputStream; import java.io.FilenameFilter; import java.io.IOException; -import java.nio.file.Path; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** @@ -18,7 +46,6 @@ import java.util.Properties; * * This class looks for the bootstrap.conf file by looking in the following places (in order): *
    - *
  1. First argument to the program
  2. *
  3. Java System Property named {@code org.apache.nifi.bootstrap.config.file}
  4. *
  5. ${NIFI_HOME}/./conf/bootstrap.conf, where ${NIFI_HOME} references an environment variable {@code NIFI_HOME}
  6. *
  7. ./conf/bootstrap.conf, where {@code .} represents the working directory. @@ -27,14 +54,64 @@ import java.util.Properties; * If the {@code bootstrap.conf} file cannot be found, throws a {@code FileNotFoundException]. */ public class RunNiFi { - public static final String DEFAULT_CONFIG_FILE = "./conf/boostrap.conf"; + public static final String DEFAULT_CONFIG_FILE = "./conf/bootstrap.conf"; public static final String DEFAULT_NIFI_PROPS_FILE = "./conf/nifi.properties"; - - @SuppressWarnings({ "rawtypes", "unchecked" }) - public static void main(final String[] args) throws IOException, InterruptedException { - final ProcessBuilder builder = new ProcessBuilder(); - String configFilename = (args.length > 0) ? args[0] : System.getProperty("org.apache.nifi.boostrap.config.file"); + public static final String GRACEFUL_SHUTDOWN_PROP = "graceful.shutdown.seconds"; + public static final String DEFAULT_GRACEFUL_SHUTDOWN_VALUE = "20"; + + public static final int MAX_RESTART_ATTEMPTS = 5; + public static final int STARTUP_WAIT_SECONDS = 60; + + public static final String SHUTDOWN_CMD = "SHUTDOWN"; + public static final String PING_CMD = "PING"; + + private volatile boolean autoRestartNiFi = true; + private volatile int ccPort = -1; + + private final Lock lock = new ReentrantLock(); + private final Condition startupCondition = lock.newCondition(); + + private final File bootstrapConfigFile; + + public RunNiFi(final File bootstrapConfigFile) { + this.bootstrapConfigFile = bootstrapConfigFile; + } + + private static void printUsage() { + System.out.println("Usage:"); + System.out.println(); + System.out.println("java org.apache.nifi.bootstrap.RunNiFi "); + System.out.println(); + System.out.println("Valid commands include:"); + System.out.println(""); + System.out.println("Start : Start a new instance of Apache NiFi"); + System.out.println("Stop : Stop a running instance of Apache NiFi"); + System.out.println("Status : Determine if there is a running instance of Apache NiFi"); + System.out.println("Run : Start a new instance of Apache NiFi and monitor the Process, restarting if the instance dies"); + System.out.println(); + } + + public static void main(final String[] args) throws IOException, InterruptedException { + if ( args.length != 1 ) { + printUsage(); + return; + } + + switch (args[0].toLowerCase()) { + case "start": + case "run": + case "stop": + case "status": + break; + default: + System.out.println("Invalid argument: " + args[0]); + System.out.println(); + printUsage(); + return; + } + + String configFilename = System.getProperty("org.apache.nifi.bootstrap.config.file"); if ( configFilename == null ) { final String nifiHome = System.getenv("NIFI_HOME"); @@ -50,12 +127,137 @@ public class RunNiFi { } final File configFile = new File(configFilename); - if ( !configFile.exists() ) { - throw new FileNotFoundException(DEFAULT_CONFIG_FILE); + + final RunNiFi runNiFi = new RunNiFi(configFile); + + switch (args[0].toLowerCase()) { + case "start": + runNiFi.start(false); + break; + case "run": + runNiFi.start(true); + break; + case "stop": + runNiFi.stop(); + break; + case "status": + runNiFi.status(); + break; + } + } + + + public File getStatusFile() { + final File confDir = bootstrapConfigFile.getParentFile(); + final File nifiHome = confDir.getParentFile(); + final File bin = new File(nifiHome, "bin"); + final File statusFile = new File(bin, "nifi.port"); + return statusFile; + } + + private Integer getCurrentPort() throws IOException { + try { + final File statusFile = getStatusFile(); + final byte[] info = Files.readAllBytes(statusFile.toPath()); + final String text = new String(info); + + final int port = Integer.parseInt(text); + + try (final Socket socket = new Socket("localhost", port)) { + final OutputStream out = socket.getOutputStream(); + out.write((PING_CMD + "\n").getBytes(StandardCharsets.UTF_8)); + out.flush(); + + final InputStream in = socket.getInputStream(); + final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + final String response = reader.readLine(); + if ( response.equals(PING_CMD) ) { + return port; + } + } catch (final IOException ioe) { + System.out.println("Found NiFi instance info at " + statusFile + " indicating that NiFi is running and listening to port " + port + " but unable to communicate with NiFi on that port. The process may have died or may be hung."); + throw ioe; + } + } catch (final Exception e) { + return null; + } + + return null; + } + + + public void status() throws IOException { + final Integer port = getCurrentPort(); + if ( port == null ) { + System.out.println("Apache NiFi does not appear to be running"); + } else { + System.out.println("Apache NiFi is currently running, listening on port " + port); + } + return; + } + + + public void stop() throws IOException { + final Integer port = getCurrentPort(); + if ( port == null ) { + System.out.println("Apache NiFi is not currently running"); + return; + } + + try (final Socket socket = new Socket()) { + socket.setSoTimeout(60000); + socket.connect(new InetSocketAddress("localhost", port)); + socket.setSoTimeout(60000); + + final OutputStream out = socket.getOutputStream(); + out.write((SHUTDOWN_CMD + "\n").getBytes(StandardCharsets.UTF_8)); + out.flush(); + + final InputStream in = socket.getInputStream(); + final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + final String response = reader.readLine(); + if ( SHUTDOWN_CMD.equals(response) ) { + System.out.println("Apache NiFi has accepted the Shutdown Command and is shutting down now"); + + final File statusFile = getStatusFile(); + if ( !statusFile.delete() ) { + System.err.println("Failed to delete status file " + statusFile + "; this file should be cleaned up manually"); + } + } else { + System.err.println("When sending SHUTDOWN command to NiFi, got unexpected response " + response); + } + } catch (final IOException ioe) { + System.err.println("Failed to communicate with Apache NiFi"); + return; + } + } + + + public static boolean isAlive(final Process process) { + try { + process.exitValue(); + return false; + } catch (final IllegalStateException | IllegalThreadStateException itse) { + return true; + } + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public void start(final boolean monitor) throws IOException, InterruptedException { + final Integer port = getCurrentPort(); + if ( port != null ) { + System.out.println("Apache NiFi is already running, listening on port " + port); + return; + } + + final ProcessBuilder builder = new ProcessBuilder(); + + if ( !bootstrapConfigFile.exists() ) { + throw new FileNotFoundException(bootstrapConfigFile.getAbsolutePath()); } final Properties properties = new Properties(); - try (final FileInputStream fis = new FileInputStream(configFile)) { + try (final FileInputStream fis = new FileInputStream(bootstrapConfigFile)) { properties.load(fis); } @@ -67,7 +269,13 @@ public class RunNiFi { builder.directory(new File(specifiedWorkingDir)); } - final File workingDir = builder.directory(); + final File bootstrapConfigAbsoluteFile = bootstrapConfigFile.getAbsoluteFile(); + final File binDir = bootstrapConfigAbsoluteFile.getParentFile(); + final File workingDir = binDir.getParentFile(); + + if ( specifiedWorkingDir == null ) { + builder.directory(workingDir); + } final String libFilename = replaceNull(props.get("lib.dir"), "./lib").trim(); File libDir = getFile(libFilename, workingDir); @@ -112,13 +320,10 @@ public class RunNiFi { throw new RuntimeException("Could not find conf directory at " + confDir.getAbsolutePath()); } - final Path workingDirPath = workingDir.toPath(); final List cpFiles = new ArrayList<>(confFiles.length + libFiles.length); cpFiles.add(confDir.getAbsolutePath()); for ( final File file : libFiles ) { - final Path path = workingDirPath.relativize(file.toPath()); - final String cpPath = path.toString(); - cpFiles.add(cpPath); + cpFiles.add(file.getAbsolutePath()); } final StringBuilder classPathBuilder = new StringBuilder(); @@ -136,41 +341,154 @@ public class RunNiFi { javaCmd = "java"; } + final NiFiListener listener = new NiFiListener(); + final int listenPort = listener.start(this); + final List cmd = new ArrayList<>(); cmd.add(javaCmd); cmd.add("-classpath"); cmd.add(classPath); cmd.addAll(javaAdditionalArgs); cmd.add("-Dnifi.properties.file.path=" + nifiPropsFilename); + cmd.add("-Dnifi.bootstrap.listen.port=" + listenPort); + cmd.add("-Dapp=NiFi"); cmd.add("org.apache.nifi.NiFi"); - builder.command(cmd).inheritIO(); + builder.command(cmd); final StringBuilder cmdBuilder = new StringBuilder(); for ( final String s : cmd ) { cmdBuilder.append(s).append(" "); } + System.out.println("Starting Apache NiFi..."); System.out.println("Working Directory: " + workingDir.getAbsolutePath()); System.out.println("Command: " + cmdBuilder.toString()); + + if ( monitor ) { + String gracefulShutdown = props.get(GRACEFUL_SHUTDOWN_PROP); + if ( gracefulShutdown == null ) { + gracefulShutdown = DEFAULT_GRACEFUL_SHUTDOWN_VALUE; + } - final Process proc = builder.start(); - Runtime.getRuntime().addShutdownHook(new ShutdownHook(proc)); - final int statusCode = proc.waitFor(); - System.out.println("Apache NiFi exited with Status Code " + statusCode); + final int gracefulShutdownSeconds; + try { + gracefulShutdownSeconds = Integer.parseInt(gracefulShutdown); + } catch (final NumberFormatException nfe) { + throw new NumberFormatException("The '" + GRACEFUL_SHUTDOWN_PROP + "' property in Bootstrap Config File " + bootstrapConfigAbsoluteFile.getAbsolutePath() + " has an invalid value. Must be a non-negative integer"); + } + + if ( gracefulShutdownSeconds < 0 ) { + throw new NumberFormatException("The '" + GRACEFUL_SHUTDOWN_PROP + "' property in Bootstrap Config File " + bootstrapConfigAbsoluteFile.getAbsolutePath() + " has an invalid value. Must be a non-negative integer"); + } + + Process process = builder.start(); + + ShutdownHook shutdownHook = new ShutdownHook(process, this, gracefulShutdownSeconds); + final Runtime runtime = Runtime.getRuntime(); + runtime.addShutdownHook(shutdownHook); + + while (true) { + final boolean alive = isAlive(process); + + if ( alive ) { + try { + Thread.sleep(1000L); + } catch (final InterruptedException ie) { + } + } else { + runtime.removeShutdownHook(shutdownHook); + + if (autoRestartNiFi) { + System.out.println("Apache NiFi appears to have died. Restarting..."); + process = builder.start(); + + shutdownHook = new ShutdownHook(process, this, gracefulShutdownSeconds); + runtime.addShutdownHook(shutdownHook); + + final boolean started = waitForStart(); + + if ( started ) { + System.out.println("Successfully started Apache NiFi"); + } else { + System.err.println("Apache NiFi does not appear to have started"); + } + } else { + return; + } + } + } + } else { + builder.start(); + boolean started = waitForStart(); + + if ( started ) { + System.out.println("Successfully started Apache NiFi"); + } else { + System.err.println("Apache NiFi does not appear to have started"); + } + + listener.stop(); + } } - private static File getFile(final String filename, final File workingDir) { - File libDir = new File(filename); - if ( !libDir.isAbsolute() ) { - libDir = new File(workingDir, filename); + private boolean waitForStart() { + lock.lock(); + try { + final long startTime = System.nanoTime(); + + while ( ccPort < 1 ) { + try { + startupCondition.await(1, TimeUnit.SECONDS); + } catch (final InterruptedException ie) { + return false; + } + + final long waitNanos = System.nanoTime() - startTime; + final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos); + if (waitSeconds > STARTUP_WAIT_SECONDS) { + return false; + } + } + } finally { + lock.unlock(); + } + return true; + } + + private File getFile(final String filename, final File workingDir) { + File file = new File(filename); + if ( !file.isAbsolute() ) { + file = new File(workingDir, filename); } - return libDir; + return file; } - private static String replaceNull(final String value, final String replacement) { + private String replaceNull(final String value, final String replacement) { return (value == null) ? replacement : value; } + + void setAutoRestartNiFi(final boolean restart) { + this.autoRestartNiFi = restart; + } + + void setNiFiCommandControlPort(final int port) { + this.ccPort = port; + + final File statusFile = getStatusFile(); + try (final FileOutputStream fos = new FileOutputStream(statusFile)) { + fos.write(String.valueOf(port).getBytes(StandardCharsets.UTF_8)); + fos.getFD().sync(); + } catch (final IOException ioe) { + System.err.println("Apache NiFi has started but failed to persist NiFi Port information to " + statusFile.getAbsolutePath() + " due to " + ioe); + } + + System.out.println("Apache NiFi now running and listening for requests on port " + port); + } + + int getNiFiCommandControlPort() { + return this.ccPort; + } } diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java index 55e1f457d2..3c5ed1f951 100644 --- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/ShutdownHook.java @@ -1,14 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.bootstrap; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; + public class ShutdownHook extends Thread { private final Process nifiProcess; + private final RunNiFi runner; + private final int gracefulShutdownSeconds; - public ShutdownHook(final Process nifiProcess) { + public ShutdownHook(final Process nifiProcess, final RunNiFi runner, final int gracefulShutdownSeconds) { this.nifiProcess = nifiProcess; + this.runner = runner; + this.gracefulShutdownSeconds = gracefulShutdownSeconds; } @Override public void run() { - nifiProcess.destroy(); + runner.setAutoRestartNiFi(false); + final int ccPort = runner.getNiFiCommandControlPort(); + if ( ccPort > 0 ) { + System.out.println("Initiating Shutdown of NiFi..."); + + try { + final Socket socket = new Socket("localhost", ccPort); + final OutputStream out = socket.getOutputStream(); + out.write("SHUTDOWN\n".getBytes(StandardCharsets.UTF_8)); + out.flush(); + + socket.close(); + } catch (final IOException ioe) { + System.out.println("Failed to Shutdown NiFi due to " + ioe); + } + } + + System.out.println("Waiting for Apache NiFi to finish shutting down..."); + final long startWait = System.nanoTime(); + while ( RunNiFi.isAlive(nifiProcess) ) { + final long waitNanos = System.nanoTime() - startWait; + final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos); + if ( waitSeconds >= gracefulShutdownSeconds && gracefulShutdownSeconds > 0 ) { + if ( RunNiFi.isAlive(nifiProcess) ) { + System.out.println("NiFi has not finished shutting down after " + gracefulShutdownSeconds + " seconds. Killing process."); + nifiProcess.destroy(); + } + break; + } else { + try { + Thread.sleep(1000L); + } catch (final InterruptedException ie) {} + } + } + + final File statusFile = runner.getStatusFile(); + if ( !statusFile.delete() ) { + System.err.println("Failed to delete status file " + statusFile.getAbsolutePath() + "; this file should be cleaned up manually"); + } } } diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/exception/InvalidCommandException.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/exception/InvalidCommandException.java new file mode 100644 index 0000000000..962aa1c39d --- /dev/null +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/exception/InvalidCommandException.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.bootstrap.exception; + +public class InvalidCommandException extends Exception { + private static final long serialVersionUID = 1L; + + public InvalidCommandException() { + super(); + } + + public InvalidCommandException(final String message) { + super(message); + } + + public InvalidCommandException(final Throwable t) { + super(t); + } + + public InvalidCommandException(final String message, final Throwable t) { + super(message, t); + } +}