NIFI-3716 Created flow analyzer in NiFi Toolkit.

This closes #1747.

Signed-off-by: Andy LoPresto <alopresto@apache.org>
This commit is contained in:
Eric 2017-04-30 21:33:12 -04:00 committed by Andy LoPresto
parent 24e2981012
commit 7314066b6a
No known key found for this signature in database
GPG Key ID: 6EC293152D90B61D
8 changed files with 438 additions and 0 deletions

View File

@ -79,6 +79,10 @@ language governing permissions and limitations under the License. -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-toolkit-zookeeper-migrator</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-toolkit-flowanalyzer</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>

View File

@ -0,0 +1,39 @@
@echo off
rem
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
rem
rem Use JAVA_HOME if it's set; otherwise, just use java
if "%JAVA_HOME%" == "" goto noJavaHome
if not exist "%JAVA_HOME%\bin\java.exe" goto noJavaHome
set JAVA_EXE=%JAVA_HOME%\bin\java.exe
goto startConfig
:noJavaHome
echo The JAVA_HOME environment variable is not defined correctly.
echo Instead the PATH will be used to find the java executable.
echo.
set JAVA_EXE=java
goto startConfig
:startConfig
set LIB_DIR=%~dp0..\classpath;%~dp0..\lib
SET JAVA_PARAMS=-cp %LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.toolkit.flowanalyzer.FlowAnalyzerDriver
cmd.exe /C ""%JAVA_EXE%" %JAVA_PARAMS% %* ""

View File

@ -0,0 +1,120 @@
#!/bin/sh
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Script structure inspired from Apache Karaf and other Apache projects with similar startup approaches
SCRIPT_DIR=$(dirname "$0")
SCRIPT_NAME=$(basename "$0")
NIFI_TOOLKIT_HOME=$(cd "${SCRIPT_DIR}" && cd .. && pwd)
PROGNAME=$(basename "$0")
warn() {
(>&2 echo "${PROGNAME}: $*")
}
die() {
warn "$*"
exit 1
}
detectOS() {
# OS specific support (must be 'true' or 'false').
cygwin=false;
aix=false;
os400=false;
darwin=false;
case "$(uname)" in
CYGWIN*)
cygwin=true
;;
AIX*)
aix=true
;;
OS400*)
os400=true
;;
Darwin)
darwin=true
;;
esac
# For AIX, set an environment variable
if ${aix}; then
export LDR_CNTRL=MAXDATA=0xB0000000@DSA
echo ${LDR_CNTRL}
fi
}
locateJava() {
# Setup the Java Virtual Machine
if $cygwin ; then
[ -n "${JAVA}" ] && JAVA=$(cygpath --unix "${JAVA}")
[ -n "${JAVA_HOME}" ] && JAVA_HOME=$(cygpath --unix "${JAVA_HOME}")
fi
if [ "x${JAVA}" = "x" ] && [ -r /etc/gentoo-release ] ; then
JAVA_HOME=$(java-config --jre-home)
fi
if [ "x${JAVA}" = "x" ]; then
if [ "x${JAVA_HOME}" != "x" ]; then
if [ ! -d "${JAVA_HOME}" ]; then
die "JAVA_HOME is not valid: ${JAVA_HOME}"
fi
JAVA="${JAVA_HOME}/bin/java"
else
warn "JAVA_HOME not set; results may vary"
JAVA=$(type java)
JAVA=$(expr "${JAVA}" : '.* \(/.*\)$')
if [ "x${JAVA}" = "x" ]; then
die "java command not found"
fi
fi
fi
}
init() {
# Determine if there is special OS handling we must perform
detectOS
# Locate the Java VM to execute
locateJava "$1"
}
run() {
LIBS="${NIFI_TOOLKIT_HOME}/lib/*"
sudo_cmd_prefix=""
if $cygwin; then
NIFI_TOOLKIT_HOME=$(cygpath --path --windows "${NIFI_TOOLKIT_HOME}")
CLASSPATH="$NIFI_TOOLKIT_HOME/classpath;$(cygpath --path --windows "${LIBS}")"
else
CLASSPATH="$NIFI_TOOLKIT_HOME/classpath:${LIBS}"
fi
export JAVA_HOME="$JAVA_HOME"
export NIFI_TOOLKIT_HOME="$NIFI_TOOLKIT_HOME"
umask 0077
"${JAVA}" -cp "${CLASSPATH}" -Xms12m -Xmx24m org.apache.nifi.toolkit.flowanalyzer.FlowAnalyzerDriver "$@"
return $?
}
init "$1"
run "$@"

