NIFI-145: Bug Fixes and updated nifi.sh to use bootstrap code

This commit is contained in:
Mark Payne 2014-12-10 11:03:21 -05:00
parent 6d46829795
commit 192d782277
12 changed files with 130 additions and 211 deletions

View File

@ -1045,6 +1045,21 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
if (flowFileSwapManager != null) {
flowFileSwapManager.shutdown();
}
if ( processScheduler != null ) {
processScheduler.shutdown();
}
if ( provenanceEventRepository != null ) {
try {
provenanceEventRepository.close();
} catch (final IOException ioe) {
LOG.warn("There was a problem shutting down the Provenance Repository: " + ioe.toString());
if ( LOG.isDebugEnabled() ) {
LOG.warn("", ioe);
}
}
}
} finally {
writeLock.unlock();
}

View File

@ -92,7 +92,7 @@ public class FileSystemRepository implements ContentRepository {
private final List<String> containerNames;
private final AtomicLong index;
private final ScheduledExecutorService executor = new FlowEngine(4, "FileSystemRepository Workers");
private final ScheduledExecutorService executor = new FlowEngine(4, "FileSystemRepository Workers", true);
private final ConcurrentMap<String, BlockingQueue<ContentClaim>> reclaimable = new ConcurrentHashMap<>();
private final Map<String, ContainerState> containerStateMap = new HashMap<>();
@ -209,7 +209,7 @@ public class FileSystemRepository implements ContentRepository {
}
}
containerCleanupExecutor = new FlowEngine(containers.size(), "Cleanup FileSystemRepository Container");
containerCleanupExecutor = new FlowEngine(containers.size(), "Cleanup FileSystemRepository Container", true);
for (final Map.Entry<String, Path> containerEntry : containers.entrySet()) {
final String containerName = containerEntry.getKey();
final Path containerPath = containerEntry.getValue();

View File

@ -90,6 +90,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
private final long checkpointDelayMillis;
private final Path flowFileRepositoryPath;
private final int numPartitions;
private final ScheduledExecutorService checkpointExecutor;
// effectively final
private WriteAheadRepository<RepositoryRecord> wal;
@ -128,6 +129,8 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
flowFileRepositoryPath = properties.getFlowFileRepositoryPath();
numPartitions = properties.getFlowFileRepositoryPartitions();
checkpointDelayMillis = FormatUtils.getTimeDuration(properties.getFlowFileRepositoryCheckpointInterval(), TimeUnit.MILLISECONDS);
checkpointExecutor = Executors.newSingleThreadScheduledExecutor();
}
@Override
@ -150,6 +153,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
checkpointFuture.cancel(false);
}
checkpointExecutor.shutdown();
wal.shutdown();
}
@ -363,8 +367,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
}
};
final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
checkpointFuture = executorService.scheduleWithFixedDelay(checkpointRunnable, checkpointDelayMillis, checkpointDelayMillis, TimeUnit.MILLISECONDS);
checkpointFuture = checkpointExecutor.scheduleWithFixedDelay(checkpointRunnable, checkpointDelayMillis, checkpointDelayMillis, TimeUnit.MILLISECONDS);
return maxId;
}

View File

