NIFI-262, NIFI-263: Added 'restart' and 'dump' options to nifi.sh script

This commit is contained in:
Mark Payne 2015-01-14 12:24:09 -05:00
parent c62aba1336
commit 7737fbd84d
4 changed files with 234 additions and 14 deletions

View File

@ -0,0 +1,33 @@
@echo off
rem
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
rem
rem Use JAVA_HOME if it's set; otherwise, just use java
IF "%JAVA_HOME%"=="" (SET JAVA_EXE=java) ELSE (SET JAVA_EXE=%JAVA_HOME%\bin\java.exe)
SET NIFI_ROOT=%~dp0..\
CD /d "%NIFI_ROOT%"
SET LIB_DIR=lib\bootstrap
SET CONF_DIR=conf
SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf
SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE%
SET JAVA_PARAMS=-cp %LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi
SET BOOTSTRAP_ACTION=dump
cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION%

View File

@ -172,10 +172,10 @@ case "$1" in
install)
install "$@"
;;
start|stop|run|status)
start|stop|run|restart|status|dump)
main "$@"
;;
*)
echo "Usage nifi {start|stop|run|status|install}"
echo "Usage nifi {start|stop|run|restart|status|dump|install}"
;;
esac

View File

@ -17,16 +17,27 @@
package org.apache.nifi;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.lang.management.LockInfo;
import java.lang.management.ManagementFactory;
import java.lang.management.MonitorInfo;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -162,6 +173,10 @@ public class BootstrapListener {
echoShutdown(socket.getOutputStream());
nifi.shutdownHook();
return;
case DUMP:
logger.info("Received DUMP request from Bootstrap");
writeDump(socket.getOutputStream());
break;
}
} catch (final Throwable t) {
logger.error("Failed to process request from Bootstrap due to " + t.toString(), t);
@ -182,6 +197,110 @@ public class BootstrapListener {
}
private static void writeDump(final OutputStream out) throws IOException {
final ThreadMXBean mbean = ManagementFactory.getThreadMXBean();
final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
final ThreadInfo[] infos = mbean.dumpAllThreads(true, true);
final long[] deadlockedThreadIds = mbean.findDeadlockedThreads();
final long[] monitorDeadlockThreadIds = mbean.findMonitorDeadlockedThreads();
final List<ThreadInfo> sortedInfos = new ArrayList<>(infos.length);
for ( final ThreadInfo info : infos ) {
sortedInfos.add(info);
}
Collections.sort(sortedInfos, new Comparator<ThreadInfo>() {
@Override
public int compare(ThreadInfo o1, ThreadInfo o2) {
return o1.getThreadName().toLowerCase().compareTo(o2.getThreadName().toLowerCase());
}
});
final StringBuilder sb = new StringBuilder();
for ( final ThreadInfo info : sortedInfos ) {
sb.append("\n");
sb.append("\"").append(info.getThreadName()).append("\" Id=");
sb.append(info.getThreadId()).append(" ");
sb.append(info.getThreadState().toString()).append(" ");
switch (info.getThreadState()) {
case BLOCKED:
case TIMED_WAITING:
case WAITING:
sb.append(" on ");
sb.append(info.getLockInfo());
break;
default:
break;
}
if (info.isSuspended()) {
sb.append(" (suspended)");
}
if ( info.isInNative() ) {
sb.append(" (in native code)");
}
if ( deadlockedThreadIds != null && deadlockedThreadIds.length > 0 ) {
for ( final long id : deadlockedThreadIds ) {
if ( id == info.getThreadId() ) {
sb.append(" ** DEADLOCKED THREAD **");
}
}
}
if ( monitorDeadlockThreadIds != null && monitorDeadlockThreadIds.length > 0 ) {
for ( final long id : monitorDeadlockThreadIds ) {
if ( id == info.getThreadId() ) {
sb.append(" ** MONITOR-DEADLOCKED THREAD **");
}
}
}
final StackTraceElement[] stackTraces = info.getStackTrace();
for ( final StackTraceElement element : stackTraces ) {
sb.append("\n\tat ").append(element);
final MonitorInfo[] monitors = info.getLockedMonitors();
for ( final MonitorInfo monitor : monitors ) {
if ( monitor.getLockedStackFrame().equals(element) ) {
sb.append("\n\t- waiting on ").append(monitor);
}
}
}
final LockInfo[] lockInfos = info.getLockedSynchronizers();
if ( lockInfos.length > 0 ) {
sb.append("\n\t");
sb.append("Number of Locked Synchronizers: ").append(lockInfos.length);
for ( final LockInfo lockInfo : lockInfos ) {
sb.append("\n\t- ").append(lockInfo.toString());
}
}
sb.append("\n");
}
if (deadlockedThreadIds != null && deadlockedThreadIds.length > 0) {
sb.append("\n\nDEADLOCK DETECTED!");
sb.append("\nThe following thread IDs are deadlocked:");
for ( final long id : deadlockedThreadIds ) {
sb.append("\n").append(id);
}
}
if (monitorDeadlockThreadIds != null && monitorDeadlockThreadIds.length > 0) {
sb.append("\n\nMONITOR DEADLOCK DETECTED!");
sb.append("\nThe following thread IDs are deadlocked:");
for ( final long id : monitorDeadlockThreadIds ) {
sb.append("\n").append(id);
}
}
writer.write(sb.toString());
writer.flush();
}
private void echoPing(final OutputStream out) throws IOException {
out.write("PING\n".getBytes(StandardCharsets.UTF_8));
out.flush();
@ -205,7 +324,7 @@ public class BootstrapListener {
final String line = reader.readLine();
final String[] splits = line.split(" ");
if ( splits.length < 0 ) {
if ( splits.length < 1 ) {
throw new IOException("Received invalid request from Bootstrap: " + line);
}
@ -235,6 +354,7 @@ public class BootstrapListener {
private static class BootstrapRequest {
public static enum RequestType {
SHUTDOWN,
DUMP,
PING;
}

View File

@ -34,6 +34,7 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.attribute.PosixFilePermission;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -75,6 +76,7 @@ public class RunNiFi {
public static final String SHUTDOWN_CMD = "SHUTDOWN";
public static final String PING_CMD = "PING";
public static final String DUMP_CMD = "DUMP";
private volatile boolean autoRestartNiFi = true;
private volatile int ccPort = -1;
@ -105,41 +107,52 @@ public class RunNiFi {
private static void printUsage() {
System.out.println("Usage:");
System.out.println();
System.out.println("java org.apache.nifi.bootstrap.RunNiFi [<-verbose>] <command>");
System.out.println("java org.apache.nifi.bootstrap.RunNiFi [<-verbose>] <command> [options]");
System.out.println();
System.out.println("Valid commands include:");
System.out.println("");
System.out.println("Start : Start a new instance of Apache NiFi");
System.out.println("Stop : Stop a running instance of Apache NiFi");
System.out.println("Restart : Stop Apache NiFi, if it is running, and then start a new instance");
System.out.println("Status : Determine if there is a running instance of Apache NiFi");
System.out.println("Dump : Write a Thread Dump to the file specified by [options], or to the log if no file is given");
System.out.println("Run : Start a new instance of Apache NiFi and monitor the Process, restarting if the instance dies");
System.out.println();
}
private static String[] shift(final String[] orig) {
return Arrays.copyOfRange(orig, 1, orig.length);
}
public static void main(final String[] args) throws IOException, InterruptedException {
if ( args.length < 1 || args.length > 2 ) {
public static void main(String[] args) throws IOException, InterruptedException {
if ( args.length < 1 || args.length > 3 ) {
printUsage();
return;
}
File dumpFile = null;
boolean verbose = false;
if ( args.length == 2 ) {
if ( args[0].equals("-verbose") ) {
verbose = true;
} else {
printUsage();
return;
}
if ( args[0].equals("-verbose") ) {
verbose = true;
args = shift(args);
}
final String cmd = args.length == 1 ? args[0] : args[1];
final String cmd = args[0];
if (cmd.equals("dump") ) {
if ( args.length > 1 ) {
dumpFile = new File(args[1]);
} else {
dumpFile = null;
}
}
switch (cmd.toLowerCase()) {
case "start":
case "run":
case "stop":
case "status":
case "dump":
case "restart":
break;
default:
printUsage();
@ -178,6 +191,13 @@ public class RunNiFi {
case "status":
runNiFi.status();
break;
case "restart":
runNiFi.stop();
runNiFi.start(false);
break;
case "dump":
runNiFi.dump(dumpFile);
break;
}
}
@ -391,6 +411,53 @@ public class RunNiFi {
}
/**
* Writes a NiFi thread dump to the given file; if file is null, logs at INFO level instead.
* @param dumpFile
* @return
* @throws IOException
*/
public void dump(final File dumpFile) throws IOException {
final Integer port = getCurrentPort();
if ( port == null ) {
System.out.println("Apache NiFi is not currently running");
}
final Properties nifiProps = loadProperties();
final String secretKey = nifiProps.getProperty("secret.key");
final StringBuilder sb = new StringBuilder();
try (final Socket socket = new Socket()) {
logger.fine("Connecting to NiFi instance");
socket.setSoTimeout(60000);
socket.connect(new InetSocketAddress("localhost", port));
logger.fine("Established connection to NiFi instance.");
socket.setSoTimeout(60000);
logger.fine("Sending DUMP Command to port " + port);
final OutputStream out = socket.getOutputStream();
out.write((DUMP_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
out.flush();
final InputStream in = socket.getInputStream();
final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
String line;
while ((line = reader.readLine()) != null ) {
sb.append(line).append("\n");
}
}
final String dump = sb.toString();
if ( dumpFile == null ) {
logger.info(dump);
} else {
try (final FileOutputStream fos = new FileOutputStream(dumpFile)) {
fos.write(dump.getBytes(StandardCharsets.UTF_8));
}
logger.info("Successfully wrote thread dump to " + dumpFile.getAbsolutePath());
}
}
public void stop() throws IOException {
final Integer port = getCurrentPort();
if ( port == null ) {