From 7737fbd84d955520e2f6e23bbbfa8a5ebe43b3ed Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 14 Jan 2015 12:24:09 -0500 Subject: [PATCH] NIFI-262, NIFI-263: Added 'restart' and 'dump' options to nifi.sh script --- .../src/main/resources/bin/dump-nifi.bat | 33 +++++ .../resources/src/main/resources/bin/nifi.sh | 4 +- .../org/apache/nifi/BootstrapListener.java | 122 +++++++++++++++++- .../org/apache/nifi/bootstrap/RunNiFi.java | 89 +++++++++++-- 4 files changed, 234 insertions(+), 14 deletions(-) create mode 100644 nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/dump-nifi.bat diff --git a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/dump-nifi.bat b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/dump-nifi.bat new file mode 100644 index 0000000000..71e5a1ae4b --- /dev/null +++ b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/dump-nifi.bat @@ -0,0 +1,33 @@ +@echo off +rem +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at +rem +rem http://www.apache.org/licenses/LICENSE-2.0 +rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. +rem + +rem Use JAVA_HOME if it's set; otherwise, just use java +IF "%JAVA_HOME%"=="" (SET JAVA_EXE=java) ELSE (SET JAVA_EXE=%JAVA_HOME%\bin\java.exe) + +SET NIFI_ROOT=%~dp0..\ +CD /d "%NIFI_ROOT%" +SET LIB_DIR=lib\bootstrap +SET CONF_DIR=conf + +SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf +SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE% + +SET JAVA_PARAMS=-cp %LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi +SET BOOTSTRAP_ACTION=dump + +cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION% diff --git a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh index 163f8e24c6..fb0d22e8e7 100644 --- a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh +++ b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh @@ -172,10 +172,10 @@ case "$1" in install) install "$@" ;; - start|stop|run|status) + start|stop|run|restart|status|dump) main "$@" ;; *) - echo "Usage nifi {start|stop|run|status|install}" + echo "Usage nifi {start|stop|run|restart|status|dump|install}" ;; esac diff --git a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java index 3393952f46..590797ce90 100644 --- a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java +++ b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java @@ -17,16 +17,27 @@ package org.apache.nifi; import java.io.BufferedReader; +import java.io.BufferedWriter; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.lang.management.LockInfo; +import java.lang.management.ManagementFactory; +import java.lang.management.MonitorInfo; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketTimeoutException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -162,6 +173,10 @@ public class BootstrapListener { echoShutdown(socket.getOutputStream()); nifi.shutdownHook(); return; + case DUMP: + logger.info("Received DUMP request from Bootstrap"); + writeDump(socket.getOutputStream()); + break; } } catch (final Throwable t) { logger.error("Failed to process request from Bootstrap due to " + t.toString(), t); @@ -182,6 +197,110 @@ public class BootstrapListener { } + private static void writeDump(final OutputStream out) throws IOException { + final ThreadMXBean mbean = ManagementFactory.getThreadMXBean(); + final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out)); + + final ThreadInfo[] infos = mbean.dumpAllThreads(true, true); + final long[] deadlockedThreadIds = mbean.findDeadlockedThreads(); + final long[] monitorDeadlockThreadIds = mbean.findMonitorDeadlockedThreads(); + + final List sortedInfos = new ArrayList<>(infos.length); + for ( final ThreadInfo info : infos ) { + sortedInfos.add(info); + } + Collections.sort(sortedInfos, new Comparator() { + @Override + public int compare(ThreadInfo o1, ThreadInfo o2) { + return o1.getThreadName().toLowerCase().compareTo(o2.getThreadName().toLowerCase()); + } + }); + + final StringBuilder sb = new StringBuilder(); + for ( final ThreadInfo info : sortedInfos ) { + sb.append("\n"); + sb.append("\"").append(info.getThreadName()).append("\" Id="); + sb.append(info.getThreadId()).append(" "); + sb.append(info.getThreadState().toString()).append(" "); + + switch (info.getThreadState()) { + case BLOCKED: + case TIMED_WAITING: + case WAITING: + sb.append(" on "); + sb.append(info.getLockInfo()); + break; + default: + break; + } + + if (info.isSuspended()) { + sb.append(" (suspended)"); + } + if ( info.isInNative() ) { + sb.append(" (in native code)"); + } + + if ( deadlockedThreadIds != null && deadlockedThreadIds.length > 0 ) { + for ( final long id : deadlockedThreadIds ) { + if ( id == info.getThreadId() ) { + sb.append(" ** DEADLOCKED THREAD **"); + } + } + } + + if ( monitorDeadlockThreadIds != null && monitorDeadlockThreadIds.length > 0 ) { + for ( final long id : monitorDeadlockThreadIds ) { + if ( id == info.getThreadId() ) { + sb.append(" ** MONITOR-DEADLOCKED THREAD **"); + } + } + } + + final StackTraceElement[] stackTraces = info.getStackTrace(); + for ( final StackTraceElement element : stackTraces ) { + sb.append("\n\tat ").append(element); + + final MonitorInfo[] monitors = info.getLockedMonitors(); + for ( final MonitorInfo monitor : monitors ) { + if ( monitor.getLockedStackFrame().equals(element) ) { + sb.append("\n\t- waiting on ").append(monitor); + } + } + } + + final LockInfo[] lockInfos = info.getLockedSynchronizers(); + if ( lockInfos.length > 0 ) { + sb.append("\n\t"); + sb.append("Number of Locked Synchronizers: ").append(lockInfos.length); + for ( final LockInfo lockInfo : lockInfos ) { + sb.append("\n\t- ").append(lockInfo.toString()); + } + } + + sb.append("\n"); + } + + if (deadlockedThreadIds != null && deadlockedThreadIds.length > 0) { + sb.append("\n\nDEADLOCK DETECTED!"); + sb.append("\nThe following thread IDs are deadlocked:"); + for ( final long id : deadlockedThreadIds ) { + sb.append("\n").append(id); + } + } + + if (monitorDeadlockThreadIds != null && monitorDeadlockThreadIds.length > 0) { + sb.append("\n\nMONITOR DEADLOCK DETECTED!"); + sb.append("\nThe following thread IDs are deadlocked:"); + for ( final long id : monitorDeadlockThreadIds ) { + sb.append("\n").append(id); + } + } + + writer.write(sb.toString()); + writer.flush(); + } + private void echoPing(final OutputStream out) throws IOException { out.write("PING\n".getBytes(StandardCharsets.UTF_8)); out.flush(); @@ -205,7 +324,7 @@ public class BootstrapListener { final String line = reader.readLine(); final String[] splits = line.split(" "); - if ( splits.length < 0 ) { + if ( splits.length < 1 ) { throw new IOException("Received invalid request from Bootstrap: " + line); } @@ -235,6 +354,7 @@ public class BootstrapListener { private static class BootstrapRequest { public static enum RequestType { SHUTDOWN, + DUMP, PING; } diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java index e8f6439658..f920860443 100644 --- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java @@ -34,6 +34,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.attribute.PosixFilePermission; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -75,6 +76,7 @@ public class RunNiFi { public static final String SHUTDOWN_CMD = "SHUTDOWN"; public static final String PING_CMD = "PING"; + public static final String DUMP_CMD = "DUMP"; private volatile boolean autoRestartNiFi = true; private volatile int ccPort = -1; @@ -105,41 +107,52 @@ public class RunNiFi { private static void printUsage() { System.out.println("Usage:"); System.out.println(); - System.out.println("java org.apache.nifi.bootstrap.RunNiFi [<-verbose>] "); + System.out.println("java org.apache.nifi.bootstrap.RunNiFi [<-verbose>] [options]"); System.out.println(); System.out.println("Valid commands include:"); System.out.println(""); System.out.println("Start : Start a new instance of Apache NiFi"); System.out.println("Stop : Stop a running instance of Apache NiFi"); + System.out.println("Restart : Stop Apache NiFi, if it is running, and then start a new instance"); System.out.println("Status : Determine if there is a running instance of Apache NiFi"); + System.out.println("Dump : Write a Thread Dump to the file specified by [options], or to the log if no file is given"); System.out.println("Run : Start a new instance of Apache NiFi and monitor the Process, restarting if the instance dies"); System.out.println(); } + private static String[] shift(final String[] orig) { + return Arrays.copyOfRange(orig, 1, orig.length); + } - public static void main(final String[] args) throws IOException, InterruptedException { - if ( args.length < 1 || args.length > 2 ) { + public static void main(String[] args) throws IOException, InterruptedException { + if ( args.length < 1 || args.length > 3 ) { printUsage(); return; } + File dumpFile = null; boolean verbose = false; - if ( args.length == 2 ) { - if ( args[0].equals("-verbose") ) { - verbose = true; - } else { - printUsage(); - return; - } + if ( args[0].equals("-verbose") ) { + verbose = true; + args = shift(args); } - final String cmd = args.length == 1 ? args[0] : args[1]; + final String cmd = args[0]; + if (cmd.equals("dump") ) { + if ( args.length > 1 ) { + dumpFile = new File(args[1]); + } else { + dumpFile = null; + } + } switch (cmd.toLowerCase()) { case "start": case "run": case "stop": case "status": + case "dump": + case "restart": break; default: printUsage(); @@ -178,6 +191,13 @@ public class RunNiFi { case "status": runNiFi.status(); break; + case "restart": + runNiFi.stop(); + runNiFi.start(false); + break; + case "dump": + runNiFi.dump(dumpFile); + break; } } @@ -391,6 +411,53 @@ public class RunNiFi { } + /** + * Writes a NiFi thread dump to the given file; if file is null, logs at INFO level instead. + * @param dumpFile + * @return + * @throws IOException + */ + public void dump(final File dumpFile) throws IOException { + final Integer port = getCurrentPort(); + if ( port == null ) { + System.out.println("Apache NiFi is not currently running"); + } + + final Properties nifiProps = loadProperties(); + final String secretKey = nifiProps.getProperty("secret.key"); + + final StringBuilder sb = new StringBuilder(); + try (final Socket socket = new Socket()) { + logger.fine("Connecting to NiFi instance"); + socket.setSoTimeout(60000); + socket.connect(new InetSocketAddress("localhost", port)); + logger.fine("Established connection to NiFi instance."); + socket.setSoTimeout(60000); + + logger.fine("Sending DUMP Command to port " + port); + final OutputStream out = socket.getOutputStream(); + out.write((DUMP_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8)); + out.flush(); + + final InputStream in = socket.getInputStream(); + final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + String line; + while ((line = reader.readLine()) != null ) { + sb.append(line).append("\n"); + } + } + + final String dump = sb.toString(); + if ( dumpFile == null ) { + logger.info(dump); + } else { + try (final FileOutputStream fos = new FileOutputStream(dumpFile)) { + fos.write(dump.getBytes(StandardCharsets.UTF_8)); + } + logger.info("Successfully wrote thread dump to " + dumpFile.getAbsolutePath()); + } + } + public void stop() throws IOException { final Integer port = getCurrentPort(); if ( port == null ) {