NIFI-5318 Implement NiFi test harness: initial commit

NIFI-5318 Implement NiFi test harness: replaced original sample feed payload with synthetic content
NIFI-5318 Implement NiFi test harness: fixed test harness run crash issue; better reporting of paths
NIFI-5318 Implement NiFi test harness: added further states where NiFi version can be queried
NIFI-5318 Implement NiFi test harness: fixed incorrect class reference
NIFI-5318 Implement NiFi test harness: added type parameter bounding to setClassOfSingleProcessor to prevent configuring obviously incorrect classes
NIFI-5318 Updated project version.

This closes #3165

Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
Peter G. Horvath 2018-11-10 22:22:01 +01:00 committed by Mike Thomsen
parent 45ebeba846
commit 43235724e2
21 changed files with 2252 additions and 0 deletions

1
nifi-testharness/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
nifi_testharness_nifi_home/*

View File

@ -0,0 +1,3 @@
This directory is used to mimic NiFi's own home directory: the JVM hosting the
TestNiFiInstance has to be started here. Once started, TestNiFiInstance then
creates symlinks to the actual NiFi installation directory.

233
nifi-testharness/pom.xml Normal file
View File

@ -0,0 +1,233 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi</artifactId>
<version>1.10.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-testharness</artifactId>
<description>A test harness for running NiFi flow tests</description>
<packaging>jar</packaging>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes combine.children="append">
<exclude>nifi_testharness_nifi_home/NIFI_TESTHARNESS_README.txt</exclude>
<exclude>src/test/resources/sample_technology_rss.xml</exclude>
<exclude>src/test/resources/logback-test.xml</exclude>
<exclude>src/test/resources/flow.xml</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.20.1</version>
<configuration>
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
<workingDirectory>nifi_testharness_nifi_home</workingDirectory>
</configuration>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>skip-testharness-tests</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<java.awt.headless>true</java.awt.headless>
</systemPropertyVariables>
<excludes>
<exclude>**/samples/*Test.class</exclude>
<exclude>**/samples/Test*.class</exclude>
<exclude>**/samples/*Spec.class</exclude>
</excludes>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
<argLine combine.children="append">-Xmx1G
-Djava.net.preferIPv4Stack=true
${maven.surefire.arguments}
</argLine>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>run-testharness-tests</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<reuseForks>false</reuseForks>
<workingDirectory>${project.basedir}/nifi_testharness_nifi_home</workingDirectory>
<systemPropertyVariables>
<java.awt.headless>true</java.awt.headless>
</systemPropertyVariables>
<includes>
<include>**/*Test.class</include>
<include>**/Test*.class</include>
<include>**/*Spec.class</include>
</includes>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
<argLine combine.children="append">-Xmx1G
-Djava.net.preferIPv4Stack=true
${maven.surefire.arguments}
</argLine>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<org.slf4j.version>1.7.25</org.slf4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-runtime</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-assembly</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-api</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-bootstrap</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/javax.servlet/javax.servlet-api -->
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>${jetty.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<version>3.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymockclassextension</artifactId>
<version>3.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-easymock</artifactId>
<version>1.7.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<version>1.7.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,203 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.testharness;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.testharness.api.FlowFileEditorCallback;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import java.util.LinkedList;
import java.util.Objects;
/**
* <p>
* A facility to describe simple, common changes to a NiFi flow before it is installed to the test
* NiFi instance. Intended to be used by
* {@link TestNiFiInstance.Builder#modifyFlowXmlBeforeInstalling(FlowFileEditorCallback)}
* </p>
*
* <p>
* The desired edits can be configured via the {@link Builder} object returned by the {@link #builder()}
* method. Once fully configured, the {@link Builder#build()} emits a {@code FlowFileEditorCallback}
* object that can be passed to
* {@link TestNiFiInstance.Builder#modifyFlowXmlBeforeInstalling(FlowFileEditorCallback)}.
* </p>
*
* <p>
* <strong>CAUTION: THIS IS AN EXPERIMENTAL API: EXPECT CHANGES!</strong>
* Efforts will be made to retain backwards API compatibility, but
* no guarantee is given.
* </p>
*
* @see TestNiFiInstance.Builder#modifyFlowXmlBeforeInstalling(FlowFileEditorCallback)
*
*/
public final class SimpleNiFiFlowDefinitionEditor implements FlowFileEditorCallback, TestNiFiInstanceAware {
private final LinkedList<FlowFileEditorCallback> delegateActions;
private TestNiFiInstance testNiFiInstance;
private SimpleNiFiFlowDefinitionEditor(LinkedList<FlowFileEditorCallback> delegateActions) {
this.delegateActions = delegateActions;
}
@Override
public Document edit(Document document) throws Exception {
for (FlowFileEditorCallback change : delegateActions) {
if (change instanceof TestNiFiInstanceAware) {
((TestNiFiInstanceAware)change).setTestNiFiInstance(testNiFiInstance);
}
document = change.edit(document);
}
return document;
}
@Override
public void setTestNiFiInstance(TestNiFiInstance testNiFiInstance) {
this.testNiFiInstance = Objects.requireNonNull(
testNiFiInstance, "argument testNiFiInstance cannot be null");
}
public static Builder builder() {
return new Builder();
}
public static class Builder {
private Builder() {
// no external instance
}
private XPath xpath = XPathFactory.newInstance().newXPath();
private final LinkedList<FlowFileEditorCallback> actions = new LinkedList<>();
public Builder rawXmlChange(FlowFileEditorCallback flowFileEditorCallback) {
actions.addLast(flowFileEditorCallback);
return this;
}
public Builder setSingleProcessorProperty(String processorName, String propertyName, String newValue) {
return rawXmlChange(document -> {
String xpathString = "//processor[name/text() = '" + processorName
+ "']/property[name/text() = '" + propertyName + "']/value";
Node propertyValueNode = (Node) xpath.evaluate(xpathString, document, XPathConstants.NODE);
if (propertyValueNode == null) {
throw new IllegalArgumentException("Reference to processor '"+ processorName +"' with property '"
+ propertyName + "' not found: " + xpathString);
}
propertyValueNode.setTextContent(newValue);
return document;
});
}
public <P extends Processor> Builder setClassOfSingleProcessor(String processorName, Class<P> mockProcessor) {
return setClassOfSingleProcessor(processorName, mockProcessor.getName());
}
public Builder setClassOfSingleProcessor(String processorName, String newFullyQualifiedClassName) {
return rawXmlChange(document -> {
String xpathString = "//processor[name/text() = '" + processorName + "']/class";
Node classNameNode = (Node) xpath.evaluate(xpathString, document, XPathConstants.NODE);
if (classNameNode == null) {
throw new IllegalArgumentException("Reference to processor '"+ processorName +" not found: " +
xpathString);
}
classNameNode.setTextContent(newFullyQualifiedClassName);
return document;
});
}
public Builder updateFlowFileBuiltInNiFiProcessorVersionsToNiFiVersion() {
return rawXmlChange(new UpdateFlowFileNiFiVersionFlowFileEditorCallback());
}
public SimpleNiFiFlowDefinitionEditor build() {
return new SimpleNiFiFlowDefinitionEditor(actions);
}
}
private static final class UpdateFlowFileNiFiVersionFlowFileEditorCallback
implements FlowFileEditorCallback, TestNiFiInstanceAware {
private TestNiFiInstance testNiFiInstance;
@Override
public Document edit(Document document) throws Exception {
String niFiVersion = getNiFiVersion();
XPath xpath = XPathFactory.newInstance().newXPath();
NodeList processorNodeVersionList = (NodeList)
xpath.evaluate("//bundle/group[text() = \"org.apache.nifi\"]/parent::bundle/version",
document, XPathConstants.NODESET);
final int length = processorNodeVersionList.getLength();
for (int i=0; i<length; i++) {
Node processorNodeVersion = processorNodeVersionList.item(i);
processorNodeVersion.setTextContent(niFiVersion);
}
return document;
}
private String getNiFiVersion() {
return Objects.requireNonNull(
testNiFiInstance, "testNiFiInstance cannot be null").getNifiVersion();
}
@Override
public void setTestNiFiInstance(TestNiFiInstance testNiFiInstance) {
this.testNiFiInstance = Objects.requireNonNull(
testNiFiInstance, "argument testNiFiInstance cannot be null");
}
}
}