View File

@ -0,0 +1,43 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-toolkit-flowanalyzer</artifactId>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-toolkit</artifactId>
<version>1.3.0-SNAPSHOT</version>
</parent>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes combine.children="append">
<exclude>src/test/resources/test-data.json</exclude>
<exclude>src/test/resources/test-data-user-pass.json</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.flowanalyzer;
import java.io.FileInputStream;
import java.io.InputStream;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.zip.GZIPInputStream;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
public class FlowAnalyzerDriver {
final static String CONST_BYTES_GB_CONV = "1000000000";
final static String CONST_BYTES_MB_CONV = "1000000";
final static String CONST_BYTES_KB_CONV = "1000";
final static int DIVIDE_SCALE = 9;
final static String CONST_XMLNODE_CONNECTION = "connection";
private static void printUsage() {
System.out.println("This application seeks to produce a report to analyze the flow.xml.gz file");
System.out.println(
"Currently the reports supported by this application are Total Storage for all queues " +
"backpressure, average storage of all queues backpressure, and min and max of all queues " +
"backpressure over the entire flow.");
System.out.println("\n\n\n");
System.out.println("Usage: flow-analyzer.sh <path to flow.xml.gz>");
}
public static void main(String[] args) throws Exception {
BigDecimal totalDataSize = new BigDecimal("0.0");
BigDecimal max = new BigDecimal("0.0");
BigDecimal min = new BigDecimal("0.0");
BigDecimal avg = new BigDecimal("0.0");
long maxQueueSize = 0L;
long minQueueSize = 0L;
long totalQueueSize = 0L;
int numberOfConnections = 0;
if (helpRequested(args)) {
printUsage();
return;
}
String input = args[0];
if (!input.contains("xml.gz"))
input = input + "/flow.xml.gz";
InputStream fileStream = new FileInputStream(input);
InputStream gzipStream = new GZIPInputStream(fileStream);
System.out.println("Using flow=" + input);
DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
DocumentBuilder documentBuilder;
try {
documentBuilder = documentBuilderFactory.newDocumentBuilder();
Document document = documentBuilder.parse(gzipStream);
NodeList connectionNode = document.getElementsByTagName(CONST_XMLNODE_CONNECTION);
for (int x = 0; x < connectionNode.getLength(); x++) {
Node nNode = connectionNode.item(x);
if (nNode.getNodeType() == Node.ELEMENT_NODE) {
Element maxWorkQueueSize = (Element) nNode;
String maxDataSize = maxWorkQueueSize.getElementsByTagName("maxWorkQueueDataSize").item(0)
.getTextContent();
BigDecimal byteValue = (convertSizeToByteValue(maxDataSize)) != null
? convertSizeToByteValue(maxDataSize) : new BigDecimal("0.0");
numberOfConnections++;
avg = avg.add(byteValue);
String dataQueueSize = maxWorkQueueSize.getElementsByTagName("maxWorkQueueSize").item(0)
.getTextContent();
Long dataQueueSizeL = new Long(dataQueueSize);
totalQueueSize = dataQueueSizeL + totalQueueSize;
if(dataQueueSizeL > maxQueueSize)
maxQueueSize = dataQueueSizeL;
if(dataQueueSizeL < minQueueSize || minQueueSize == 0)
minQueueSize = dataQueueSizeL;
if (max.compareTo(byteValue) < 0)
max = byteValue;
if (byteValue.compareTo(min) < 0 || min.compareTo(new BigDecimal("0.0")) == 0)
min = byteValue;
totalDataSize = totalDataSize.add(byteValue);
}
}
System.out.println("Total Bytes Utilized by System=" + convertBytesToGB(totalDataSize).toPlainString()
+ " GB\nMax Back Pressure Size=" + convertBytesToGB(max).toPlainString()
+ " GB\nMin Back Pressure Size=" + convertBytesToGB(min).toPlainString()
+ " GB\nAverage Back Pressure Size="
+ convertBytesToGB(avg.divide(new BigDecimal(numberOfConnections), DIVIDE_SCALE, RoundingMode.HALF_UP)) + " GB");
System.out.println("Max Flowfile Queue Size=" + maxQueueSize + "\nMin Flowfile Queue Size=" + minQueueSize
+ "\nAvg Flowfile Queue Size=" + new BigDecimal(totalQueueSize).divide(new BigDecimal(numberOfConnections), DIVIDE_SCALE, RoundingMode.HALF_UP));
gzipStream.close();
fileStream.close();
} catch (Exception e) {
e.printStackTrace();
System.out.println("Exception occurred while attempting to parse flow.xml.gz. Cause: " + e.getCause());
}
}
private static boolean helpRequested(String[] args) {
return args.length == 0 || (args.length > 0 && (args[0].equalsIgnoreCase("-h") || args[0].equalsIgnoreCase("--help")));
}
/**
*
* @param value to convert to bytes
* @return BigDecimal Byte size
*/
public static BigDecimal convertSizeToByteValue(String value) {
BigDecimal size = null;
if (value.contains("GB")) {
String numericValue = value.substring(0, value.indexOf("G") - 1);
size = new BigDecimal(numericValue).multiply(new BigDecimal(CONST_BYTES_GB_CONV));
}
if (value.contains("MB")) {
String numericValue = value.substring(0, value.indexOf("M") - 1);
size = new BigDecimal(numericValue).multiply(new BigDecimal(CONST_BYTES_MB_CONV));
}
if (value.contains("KB")) {
String numericValue = value.substring(0, value.indexOf("K") - 1);
size = new BigDecimal(numericValue).multiply(new BigDecimal(CONST_BYTES_KB_CONV));
}
return size;
}
/**
* @param bytes to convert to GB
* @return BigDecimal bytes to GB
*/
public static BigDecimal convertBytesToGB(BigDecimal bytes) {
return bytes.divide(new BigDecimal(CONST_BYTES_GB_CONV), DIVIDE_SCALE, RoundingMode.HALF_UP).stripTrailingZeros();
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit;
import static org.junit.Assert.assertEquals;
import java.math.BigDecimal;
import org.apache.nifi.toolkit.flowanalyzer.FlowAnalyzerDriver;
import org.junit.Test;
public class FlowAnalyzerDriverTest {
@Test
public void testConvertSizeToValue() {
String gbTest = "13 GB";
String kbTest = "103 KB";
String mbTest = "20 MB";
BigDecimal gbBigDecimal = FlowAnalyzerDriver.convertSizeToByteValue(gbTest);
assertEquals(gbBigDecimal.toPlainString(), "13000000000");
BigDecimal kbBigDecimal = FlowAnalyzerDriver.convertSizeToByteValue(kbTest);
assertEquals(kbBigDecimal.toPlainString(), "103000");
BigDecimal mbBigDecimal = FlowAnalyzerDriver.convertSizeToByteValue(mbTest);
assertEquals(mbBigDecimal.toPlainString(), "20000000");
}
@Test
public void convertBytesToGB() {
BigDecimal gbBigDecimal = new BigDecimal("13000000000");
BigDecimal kbBigDecimal = new BigDecimal("103000");
BigDecimal mbBigDecimal = new BigDecimal("20000000");
BigDecimal result = FlowAnalyzerDriver.convertBytesToGB(gbBigDecimal);
assertEquals("13", result.stripTrailingZeros().toPlainString());
result = FlowAnalyzerDriver.convertBytesToGB(mbBigDecimal);
assertEquals("0.02", result.stripTrailingZeros().toPlainString());
result = FlowAnalyzerDriver.convertBytesToGB(kbBigDecimal);
assertEquals("0.000103", result.stripTrailingZeros().toPlainString());
}
}

View File

@ -30,6 +30,7 @@
<module>nifi-toolkit-zookeeper-migrator</module>
<module>nifi-toolkit-flowfile-repo</module>
<module>nifi-toolkit-assembly</module>
<module>nifi-toolkit-flowanalyzer</module>
</modules>
<dependencyManagement>
<dependencies>

View File

@ -945,6 +945,11 @@
<artifactId>nifi-toolkit-admin</artifactId>
<version>1.4.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-toolkit-flowanalyzer</artifactId>
<version>1.3.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-registry-service</artifactId>