@ -138,6 +138,9 @@ public final class StandardProcessScheduler implements ProcessScheduler {
LOG.error("", t);
}
}
frameworkTaskExecutor.shutdown();
componentLifeCycleThreadPool.shutdown();
}
public void schedule(final ReportingTaskNode taskNode) {

View File

@ -42,6 +42,18 @@ public final class FlowEngine extends ScheduledThreadPoolExecutor {
* @param threadNamePrefix
*/
public FlowEngine(int corePoolSize, final String threadNamePrefix) {
this(corePoolSize, threadNamePrefix, false);
}
/**
* Creates a new instance of FlowEngine
*
* @param corePoolSize the maximum number of threads available to tasks
* running in the engine.
* @param threadNamePrefix
* @param deamon if true, the thread pool will be populated with daemon threads, otherwise the threads will not be marked as daemon.
*/
public FlowEngine(int corePoolSize, final String threadNamePrefix, final boolean daemon) {
super(corePoolSize);
final AtomicInteger threadIndex = new AtomicInteger(0);
@ -50,6 +62,9 @@ public final class FlowEngine extends ScheduledThreadPoolExecutor {
@Override
public Thread newThread(final Runnable r) {
final Thread t = defaultThreadFactory.newThread(r);
if ( daemon ) {
t.setDaemon(true);
}
t.setName(threadNamePrefix + " Thread-" + threadIndex.incrementAndGet());
return t;
}

View File

@ -1,3 +1,20 @@
rem
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
rem
@echo off
rem Use JAVA_HOME if it's set; otherwise, just use java
@ -7,7 +24,7 @@ SET LIB_DIR=%~dp0..\lib
SET CONF_DIR=%~dp0..\conf
SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf
SET JAVA_ARGS=-Dorg.apache.nifi.boostrap.config.file=%BOOTSTRAP_CONF_FILE%
SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE%
SET JAVA_PARAMS=-cp %LIB_DIR%\nifi-bootstrap*.jar -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi
SET BOOTSTRAP_ACTION=status

View File

@ -21,34 +21,6 @@
DIRNAME=`dirname "$0"`
PROGNAME=`basename "$0"`
#
# Sourcing environment settings for NIFI similar to tomcats setenv
#
NIFI_SCRIPT="nifi.sh"
export NIFI_SCRIPT
if [ -f "$DIRNAME/setenv.sh" ]; then
. "$DIRNAME/setenv.sh"
fi
#
# Check/Set up some easily accessible MIN/MAX params for JVM mem usage
#
if [ "x$JAVA_MIN_MEM" = "x" ]; then
JAVA_MIN_MEM=512M
export JAVA_MIN_MEM
fi
if [ "x$JAVA_MAX_MEM" = "x" ]; then
JAVA_MAX_MEM=512M
export JAVA_MAX_MEM
fi
if [ "x$JAVA_PERMSIZE" = "x" ]; then
JAVA_PERMSIZE=128M
export JAVA_PERMSIZE
fi
if [ "x$JAVA_MAX_PERMSIZE" = "x" ]; then
JAVA_MAX_PERMSIZE=128M
export JAVA_MAX_PERMSIZE
fi
warn() {
@ -128,58 +100,6 @@ locateHome() {
}
locateBase() {
if [ "x$NIFI_BASE" != "x" ]; then
if [ ! -d "$NIFI_BASE" ]; then
die "NIFI_BASE is not valid: $NIFI_BASE"
fi
else
NIFI_BASE=$NIFI_HOME
fi
}
locateConf() {
if [ "x$NIFI_CONF" != "x" ]; then
if [ ! -d "$NIFI_CONF" ]; then
die "NIFI_CONF is not valid: $NIFI_CONF"
fi
else
NIFI_CONF=$NIFI_BASE/conf
fi
}
setupNativePath() {
# Support for loading native libraries
LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:$NIFI_BASE/lib:$NIFI_HOME/lib"
# For Cygwin, set PATH from LD_LIBRARY_PATH
if $cygwin; then
LD_LIBRARY_PATH=`cygpath --path --windows "$LD_LIBRARY_PATH"`
PATH="$PATH;$LD_LIBRARY_PATH"
export PATH
fi
export LD_LIBRARY_PATH
}
pathCanonical() {
local dst="${1}"
while [ -h "${dst}" ] ; do
ls=`ls -ld "${dst}"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
dst="$link"
else
dst="`dirname "${dst}"`/$link"
fi
done
local bas=`basename "${dst}"`
local dir=`dirname "${dst}"`
if [ "$bas" != "$dir" ]; then
dst="`pathCanonical "$dir"`/$bas"
fi
echo "${dst}" | sed -e 's#//#/#g' -e 's#/./#/#g' -e 's#/[^/]*/../#/#g'
}
locateJava() {
# Setup the Java Virtual Machine
@ -211,82 +131,6 @@ locateJava() {
fi
}
detectJVM() {
#echo "`$JAVA -version`"
# This service should call `java -version`,
# read stdout, and look for hints
if $JAVA -version 2>&1 | grep "^IBM" ; then
JVM_VENDOR="IBM"
# on OS/400, java -version does not contain IBM explicitly
elif $os400; then
JVM_VENDOR="IBM"
else
JVM_VENDOR="SUN"
fi
# echo "JVM vendor is $JVM_VENDOR"
}
setupDebugOptions() {
if [ "x$JAVA_OPTS" = "x" ]; then
JAVA_OPTS="$DEFAULT_JAVA_OPTS"
fi
export JAVA_OPTS
if [ "x$EXTRA_JAVA_OPTS" != "x" ]; then
JAVA_OPTS="$JAVA_OPTS $EXTRA_JAVA_OPTS"
fi
# Set Debug options if enabled
if [ "x$NIFI_DEBUG" != "x" ]; then
# Use the defaults if JAVA_DEBUG_OPTS was not set
if [ "x$JAVA_DEBUG_OPTS" = "x" ]; then
JAVA_DEBUG_OPTS="$DEFAULT_JAVA_DEBUG_OPTS"
fi
JAVA_OPTS="$JAVA_DEBUG_OPTS $JAVA_OPTS"
warn "Enabling Java debug options: $JAVA_DEBUG_OPTS"
fi
}
setupDefaults() {
DEFAULT_JAVA_OPTS="-Xms$JAVA_MIN_MEM -Xmx$JAVA_MAX_MEM -XX:PermSize=$JAVA_PERMSIZE -XX:MaxPermSize=$JAVA_MAX_PERMSIZE"
#Set the JVM_VENDOR specific JVM flags
if [ "$JVM_VENDOR" = "SUN" ]; then
#
# Check some easily accessible MIN/MAX params for JVM mem usage
#
if [ "x$JAVA_PERM_MEM" != "x" ]; then
DEFAULT_JAVA_OPTS="$DEFAULT_JAVA_OPTS -XX:PermSize=$JAVA_PERM_MEM"
fi
if [ "x$JAVA_MAX_PERM_MEM" != "x" ]; then
DEFAULT_JAVA_OPTS="$DEFAULT_JAVA_OPTS -XX:MaxPermSize=$JAVA_MAX_PERM_MEM"
fi
DEFAULT_JAVA_OPTS="-server $DEFAULT_JAVA_OPTS -Dcom.sun.management.jmxremote"
elif [ "$JVM_VENDOR" = "IBM" ]; then
if $os400; then
DEFAULT_JAVA_OPTS="$DEFAULT_JAVA_OPTS"
elif $aix; then
DEFAULT_JAVA_OPTS="-Xverify:none -Xdump:heap -Xlp $DEFAULT_JAVA_OPTS"
else
DEFAULT_JAVA_OPTS="-Xverify:none $DEFAULT_JAVA_OPTS"
fi
fi
DEFAULT_JAVA_OPTS="$DEFAULT_JAVA_OPTS -Djava.net.preferIPv4Stack=true -Dsun.net.http.allowRestrictedHeaders=true -Djava.protocol.handler.pkgs=sun.net.www.protocol -Dorg.apache.jasper.compiler.disablejsr199=true -XX:ReservedCodeCacheSize=128m -XX:+UseCodeCacheFlushing"
# Setup classpath
CLASSPATH="$NIFI_HOME"/conf
for f in "$NIFI_HOME"/lib/*
do
CLASSPATH="${CLASSPATH}":"${f}"
done
DEFAULT_JAVA_DEBUG_OPTS="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"
}
init() {
# Determine if there is special OS handling we must perform
detectOS
@ -297,49 +141,28 @@ init() {
# Locate the NiFi home directory
locateHome
# Locate the NiFi base directory
locateBase
# Locate the NiFi conf directory
locateConf
# Setup the native library path
setupNativePath
# Locate the Java VM to execute
locateJava
# Determine the JVM vendor
detectJVM
# Setup default options
setupDefaults
# Install debug options
setupDebugOptions
}
run() {
BOOTSTRAP_CONF="$NIFI_HOME/conf/bootstrap.conf";
if $cygwin; then
NIFI_HOME=`cygpath --path --windows "$NIFI_HOME"`
NIFI_BASE=`cygpath --path --windows "$NIFI_BASE"`
NIFI_CONF=`cygpath --path --windows "$NIFI_CONF"`
CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
BOOTSTRAP_CONF=`cygpath --path --windows "$BOOTSTRAP_CONF"`
fi
# export CLASSPATH to the java process. Could also pass in via -cp
export CLASSPATH
echo
echo "Classpath: $CLASSPATH"
echo
echo "Java home: $JAVA_HOME"
echo "NiFi home: $NIFI_HOME"
echo "Java Options: $JAVA_OPTS"
echo
echo "Launching NiFi. See logs..."
exec "$JAVA" -Dapp=nifi $JAVA_OPTS -Dnifi.properties.file.path="$NIFI_HOME"/conf/nifi.properties org.apache.nifi.NiFi
echo "Bootstrap Config File: $BOOTSTRAP_CONF"
echo
exec "$JAVA" -cp "$NIFI_HOME"/lib/nifi-bootstrap*.jar -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.file="$BOOTSTRAP_CONF" org.apache.nifi.bootstrap.RunNiFi $1
}
main() {

View File

@ -1,3 +1,20 @@
rem
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
rem
@echo off
rem Use JAVA_HOME if it's set; otherwise, just use java
@ -7,7 +24,7 @@ SET LIB_DIR=%~dp0..\lib
SET CONF_DIR=%~dp0..\conf
SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf
SET JAVA_ARGS=-Dorg.apache.nifi.boostrap.config.file=%BOOTSTRAP_CONF_FILE%
SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE%
SET JAVA_PARAMS=-cp %LIB_DIR%\nifi-bootstrap*.jar -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi
SET BOOTSTRAP_ACTION=run

View File

@ -1,3 +1,20 @@
rem
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
rem
@echo off
rem Use JAVA_HOME if it's set; otherwise, just use java
@ -7,7 +24,7 @@ SET LIB_DIR=%~dp0..\lib
SET CONF_DIR=%~dp0..\conf
SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf
SET JAVA_ARGS=-Dorg.apache.nifi.boostrap.config.file=%BOOTSTRAP_CONF_FILE%
SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE%
SET JAVA_PARAMS=-cp %LIB_DIR%\nifi-bootstrap*.jar -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi
SET BOOTSTRAP_ACTION=start

View File

@ -1,3 +1,20 @@
rem
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
rem
@echo off
rem Use JAVA_HOME if it's set; otherwise, just use java
@ -7,7 +24,7 @@ SET LIB_DIR=%~dp0..\lib
SET CONF_DIR=%~dp0..\conf
SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf
SET JAVA_ARGS=-Dorg.apache.nifi.boostrap.config.file=%BOOTSTRAP_CONF_FILE%
SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE%
SET JAVA_PARAMS=-cp %LIB_DIR%\nifi-bootstrap*.jar -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi
SET BOOTSTRAP_ACTION=stop

View File

@ -54,7 +54,7 @@ import java.util.concurrent.locks.ReentrantLock;
* If the {@code bootstrap.conf} file cannot be found, throws a {@code FileNotFoundException].
*/
public class RunNiFi {
public static final String DEFAULT_CONFIG_FILE = "./conf/boostrap.conf";
public static final String DEFAULT_CONFIG_FILE = "./conf/bootstrap.conf";
public static final String DEFAULT_NIFI_PROPS_FILE = "./conf/nifi.properties";
public static final String GRACEFUL_SHUTDOWN_PROP = "graceful.shutdown.seconds";
@ -111,7 +111,7 @@ public class RunNiFi {
return;
}
String configFilename = System.getProperty("org.apache.nifi.boostrap.config.file");
String configFilename = System.getProperty("org.apache.nifi.bootstrap.config.file");
if ( configFilename == null ) {
final String nifiHome = System.getenv("NIFI_HOME");
@ -233,11 +233,11 @@ public class RunNiFi {
}
private boolean isAlive(final Process process) {
public static boolean isAlive(final Process process) {
try {
process.exitValue();
return false;
} catch (final IllegalThreadStateException itse) {
} catch (final IllegalStateException | IllegalThreadStateException itse) {
return true;
}
}
@ -253,7 +253,7 @@ public class RunNiFi {
final ProcessBuilder builder = new ProcessBuilder();
if ( !bootstrapConfigFile.exists() ) {
throw new FileNotFoundException(DEFAULT_CONFIG_FILE);
throw new FileNotFoundException(bootstrapConfigFile.getAbsolutePath());
}
final Properties properties = new Properties();
@ -351,6 +351,7 @@ public class RunNiFi {
cmd.addAll(javaAdditionalArgs);
cmd.add("-Dnifi.properties.file.path=" + nifiPropsFilename);
cmd.add("-Dnifi.bootstrap.listen.port=" + listenPort);
cmd.add("-Dapp=NiFi");
cmd.add("org.apache.nifi.NiFi");
builder.command(cmd);
@ -374,11 +375,11 @@ public class RunNiFi {
try {
gracefulShutdownSeconds = Integer.parseInt(gracefulShutdown);
} catch (final NumberFormatException nfe) {
throw new NumberFormatException("The '" + GRACEFUL_SHUTDOWN_PROP + "' property in Boostrap Config File " + bootstrapConfigAbsoluteFile.getAbsolutePath() + " has an invalid value. Must be a non-negative integer");
throw new NumberFormatException("The '" + GRACEFUL_SHUTDOWN_PROP + "' property in Bootstrap Config File " + bootstrapConfigAbsoluteFile.getAbsolutePath() + " has an invalid value. Must be a non-negative integer");
}
if ( gracefulShutdownSeconds < 0 ) {
throw new NumberFormatException("The '" + GRACEFUL_SHUTDOWN_PROP + "' property in Boostrap Config File " + bootstrapConfigAbsoluteFile.getAbsolutePath() + " has an invalid value. Must be a non-negative integer");
throw new NumberFormatException("The '" + GRACEFUL_SHUTDOWN_PROP + "' property in Bootstrap Config File " + bootstrapConfigAbsoluteFile.getAbsolutePath() + " has an invalid value. Must be a non-negative integer");
}
Process process = builder.start();

View File

@ -55,11 +55,11 @@ public class ShutdownHook extends Thread {
System.out.println("Waiting for Apache NiFi to finish shutting down...");
final long startWait = System.nanoTime();
while ( isAlive(nifiProcess) ) {
while ( RunNiFi.isAlive(nifiProcess) ) {
final long waitNanos = System.nanoTime() - startWait;
final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos);
if ( waitSeconds >= gracefulShutdownSeconds && gracefulShutdownSeconds > 0 ) {
if ( isAlive(nifiProcess) ) {
if ( RunNiFi.isAlive(nifiProcess) ) {
System.out.println("NiFi has not finished shutting down after " + gracefulShutdownSeconds + " seconds. Killing process.");
nifiProcess.destroy();
}
@ -76,13 +76,4 @@ public class ShutdownHook extends Thread {
System.err.println("Failed to delete status file " + statusFile.getAbsolutePath() + "; this file should be cleaned up manually");
}
}
private boolean isAlive(final Process process) {
try {
process.exitValue();
return false;
} catch (final IllegalThreadStateException itse) {
return true;
}
}
}