View File

@ -0,0 +1,620 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.testharness;
import org.apache.nifi.EmbeddedNiFi;
import org.apache.nifi.testharness.api.FlowFileEditorCallback;
import org.apache.nifi.testharness.util.FileUtils;
import org.apache.nifi.testharness.util.NiFiCoreLibClassLoader;
import org.apache.nifi.testharness.util.XmlUtils;
import org.apache.nifi.testharness.util.Zip;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.zip.ZipEntry;
/**
* <p>
* An API wrapper of a "test" NiFi instance to which a flow definition is installed for testing.</p>
*
* <p>
* Due to NiFi design restrictions, {@code TestNiFiInstance} has to take <i>full command</i>
* of the current working directory: it installs a full NiFi installation to there. To ensure
* this is desired, <strong>it will only run if the current directory is called
* "nifi_testharness_nifi_home"</strong>. As such the JVM process has to be started inside a directory
* called "nifi_testharness_nifi_home" so that the following is true:
*
* <pre><tt>
* new File(System.getProperty("user.dir")).getName().equals("nifi_testharness_nifi_home")
* </tt></pre>
* </p>
*
* <p>
* Before {@code TestNiFiInstance} can be used, it has to be configured via its builder
* interface:
* <ul>
* <li>
* {@link Builder#setNiFiBinaryDistributionZip(File)} specifies the location of the NiFi binary
* distribution ZIP file to be used.
* </li>
* <li>
* {@link Builder#setFlowXmlToInstallForTesting(File)} specifies the location of the NiFi flow
* to install.
* </li>
* <li>
* {@link Builder#modifyFlowXmlBeforeInstalling(FlowFileEditorCallback)} allows on-the-fly
* changes to be performed to the Flow file before it is actually installed.
* </li>
* </ul>
*
* <h5>Sample</h5>
* <pre><tt>
* TestNiFiInstance testNiFiInstance = TestNiFiInstance.builder()
* .setNiFiBinaryDistributionZip(YourConstants.NIFI_ZIP_FILE)
* .setFlowXmlToInstallForTesting(YourConstants.FLOW_XML_FILE)
* .modifyFlowXmlBeforeInstalling(YourConstants.FLOW_FILE_CHANGES_FOR_TESTS)
* .build();
* </tt></pre>
*
* </p>
*
* <p>
* If the current working directory is called "nifi_testharness_nifi_home", the caller can
* {@link #install()} this {@code TestNiFiInstance}, which will
* <ol>
* <li>
* (as a first cleanup step) erase all content of the current working directory.
* (NOTE: this potentially destructive operation is the reason why we have the
* "nifi_testharness_nifi_home" directory name guard in place!)
* </li>
* <li>
* Extracts the contents of the NiFi binary distribution ZIP file specified in
* the configuration to a to a temporary directory.
* <li>
* Symlinks all files from the temporary directory to the current working
* directory, causing the directory to hold a fully functional
* NiFi installation.
* </li>
* <li>
* Installs the flow definition files(s) to the NiFi instance specified in
* the configuration.
* </li>
* </ol>
* </p>
*
* <p>
*
* The caller then can proceed to {@link #start()} this {@code TestNiFiInstance},
* which will bootstrap the NiFi engine, which in turn will pick up and start processing
* the flow definition supplied by the caller in the configuration.
* </p>
*
* <p>
* Once the previous step is done, the caller can perform asserts regarding the observed behaviour
* of the NiFi flow, just like one would do it with standard Java test cases.
* </p>
*
* <p>
* To perform a clean shutdown of the hosted NiFi instance, the caller is required to call
* {@link #stopAndCleanup()}, which will shut down NiFi and remove all temporary files, including
* the symlinks created in the current working directory.
* </p>
*
*
* <h4>NOTES</h4>
* <ul>
* <li>
* {@code TestNiFiInstance} is NOT thread safe: if more than one thread uses it,
* external synchronisation is required.
* </li>
* <li>
* Only one {@code TestNiFiInstance} can be started in the same "nifi_testharness_nifi_home"
* directory at the same time.
* </li>
* <li>
* Currently, due to NiFi limitations, one {@code TestNiFiInstance} can be started per JVM process.
* If multiple test cases are required, launch a new JVM process per test case
* (in sequence, see the point above): Maven/Surefire has built-in support for this.
* </li>
* </ul>
*
* <p>
* <strong>CAUTION: THIS IS AN EXPERIMENTAL API: EXPECT CHANGES!</strong>
* Efforts will be made to retain backwards API compatibility, but
* no guarantee is given.
* </p>
*
*
* @see TestNiFiInstance#builder()
*
*
*/
public class TestNiFiInstance {
private static final Logger LOGGER = LoggerFactory.getLogger(TestNiFiInstance.class);
private EmbeddedNiFi testNiFi;
private final File nifiHomeDir;
private final File bootstrapLibDir;
private File nifiProperties;
private final File flowXmlGz;
private final File placeholderNiFiHomeDir;
private String nifiVersion;
private enum State {
STOPPED,
STOP_FAILED,
START_FAILED(STOPPED),
STARTED(STOPPED, STOP_FAILED),
INSTALLATION_FAILED(),
FLOW_INSTALLED(STARTED, START_FAILED),
INSTALLED(FLOW_INSTALLED, INSTALLATION_FAILED),
CREATED(INSTALLED, INSTALLATION_FAILED);
private final Set<State> allowedTransitions;
State(State... allowedTransitions) {
this.allowedTransitions = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(allowedTransitions)));
}
private void checkCanTransition(State newState) {
if (!this.allowedTransitions.contains(newState)) {
throw new IllegalStateException("Cannot transition from " + this + " to " + newState);
}
}
}
private State currentState = State.CREATED;
private final File nifiBinaryZip;
private final File flowXml;
private final FlowFileEditorCallback editCallback;
private TestNiFiInstance(File nifiBinaryZip, File flowXml, FlowFileEditorCallback editCallback) {
this.nifiBinaryZip = Objects.requireNonNull(nifiBinaryZip, "nifiBinaryZip");
this.flowXml = Objects.requireNonNull(flowXml, "flowXml");
this.editCallback = editCallback;
nifiHomeDir = requireCurrentWorkingDirectoryIsCorrect();
final File configDir = new File(nifiHomeDir, "conf");
final File libDir = new File(nifiHomeDir, "lib");
bootstrapLibDir = new File(libDir, "bootstrap");
nifiProperties = new File(configDir, "nifi.properties");
flowXmlGz = new File(configDir, "flow.xml.gz");
placeholderNiFiHomeDir = requireCurrentWorkingDirectoryIsCorrect();
}
String getNifiVersion() {
switch (currentState) {
case INSTALLED:
case FLOW_INSTALLED:
case STARTED:
case START_FAILED:
case STOP_FAILED:
case STOPPED:
return Objects.requireNonNull(nifiVersion, "nifiVersion is null");
default:
throw new IllegalStateException(
"NiFi version can only be retrieved after a successful installation, not in: "
+ currentState);
}
}
public void install() throws IOException {
currentState.checkCanTransition(State.INSTALLED);
File[] staleInstallations = placeholderNiFiHomeDir.listFiles((dir, name) -> name.startsWith("nifi-"));
if (staleInstallations != null) {
Arrays.stream(staleInstallations).forEach(TestNiFiInstance::deleteFileOrDirectoryRecursively);
}
Path tempDirectory = null;
try {
tempDirectory = Files.createTempDirectory("installable-flow");
LOGGER.info("Uncompressing NiFi archive {} to {} ...", nifiBinaryZip, placeholderNiFiHomeDir);
Zip.unzipFile(nifiBinaryZip, placeholderNiFiHomeDir, new Zip.StatusListenerAdapter() {
@Override
public void onUncompressDone(ZipEntry ze) {
LOGGER.debug("Uncompressed {}", ze.getName());
}
});
LOGGER.info("Uncompressing DONE");
File actualNiFiHomeDir = getActualNiFiHomeDir(placeholderNiFiHomeDir);
nifiVersion = getNiFiVersion(actualNiFiHomeDir);
currentState = State.INSTALLED;
File installableFlowFile = createInstallableFlowFile(tempDirectory);
validateNiFiVersionAgainstFlowVersion(nifiVersion, installableFlowFile);
FileUtils.createSymlinks(placeholderNiFiHomeDir, actualNiFiHomeDir);
installFlowFile(installableFlowFile);
} catch (Exception e) {
currentState = State.INSTALLATION_FAILED;
throw new RuntimeException("Installation failed: " + e.getMessage(), e);
} finally {
if (tempDirectory != null) {
FileUtils.deleteDirectoryRecursive(tempDirectory);
}
}
currentState = State.FLOW_INSTALLED;
}
private File createInstallableFlowFile(Path tempDirectory) throws IOException {
File flowXmlFile = new File(tempDirectory.toFile(), "flow.xml");
if (editCallback == null) {
Files.copy(flowXml.toPath(), flowXmlFile.toPath());
} else {
if (editCallback instanceof TestNiFiInstanceAware) {
((TestNiFiInstanceAware)editCallback).setTestNiFiInstance(this);
}
XmlUtils.editXml(flowXml, flowXmlFile, editCallback);
}
return flowXmlFile;
}
private void installFlowFile(File fileToIncludeInGz) throws IOException {
Zip.gzipFile(fileToIncludeInGz, flowXmlGz);
}
private static String getNiFiVersion(File nifiInstallDir) {
File libDir = new File(nifiInstallDir, "lib");
if (!libDir.exists()) {
throw new IllegalStateException(
"No \"lib\" directory found in NiFi home directory: " + nifiInstallDir);
}
File[] nifiApiJarLookupResults =
libDir.listFiles((dir, name) -> name.startsWith("nifi-api-") && name.endsWith(".jar"));
if (nifiApiJarLookupResults == null) {
// since we check the existence before, this can only be null in case of an I/O error
throw new IllegalStateException(
"I/O error listing NiFi lib directory: " + libDir);
}
if (nifiApiJarLookupResults.length == 0) {
throw new IllegalStateException(
"No \"\"nifi-api-*.jar\" file found in NiFi lib directory: " + libDir);
}
if (nifiApiJarLookupResults.length != 1) {
throw new IllegalStateException(
"Multiple \"nifi-api-*.jar\" files found in NiFi lib directory: " + libDir);
}
File nifiApiJar = nifiApiJarLookupResults[0];
return nifiApiJar.getName()
.replace("nifi-api-", "")
.replace(".jar", "");
}
private static void validateNiFiVersionAgainstFlowVersion(String nifiVersion, File flowFile) {
String flowFileVersion = extractFlowFileVersion(flowFile);
if (flowFileVersion != null
&& !flowFileVersion.equalsIgnoreCase(nifiVersion)) {
// prevent user errors and fail fast in case we detect that the flow file
// was created by a different version of NiFi. This can prevent a lot of confusion!
throw new RuntimeException(String.format(
"The NiFi version referenced in the flow file ('%s') does not match the version of NiFi being used ('%s')",
flowFileVersion, nifiVersion));
}
}
private static String extractFlowFileVersion(File flowFile) {
Document flowDocument = XmlUtils.getFileAsDocument(flowFile);
XPath xpath = XPathFactory.newInstance().newXPath();
try {
NodeList processorNodeVersion = (NodeList)
xpath.evaluate("//bundle/group[text() = \"org.apache.nifi\"]/parent::bundle/version/text()",
flowDocument, XPathConstants.NODESET);
HashSet<String> versionNumbers = new HashSet<>();
final int length = processorNodeVersion.getLength();
for (int i=0; i<length; i++) {
Node item = processorNodeVersion.item(i);
String textContent = item.getTextContent();
versionNumbers.add(textContent);
}
if (versionNumbers.size() == 0) {
return null;
}
if (versionNumbers.size() > 1) {
throw new RuntimeException(
"Multiple NiFi versions found in Flow file, this is unexpected: " + versionNumbers);
}
return versionNumbers.iterator().next();
} catch (XPathExpressionException e) {
throw new RuntimeException("Failure extracting version information from flow file: " + flowFile, e);
}
}
public void start() {
currentState.checkCanTransition(State.STARTED);
try {
if (!bootstrapLibDir.exists()) {
throw new IllegalStateException("Not found: " + bootstrapLibDir);
}
System.setProperty("org.apache.jasper.compiler.disablejsr199", "true");
System.setProperty("java.security.egd", "file:/dev/urandom");
System.setProperty("sun.net.http.allowRestrictedHeaders", "true");
System.setProperty("java.net.preferIPv4Stack", "true");
System.setProperty("java.awt.headless", "true");
System.setProperty("java.protocol.handler.pkgs", "sun.net.www.protocol");
System.setProperty("nifi.properties.file.path", nifiProperties.getAbsolutePath());
System.setProperty("app", "NiFi");
System.setProperty("org.apache.nifi.bootstrap.config.log.dir", "./logs");
ClassLoader coreClassLoader = new NiFiCoreLibClassLoader(nifiHomeDir, ClassLoader.getSystemClassLoader());
Thread.currentThread().setContextClassLoader(coreClassLoader);
this.testNiFi = new EmbeddedNiFi(new String[0], coreClassLoader);
} catch (Exception ex) {
currentState = State.START_FAILED;
throw new RuntimeException("Startup failed", ex);
}
currentState = State.STARTED;
}
public void stopAndCleanup() {
currentState.checkCanTransition(State.STOPPED);
try {
testNiFi.shutdown();
removeNiFiFilesCreatedForTemporaryInstallation(placeholderNiFiHomeDir);
} catch (Exception e) {
currentState = State.STOP_FAILED;
throw new RuntimeException(e);
}
currentState = State.STOPPED;
}
private static File requireCurrentWorkingDirectoryIsCorrect() {
File currentWorkDir = new File(System.getProperty("user.dir"));
if (!currentWorkDir.getName().equals("nifi_testharness_nifi_home")) {
throw new IllegalStateException(
"The test's working directory has to be set to nifi_testharness_nifi_home, but was: " + currentWorkDir);
}
return currentWorkDir;
}
private static File getActualNiFiHomeDir(File currentDir) {
File[] files = currentDir.listFiles((dir, name) -> name.startsWith("nifi-"));
if (files == null || files.length == 0) {
throw new IllegalStateException(
"No \"nifi-*\" directory found in temporary NiFi home directory container: " + currentDir);
}
if (files.length != 1) {
throw new IllegalStateException(
"Multiple \"nifi-*\" directories found in temporary NiFi home directory container: " + currentDir);
}
return files[0];
}
private static void removeNiFiFilesCreatedForTemporaryInstallation(File directoryToClear) {
if (directoryToClear != null) {
File[] directoryContents = directoryToClear.listFiles();
if (directoryContents != null) {
Arrays.stream(directoryContents)
.filter(file -> !"NIFI_TESTHARNESS_README.txt".equals(file.getName()))
.forEach(TestNiFiInstance::deleteFileOrDirectoryRecursively);
}
}
}
private static void deleteFileOrDirectoryRecursively(File file) {
if (file.isDirectory()) {
FileUtils.deleteDirectoryRecursive(file);
} else {
boolean deletedSuccessfully = file.delete();
if (!deletedSuccessfully) {
throw new RuntimeException("Could not delete: " + file);
}
}
}
@Override
public String toString() {
return "NiFi test instance(" + Integer.toHexString(hashCode())
+ ") state: " + currentState + ", home: " + nifiHomeDir;
}
public static Builder builder() {
return new Builder();
}
public static class Builder {
private boolean isDisposed = false;
private File nifiBinaryZip;
private File flowXml;
private FlowFileEditorCallback editCallback;
/**
* Sets the location of the NiFi binary distribution file, from which the test instance
* will be uncompressed and built.
*
* @param nifiBinaryZip
* the NiFi binary distribution file, from which the test instance will be built (never {@code null})
* @return {@code this} (for method chaining)
*/
public Builder setNiFiBinaryDistributionZip(File nifiBinaryZip) {
if (!nifiBinaryZip.exists()) {
throw new IllegalArgumentException("File not found: " + nifiBinaryZip);
}
if (nifiBinaryZip.isDirectory()) {
throw new IllegalArgumentException("A ZIP file is expected to be specified, not a directory: "
+ nifiBinaryZip);
}
this.nifiBinaryZip = nifiBinaryZip;
return this;
}
/**
* Sets the NiFi flow XML, which will be installed to the NiFi instance for testing.
*
* @param flowXml the NiFi flow file to install to the test instance for testing (never {@code null})
*
* @return {@code this} (for method chaining)
*/
public Builder setFlowXmlToInstallForTesting(File flowXml) {
if (!flowXml.exists()) {
throw new IllegalArgumentException("File not found: " + flowXml);
}
this.flowXml = flowXml;
return this;
}
/**
* <p>
* An <strong>optional</strong> callback to change the flow definition read from
* {@link #setFlowXmlToInstallForTesting(File)}, before it is actually installed for testing.
* (NOTE: The original file remains unchanged: changes are applied to a copy of it.)</p>
*
* <p>
* NOTE: {@link SimpleNiFiFlowDefinitionEditor} provides various common flow definition changes
* useful for testing.
* </p>
*
* @param callback an <strong>optional</strong> callback to change the flow definition
*
* @return {@code this} (for method chaining)
*
* @see SimpleNiFiFlowDefinitionEditor
*/
public Builder modifyFlowXmlBeforeInstalling(FlowFileEditorCallback callback) {
this.editCallback = callback;
return this;
}
public TestNiFiInstance build() {
if (isDisposed) {
throw new IllegalStateException("builder can only be used once");
}
isDisposed = true;
return new TestNiFiInstance(nifiBinaryZip, flowXml, editCallback);
}
}
}

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.testharness;
public interface TestNiFiInstanceAware {
void setTestNiFiInstance(TestNiFiInstance testNiFiInstance);
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.testharness.api;
import org.w3c.dom.Document;
/**
* <p>
* An interface that allows programmatic access to the contents of a NiFi Flow XML,
* allowing changes to be performed before it
* is actually installed to the NiFi instance.</p>
*
* <p>
* <strong>CAUTION: THIS IS AN EXPERIMENTAL API: EXPECT CHANGES!</strong>
* Efforts will be made to retain backwards API compatibility, but
* no guarantee is given.
* </p>
*
*/
public interface FlowFileEditorCallback {
/**
*
* @param document the document to change (never {@code null})
* @return the changed document (never {@code null})
* @throws Exception in case the editing fails
*/
Document edit(Document document) throws Exception;
}

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.testharness.util;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Arrays;
public final class FileUtils {
private static final String MAC_DS_STORE_NAME = ".DS_Store";
private FileUtils() {
// no instances
}
public static void deleteDirectoryRecursive(Path directory) throws IOException {
Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Files.delete(file);
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
Files.delete(dir);
return FileVisitResult.CONTINUE;
}
});
}
public static void deleteDirectoryRecursive(File dir) {
try {
deleteDirectoryRecursive(dir.toPath());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static void createLink(Path newLink, Path existingFile) {
try {
Files.createSymbolicLink(newLink, existingFile);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static void createSymlinks(File newLinkDir, File existingDir) {
Arrays.stream(existingDir.list())
.filter(fileName -> !MAC_DS_STORE_NAME.equals(fileName))
.forEach(fileName -> {
Path newLink = Paths.get(newLinkDir.getAbsolutePath(), fileName);
Path existingFile = Paths.get(existingDir.getAbsolutePath(), fileName);
File symlinkFile = newLink.toFile();
if (symlinkFile.exists()) {
symlinkFile.delete();
}
createLink(newLink, existingFile);
});
}
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.testharness.util;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
public final class NiFiCoreLibClassLoader extends URLClassLoader {
public NiFiCoreLibClassLoader(File nifiHomeDir, ClassLoader parent) {
super(getURls(nifiHomeDir), parent);
}
private static URL[] getURls(File nifiHomeDir) {
try {
File libDir = new File(nifiHomeDir, "lib");
File bootstrapLibDir = new File(libDir, "bootstrap");
List<URL> libs = Files.list(libDir.toPath())
.filter(NiFiCoreLibClassLoader::isJarOrNarFile)
.map(NiFiCoreLibClassLoader::toURL)
.collect(Collectors.toList());
List<URL> bootstrapLibs = Files.list(bootstrapLibDir.toPath())
.filter(NiFiCoreLibClassLoader::isJarOrNarFile)
.map(NiFiCoreLibClassLoader::toURL)
.collect(Collectors.toList());
LinkedList<URL> urls = new LinkedList<>();
urls.addAll(libs);
urls.addAll(bootstrapLibs);
return urls.toArray(new URL[urls.size()]);
} catch (IOException ioEx) {
throw new RuntimeException(ioEx);
}
}
private static URL toURL(Path path) {
try {
return path.toUri().toURL();
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
}
private static boolean isJarOrNarFile(Path path) {
String fullPathString = path.getFileName().toString();
return path.toFile().isFile() && fullPathString.endsWith(".jar");
}
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.testharness.util;
import org.apache.nifi.testharness.api.FlowFileEditorCallback;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import java.io.File;
import java.io.FileInputStream;
public final class XmlUtils {
public static void editXml(File inputFile, File outputFile, FlowFileEditorCallback editCallback) {
try {
Document document = getFileAsDocument(inputFile);
document = editCallback.edit(document);
// save the result
TransformerFactory transformerFactory = TransformerFactory.newInstance();
Transformer transformer = transformerFactory.newTransformer();
transformer.transform(new DOMSource(document), new StreamResult(outputFile));
} catch (Exception e) {
throw new RuntimeException("Failed to change XML document: " + e.getMessage(), e);
}
}
public static Document getFileAsDocument(File xmlFile) {
try(FileInputStream inputStream = new FileInputStream(xmlFile)) {
DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
DocumentBuilder documentBuilder = documentBuilderFactory.newDocumentBuilder();
return documentBuilder.parse(new InputSource(inputStream));
} catch (Exception e) {
throw new RuntimeException("Failed to parse XML file: " + xmlFile, e);
}
}
}

View File

@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.testharness.util;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
public final class Zip {
private Zip() {
// no external instances allowed
}
public interface StatusListener {
void onUncompressStarted(ZipEntry ze);
void onUncompressDone(ZipEntry ze);
}
public static class StatusListenerAdapter implements StatusListener {
@Override
public void onUncompressStarted(ZipEntry ze) {
}
@Override
public void onUncompressDone(ZipEntry ze) {
}
}
private static final StatusListener NO_OP_STATUS_LISTENER = new StatusListenerAdapter();
public static void unzipFile(File zipFile, File targetDirectory) throws IOException {
unzipFile(zipFile, targetDirectory, NO_OP_STATUS_LISTENER);
}
public static void unzipFile(File zipFile, File targetDirectory,
StatusListener statusListener) throws IOException {
if (!targetDirectory.exists()) {
boolean mkdirs = targetDirectory.mkdirs();
if (!mkdirs) {
throw new IOException("Failed to create directory: " + targetDirectory);
}
}
try (ZipInputStream zipInputStream = new ZipInputStream(new FileInputStream(zipFile))) {
ZipEntry ze = zipInputStream.getNextEntry();
while (ze != null) {
if(ze.isDirectory()) {
ze = zipInputStream.getNextEntry();
continue;
}
statusListener.onUncompressStarted(ze);
String fileName = ze.getName();
File outputFile = new File(targetDirectory, fileName);
File parentDir = new File(outputFile.getParent());
if (!parentDir.exists()) {
boolean couldCreateParentDir = parentDir.mkdirs();
if (!couldCreateParentDir) {
throw new IllegalStateException("Could not create: " + parentDir);
}
}
Files.copy(zipInputStream, outputFile.toPath());
statusListener.onUncompressDone(ze);
ze = zipInputStream.getNextEntry();
}
zipInputStream.closeEntry();
}
}
public static void gzipFile(File inputFile, File gzipFile) throws IOException {
try (GZIPOutputStream gzos =
new GZIPOutputStream(new FileOutputStream(gzipFile))) {
Files.copy(inputFile.toPath(), gzos);
gzos.finish();
}
}
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.testharness.samples;
import java.io.File;
public final class Constants {
static final File OUTPUT_DIR = new File("./NiFiTest/NiFiReadTest");
// NOTE: you will have to have the NiFi distribution ZIP placed into this directory.
// Its version must be the same as the one referenced in the flow.xml, otherwise it will not work!
static final File NIFI_ZIP_DIR = new File("../../nifi-assembly/target");
static final File FLOW_XML_FILE = new File(NiFiMockFlowTest.class.getResource("/flow.xml").getFile());
}

View File

@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.testharness.samples;
import org.apache.nifi.testharness.SimpleNiFiFlowDefinitionEditor;
import org.apache.nifi.testharness.TestNiFiInstance;
import org.apache.nifi.testharness.util.FileUtils;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.util.List;
import static org.junit.Assert.assertTrue;
/**
* This test demonstrates how to mock the source data by starting a mock HTTP server (using Jetty)
* and rewriting the URL in flow definition.
*/
public class NiFiFlowTest {
private static final SimpleNiFiFlowDefinitionEditor CONFIGURE_MOCKS_IN_NIFI_FLOW = SimpleNiFiFlowDefinitionEditor.builder()
.updateFlowFileBuiltInNiFiProcessorVersionsToNiFiVersion()
.setSingleProcessorProperty("GetHTTP", "URL", "http://localhost:12345")
.build();
// used by mocked GetHTTP; serves test data
private static Server testJettyServer;
private TestNiFiInstance testNiFiInstance;
@BeforeClass
public static void beforeClass() throws Exception {
NiFiFlowTest.testJettyServer = new Server(12345);
Handler handler = new TestHandler();
NiFiFlowTest.testJettyServer.setHandler(handler);
NiFiFlowTest.testJettyServer.start();
}
@Before
public void bootstrapNiFi() throws Exception {
if (Constants.OUTPUT_DIR.exists()) {
FileUtils.deleteDirectoryRecursive(Constants.OUTPUT_DIR.toPath());
}
File nifiZipFile = TestUtils.getBinaryDistributionZipFile(Constants.NIFI_ZIP_DIR);
TestNiFiInstance testNiFi = TestNiFiInstance.builder()
.setNiFiBinaryDistributionZip(nifiZipFile)
.setFlowXmlToInstallForTesting(Constants.FLOW_XML_FILE)
.modifyFlowXmlBeforeInstalling(CONFIGURE_MOCKS_IN_NIFI_FLOW)
.build();
testNiFi.install();
testNiFi.start();
// only assign testNiFi to the field in case it was started successfully
testNiFiInstance = testNiFi;
}
@Test
public void testFlowCreatesFilesInCorrectLocation() throws IOException {
// We deleted the output directory: our NiFi flow should create it
assertTrue("Output directory not found: " + Constants.OUTPUT_DIR, Constants.OUTPUT_DIR.exists());
File outputFile = new File(Constants.OUTPUT_DIR, "bbc-world.rss.xml");
assertTrue("Output file not found: " + outputFile, outputFile.exists());
List<String> strings = Files.readAllLines(outputFile.toPath());
boolean atLeastOneLineContainsNiFi = strings.stream().anyMatch(line -> line.toLowerCase().contains("nifi"));
assertTrue("There was no line containing NiFi", atLeastOneLineContainsNiFi);
boolean atLeastOneLineContainsNiFiVersion = strings.stream().anyMatch(line -> line.toLowerCase().contains("latest nifi version"));
assertTrue("There was no line containing 'latest NiFi version'", atLeastOneLineContainsNiFiVersion);
}
@After
public void shutdownNiFi() {
if (testNiFiInstance != null) {
testNiFiInstance.stopAndCleanup();
}
}
@AfterClass
public static void afterClass() throws Exception {
NiFiFlowTest.testJettyServer.stop();
}
private static class TestHandler extends org.eclipse.jetty.server.handler.AbstractHandler {
@Override
public void handle(
String target,
Request baseRequest,
HttpServletRequest httpServletRequest,
HttpServletResponse response) throws IOException, ServletException {
response.setContentType("text/html;charset=utf-8");
response.setStatus(HttpServletResponse.SC_OK);
baseRequest.setHandled(true);
InputStream resource = TestHandler.class.getResourceAsStream("/sample_technology_rss.xml");
ServletOutputStream outputStream = response.getOutputStream();
byte[] buffer = new byte[1024];
int len;
while ((len = resource.read(buffer)) != -1) {
outputStream.write(buffer, 0, len);
}
}
}
}

View File

@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.testharness.samples;
import org.apache.nifi.testharness.SimpleNiFiFlowDefinitionEditor;
import org.apache.nifi.testharness.TestNiFiInstance;
import org.apache.nifi.testharness.samples.mock.GetHTTPMock;
import org.apache.nifi.testharness.util.FileUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.util.List;
import static org.junit.Assert.assertTrue;
/**
* This test demonstrates how to mock the source data by mocking the processor
* itself in the flow definition.
*/
public class NiFiMockFlowTest {
private static final InputStream DEMO_DATA_AS_STREAM =
NiFiMockFlowTest.class.getResourceAsStream("/sample_technology_rss.xml");
// We have a dedicated class. It has to be public static
// so that NiFi engine can instantiate it.
public static class MockedGetHTTP extends GetHTTPMock {
public MockedGetHTTP() {
super("text/xml; charset=utf-8", () -> DEMO_DATA_AS_STREAM);
}
}
private static final SimpleNiFiFlowDefinitionEditor CONFIGURE_MOCKS_IN_NIFI_FLOW = SimpleNiFiFlowDefinitionEditor.builder()
.updateFlowFileBuiltInNiFiProcessorVersionsToNiFiVersion()
.setClassOfSingleProcessor("GetHTTP", MockedGetHTTP.class)
.build();
private TestNiFiInstance testNiFiInstance;
@Before
public void bootstrapNiFi() throws Exception {
if (Constants.OUTPUT_DIR.exists()) {
FileUtils.deleteDirectoryRecursive(Constants.OUTPUT_DIR.toPath());
}
File nifiZipFile = TestUtils.getBinaryDistributionZipFile(Constants.NIFI_ZIP_DIR);
TestNiFiInstance testNiFi = TestNiFiInstance.builder()
.setNiFiBinaryDistributionZip(nifiZipFile)
.setFlowXmlToInstallForTesting(Constants.FLOW_XML_FILE)
.modifyFlowXmlBeforeInstalling(CONFIGURE_MOCKS_IN_NIFI_FLOW)
.build();
testNiFi.install();
testNiFi.start();
// only assign testNiFi to the field in case it was started successfully
testNiFiInstance = testNiFi;
}
@Test
public void testFlowCreatesFilesInCorrectLocation() throws IOException {
// We deleted the output directory: our NiFi flow should create it
assertTrue("Output directory not found: " + Constants.OUTPUT_DIR, Constants.OUTPUT_DIR.exists());
File outputFile = new File(Constants.OUTPUT_DIR, "bbc-world.rss.xml");
assertTrue("Output file not found: " + outputFile, outputFile.exists());
List<String> strings = Files.readAllLines(outputFile.toPath());
boolean atLeastOneLineContainsNiFi = strings.stream().anyMatch(line -> line.toLowerCase().contains("nifi"));
assertTrue("There was no line containing NiFi", atLeastOneLineContainsNiFi);
boolean atLeastOneLineContainsNiFiVersion = strings.stream().anyMatch(line -> line.toLowerCase().contains("latest nifi version"));
assertTrue("There was no line containing 'latest NiFi version'", atLeastOneLineContainsNiFiVersion);
}
@After
public void shutdownNiFi() {
if (testNiFiInstance != null) {
testNiFiInstance.stopAndCleanup();
}
}
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.testharness.samples;
import java.io.File;
final class TestUtils {
private TestUtils() {
// no instances allowed
}
static File getBinaryDistributionZipFile(File binaryDistributionZipDir) {
if (!binaryDistributionZipDir.exists()) {
throw new IllegalStateException("NiFi distribution ZIP file not found at the expected location: "
+ binaryDistributionZipDir.getAbsolutePath());
}
File[] files = binaryDistributionZipDir.listFiles((dir, name) ->
name.startsWith("nifi-") && name.endsWith("-bin.zip"));
if (files == null) {
throw new IllegalStateException(
"Not a directory or I/O error reading: " + binaryDistributionZipDir.getAbsolutePath());
}
if (files.length == 0) {
throw new IllegalStateException(
"No NiFi distribution ZIP file is found in: " + binaryDistributionZipDir.getAbsolutePath());
}
if (files.length > 1) {
throw new IllegalStateException(
"Multiple NiFi distribution ZIP files are found in: " + binaryDistributionZipDir.getAbsolutePath());
}
return files[0];
}
}

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.testharness.samples.mock;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.util.StopWatch;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
public class GetHTTPMock extends MockProcessor {
private final String contentType;
private final Supplier<InputStream> inputStreamSupplier;
public GetHTTPMock(String contentType, Supplier<InputStream> inputStreamSupplier) {
super("org.apache.nifi.processors.standard.GetHTTP");
this.contentType = contentType;
this.inputStreamSupplier = inputStreamSupplier;
}
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All files are transferred to the success relationship")
.build();
@Override
public void onTrigger(ProcessContext context, ProcessSessionFactory processSessionFactory) {
final ComponentLog logger = getLogger();
final StopWatch stopWatch = new StopWatch(true);
final ProcessSession session = processSessionFactory.createSession();
final String url = context.getProperty("URL").evaluateAttributeExpressions().getValue();
final URI uri;
String source = url;
try {
uri = new URI(url);
source = uri.getHost();
} catch (final URISyntaxException swallow) {
// this won't happen as the url has already been validated
}
FlowFile flowFile = session.create();
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), context.getProperty("Filename").evaluateAttributeExpressions().getValue());
flowFile = session.putAttribute(flowFile, this.getClass().getSimpleName().toLowerCase() + ".remote.source", source);
flowFile = session.importFrom(inputStreamSupplier.get(), flowFile);
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), contentType);
final long flowFileSize = flowFile.getSize();
stopWatch.stop();
session.getProvenanceReporter().receive(flowFile, url, stopWatch.getDuration(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
final String dataRate = stopWatch.calculateDataRate(flowFileSize);
logger.info("Successfully received {} from {} at a rate of {}; transferred to success", new Object[]{flowFile, url, dataRate});
session.commit();
}
}

View File

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.testharness.samples.mock;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import java.util.Collection;
import java.util.List;
import java.util.Set;
public abstract class MockProcessor implements Processor {
private final Processor delegate;
private ComponentLog logger;
protected MockProcessor(String delegateClassName) {
try {
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
final Class<?> delegateClass = Class.forName(delegateClassName, true, contextClassLoader);
delegate = (Processor) delegateClass.newInstance();
} catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
throw new RuntimeException(e);
}
}
protected Processor getDelegate() {
return delegate;
}
protected final ComponentLog getLogger() {
return logger;
}
@Override
public void initialize(ProcessorInitializationContext processorInitializationContext) {
getDelegate().initialize(processorInitializationContext);
logger = processorInitializationContext.getLogger();
}
@Override
public Set<Relationship> getRelationships() {
return getDelegate().getRelationships();
}
@Override
public abstract void onTrigger(ProcessContext processContext, ProcessSessionFactory processSessionFactory);
@Override
public Collection<ValidationResult> validate(ValidationContext validationContext) {
return getDelegate().validate(validationContext);
}
@Override
public PropertyDescriptor getPropertyDescriptor(String s) {
return getDelegate().getPropertyDescriptor(s);
}
@Override
public void onPropertyModified(PropertyDescriptor propertyDescriptor, String s, String s1) {
getDelegate().onPropertyModified(propertyDescriptor, s, s1);
}
@Override
public List<PropertyDescriptor> getPropertyDescriptors() {
return getDelegate().getPropertyDescriptors();
}
@Override
public String getIdentifier() {
return getDelegate().getIdentifier();
}
}

View File

@ -0,0 +1,154 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<flowController encoding-version="1.3">
<maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
<maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
<registries/>
<rootGroup>
<id>92b74849-0166-1000-28d3-4da912e34551</id>
<name>NiFi Flow</name>
<position x="0.0" y="0.0"/>
<comment/>
<processor>
<id>92b9139c-0166-1000-04d5-1184adc0977a</id>
<name>PutFile</name>
<position x="632.0" y="98.0"/>
<styles/>
<comment/>
<class>org.apache.nifi.processors.standard.PutFile</class>
<bundle>
<group>org.apache.nifi</group>
<artifact>nifi-standard-nar</artifact>
<version>1.7.1</version>
</bundle>
<maxConcurrentTasks>1</maxConcurrentTasks>
<schedulingPeriod>0 sec</schedulingPeriod>
<penalizationPeriod>30 sec</penalizationPeriod>
<yieldPeriod>1 sec</yieldPeriod>
<bulletinLevel>WARN</bulletinLevel>
<lossTolerant>false</lossTolerant>
<scheduledState>RUNNING</scheduledState>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<executionNode>ALL</executionNode>
<runDurationNanos>0</runDurationNanos>
<property>
<name>Directory</name>
<value>./NiFiTest/NiFiReadTest</value>
</property>
<property>
<name>Conflict Resolution Strategy</name>
<value>ignore</value>
</property>
<property>
<name>Create Missing Directories</name>
<value>true</value>
</property>
<property>
<name>Maximum File Count</name>
</property>
<property>
<name>Last Modified Time</name>
</property>
<property>
<name>Permissions</name>
</property>
<property>
<name>Owner</name>
</property>
<property>
<name>Group</name>
</property>
<autoTerminatedRelationship>success</autoTerminatedRelationship>
<autoTerminatedRelationship>failure</autoTerminatedRelationship>
</processor>
<processor>
<id>92b87553-0166-1000-527e-7ecdc888d91a</id>
<name>GetHTTP</name>
<position x="238.0" y="98.0"/>
<styles/>
<comment/>
<class>org.apache.nifi.processors.standard.GetHTTP</class>
<bundle>
<group>org.apache.nifi</group>
<artifact>nifi-standard-nar</artifact>
<version>1.7.1</version>
</bundle>
<maxConcurrentTasks>1</maxConcurrentTasks>
<schedulingPeriod>0 sec</schedulingPeriod>
<penalizationPeriod>30 sec</penalizationPeriod>
<yieldPeriod>1 sec</yieldPeriod>
<bulletinLevel>WARN</bulletinLevel>
<lossTolerant>false</lossTolerant>
<scheduledState>RUNNING</scheduledState>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<executionNode>ALL</executionNode>
<runDurationNanos>0</runDurationNanos>
<property>
<name>URL</name>
<value>http://feeds.bbci.co.uk/news/technology/rss.xml?edition=uk#</value>
</property>
<property>
<name>Filename</name>
<value>bbc-world.rss.xml</value>
</property>
<property>
<name>SSL Context Service</name>
</property>
<property>
<name>Username</name>
</property>
<property>
<name>Password</name>
</property>
<property>
<name>Connection Timeout</name>
<value>30 sec</value>
</property>
<property>
<name>Data Timeout</name>
<value>30 sec</value>
</property>
<property>
<name>User Agent</name>
</property>
<property>
<name>Accept Content-Type</name>
</property>
<property>
<name>Follow Redirects</name>
<value>false</value>
</property>
<property>
<name>redirect-cookie-policy</name>
<value>default</value>
</property>
<property>
<name>proxy-configuration-service</name>
</property>
<property>
<name>Proxy Host</name>
</property>
<property>
<name>Proxy Port</name>
</property>
</processor>
<connection>
<id>92b9380b-0166-1000-981d-c9e319f135e3</id>
<name/>
<bendPoints/>
<labelIndex>1</labelIndex>
<zIndex>0</zIndex>
<sourceId>92b87553-0166-1000-527e-7ecdc888d91a</sourceId>
<sourceGroupId>92b74849-0166-1000-28d3-4da912e34551</sourceGroupId>
<sourceType>PROCESSOR</sourceType>
<destinationId>92b9139c-0166-1000-04d5-1184adc0977a</destinationId>
<destinationGroupId>92b74849-0166-1000-28d3-4da912e34551</destinationGroupId>
<destinationType>PROCESSOR</destinationType>
<relationship>success</relationship>
<maxWorkQueueSize>10000</maxWorkQueueSize>
<maxWorkQueueDataSize>1 GB</maxWorkQueueDataSize>
<flowFileExpiration>0 sec</flowFileExpiration>
</connection>
</rootGroup>
<controllerServices/>
<reportingTasks/>
</flowController>

View File

@ -0,0 +1,15 @@
<configuration debug="true">
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are by default assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder -->
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>

View File

@ -0,0 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet title="XSL_formatting" type="text/xsl" href="/shared/bsp/xsl/rss/nolsol.xsl"?>
<rss xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:atom="http://www.w3.org/2005/Atom" version="2.0" xmlns:media="http://search.yahoo.com/mrss/">
<channel>
<title>Sample Technology feed</title>
<description>Sample Technology feed</description>
<image>
<url>https://nifi.apache.org/assets/images/apache-nifi-logo.svg</url>
<title>NiFi sample</title>
<link>https://nifi.apache.org/</link>
</image>
<language>en-gb</language>
<ttl>15</ttl>
<item>
<title>The latest NiFi version is out</title>
<description>The latest version of NiFi is released</description>
<link>https://nifi.apache.org/</link>
<guid isPermaLink="true">https://nifi.apache.org/</guid>
<pubDate>Sat, 24 Sep 2018 17:10:10 GMT</pubDate>
<media:thumbnail width="1024" height="576" url="https://nifi.apache.org/assets/images/apache-nifi-logo.svg"/>
</item>
</channel>
</rss>

View File

@ -30,6 +30,7 @@
<module>nifi-mock</module>
<module>nifi-nar-bundles</module>
<module>nifi-assembly</module>
<module>nifi-testharness</module>
<module>nifi-docs</module>
<module>nifi-maven-archetypes</module>
<module>nifi-external</module>