NIFI-1571 initial commit of SpringContext support

Reviewed by Tony Kurc (tkurc@apache.org) and Joe Witt (joewitt@apache.org). This closes #271
This commit is contained in:
Oleg Zhurakousky 2016-03-02 13:35:26 -05:00 committed by trkurc
parent e12e7a55b7
commit 8f40d2b181
23 changed files with 1740 additions and 0 deletions

View File

@ -302,6 +302,11 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-cassandra-nar</artifactId> <artifactId>nifi-cassandra-nar</artifactId>
<type>nar</type> <type>nar</type>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-spring-nar</artifactId>
<type>nar</type>
</dependency>
</dependencies> </dependencies>
<properties> <properties>

View File

@ -0,0 +1,35 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-spring-bundle</artifactId>
<version>0.6.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-spring-nar</artifactId>
<packaging>nar</packaging>
<description>NiFi NAR for interacting with Spring</description>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-spring-processors</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,209 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
APACHE NIFI SUBCOMPONENTS:
The Apache NiFi project contains subcomponents with separate copyright
notices and license terms. Your use of the source code for the these
subcomponents is subject to the terms and conditions of the following
licenses.

View File

@ -0,0 +1,19 @@
nifi-spring-nar
Copyright 2014-2016 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
******************
Apache Software License v2
******************
The following binary components are provided under the Apache Software License v2
(ASLv2) Apache Commons Lang
The following NOTICE information applies:
Apache Commons Lang
Copyright 2001-2014 The Apache Software Foundation
This product includes software from the Spring Framework,
under the Apache License 2.0 (see: StringUtils.containsWhitespace())

View File

@ -0,0 +1,75 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with this
work for additional information regarding copyright ownership. The ASF licenses
this file to You under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You may obtain
a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
required by applicable law or agreed to in writing, software distributed
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
OR CONDITIONS OF ANY KIND, either express or implied. See the License for
the specific language governing permissions and limitations under the License. -->
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-spring-bundle</artifactId>
<version>0.6.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-spring-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
<version>4.2.4.RELEASE</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>4.2.4.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-event</artifactId>
<version>4.2.4.RELEASE</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.spring;
import java.io.File;
import java.io.InputStream;
import java.lang.reflect.Constructor;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Helper class which provides factory method to create and initialize Spring
* Application Context while scoping it within the dedicated Class Loader.
*/
final class SpringContextFactory {
private static final Logger logger = LoggerFactory.getLogger(SpringContextFactory.class);
private static final String SC_DELEGATE_NAME = "org.apache.nifi.spring.bootstrap.SpringContextDelegate";
/**
* Creates and instance of Spring Application Context scoped within a
* dedicated Class Loader.
* <br>
* The core task of this factory is to load delegate that supports message
* exchange with Spring Application Context ({@link SpringDataExchanger})
* using the same class loader used to load the Application Context. Such
* class loader isolation is required to ensure that multiple instances of
* Application Context (representing different applications) and the
* corresponding delegate can exist per single instance of Spring NAR.
* <br>
* The mechanism used here is relatively simple. While
* {@link SpringDataExchanger} is available to the current class loader and
* would normally be loaded once per instance of NAR, the below factory
* method first obtains class bytes for {@link SpringDataExchanger} and then
* loads it from these bytes via ClassLoader.defineClass(..) method, thus
* ensuring that multiple instances of {@link SpringDataExchanger} class can
* exist and everything that is loaded within its scope is using its class
* loader. Upon exit, the class loader is destroyed via close method
* essentially with everything that it loaded.
* <br>
* Also, during the initialization of {@link SpringDataExchanger} the new
* class loader is set as Thread.contextClassLoader ensuring that if there
* are any libraries used by Spring beans that rely on loading resources via
* Thread.contextClassLoader can find such resources.
*/
static SpringDataExchanger createSpringContextDelegate(String classpath, String config) {
URL[] urls = gatherAdditionalClassPathUrls(classpath);
SpringContextClassLoader contextCl = new SpringContextClassLoader(urls,
SpringContextFactory.class.getClassLoader());
ClassLoader tContextCl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(contextCl);
try {
InputStream delegateStream = contextCl.getResourceAsStream(SC_DELEGATE_NAME.replace('.', '/') + ".class");
byte[] delegateBytes = IOUtils.toByteArray(delegateStream);
Class<?> clazz = contextCl.doDefineClass(SC_DELEGATE_NAME, delegateBytes, 0, delegateBytes.length);
Constructor<?> ctr = clazz.getDeclaredConstructor(String.class);
ctr.setAccessible(true);
SpringDataExchanger springDelegate = (SpringDataExchanger) ctr.newInstance(config);
if (logger.isInfoEnabled()) {
logger.info("Successfully instantiated Spring Application Context from '" + config + "'");
}
return springDelegate;
} catch (Exception e) {
try {
contextCl.close();
} catch (Exception e2) {
// ignore
}
throw new IllegalStateException("Failed to instantiate Spring Application Context. Config path: '" + config
+ "'; Classpath: " + Arrays.asList(urls), e);
} finally {
Thread.currentThread().setContextClassLoader(tContextCl);
}
}
/**
*
*/
private static URL[] gatherAdditionalClassPathUrls(String path) {
if (logger.isDebugEnabled()) {
logger.debug("Adding additional resources from '" + path + "' to the classpath.");
}
File libraryDir = new File(path);
if (libraryDir.exists() && libraryDir.isDirectory()) {
String[] cpResourceNames = libraryDir.list();
try {
List<URL> urls = new ArrayList<>();
for (int i = 0; i < cpResourceNames.length; i++) {
URL url = new File(libraryDir, cpResourceNames[i]).toURI().toURL();
urls.add(url);
if (logger.isDebugEnabled()) {
logger.debug("Identifying additional resource to the classpath: " + url);
}
}
return urls.toArray(new URL[] {});
} catch (Exception e) {
throw new IllegalStateException("Failed to parse user libraries from '" + libraryDir.getAbsolutePath() + "'", e);
}
} else {
throw new IllegalArgumentException("Path '" + libraryDir.getAbsolutePath()
+ "' is not valid because it doesn't exist or does not point to a directory.");
}
}
/**
*
*/
private static class SpringContextClassLoader extends URLClassLoader {
/**
*
*/
public SpringContextClassLoader(URL[] urls, ClassLoader parent) {
super(urls, parent);
}
/**
*
*/
public final Class<?> doDefineClass(String name, byte[] b, int off, int len) {
return this.defineClass(name, b, off, len);
}
}
}

View File

@ -0,0 +1,448 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.spring;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.spring.SpringDataExchanger.SpringResponse;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
/**
* Implementation of {@link Processor} capable of sending and receiving data
* from application defined in Spring Application context. It does so via
* predefined in/out {@link MessageChannel}s (see spring-messaging module of
* Spring). Once such channels are defined user is free to implement the rest of
* the application any way they wish (e.g., custom code and/or using frameworks
* such as Spring Integration or Camel).
* <p>
* The requirement and expectations for channel types are:
* <ul>
* <li>Input channel must be of type {@link MessageChannel} and named "fromNiFi"
* (see {@link SpringNiFiConstants#FROM_NIFI})</li>
* <li>Output channel must be of type {@link PollableChannel} and named "toNiFi"
* (see {@link SpringNiFiConstants#TO_NIFI})</li>
* </ul>
* </p>
* Below is the example of sample configuration:
*
* <pre>
* &lt;?xml version="1.0" encoding="UTF-8"?&gt;
* &lt;beans xmlns="http://www.springframework.org/schema/beans"
* xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
* xmlns:int="http://www.springframework.org/schema/integration"
* xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
* http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.2.xsd"&gt;
*
* &lt;int:channel id="fromNiFi"/&gt;
*
* . . . . .
*
* &lt;int:channel id="toNiFi"&gt;
* &lt;int:queue/&gt;
* &lt;/int:channel&gt;
*
* &lt;/beans&gt;
* </pre>
* <p>
* Defining {@link MessageChannel} is optional. That's why this processor
* supports 3 modes of interaction with Spring Application Context:
* <ul>
* <li>Headless no channels are defined therefore nothing is sent to or
* received from such Application Contexts (i.e., some monitoring app).</li>
* <li>One way (NiFi -&gt; Spring or Spring -&gt; NiFi) - depends on existence
* of one of "fromNiFi" or "toNiFi" channel in the Spring Application Context.
* </li>
* <li>Bi-directional (NiFi -&gt; Spring -&gt; Nifi or Spring -&gt; NiFi -&gt;
* Spring) - depends on existence of both "fromNiFi" and "toNiFi" channels in
* the Spring Application Context</li>
* </ul>
*
* </p>
* <p>
* To create an instance of the ApplicationConetxt this processor requires user
* to provide configuration file path and the path to the resources that needs
* to be added to the classpath of ApplicationContext. This essentially allows
* user to package their Spring Application any way they want as long as
* everything it requires is available on the classpath.
* </p>
* <p>
* Data exchange between Spring and NiFi relies on simple mechanism which is
* exposed via {@link SpringDataExchanger}; {@link FlowFile}s's content is
* converted to primitive representation that can be easily wrapped in Spring
* {@link Message}. The requirement imposed by this Processor is to send/receive
* {@link Message} with <i>payload</i> of type <i>byte[]</i> and headers of type
* <i>Map&lt;String, Object&gt;</i>. This is primarily for simplicity and type
* safety. Converters and Transformers could be used by either side to change
* representation of the content that is being exchanged between NiFi and
* Spring.
*/
@TriggerWhenEmpty
@Tags({ "Spring", "Message", "Get", "Put", "Integration" })
@CapabilityDescription("A Processor that supports sending and receiving data from application defined in "
+ "Spring Application Context via predefined in/out MessageChannels.")
public class SpringContextProcessor extends AbstractProcessor {
private final Logger logger = LoggerFactory.getLogger(SpringContextProcessor.class);
public static final PropertyDescriptor CTX_CONFIG_PATH = new PropertyDescriptor.Builder()
.name("Application Context config path")
.description("The path to the Spring Application Context configuration file relative to the classpath")
.required(true)
.addValidator(new SpringContextConfigValidator())
.build();
public static final PropertyDescriptor CTX_LIB_PATH = new PropertyDescriptor.Builder()
.name("Application Context class path")
.description("Path to the directory with resources (i.e., JARs, configuration files etc.) required to be on "
+ "the classpath of the ApplicationContext.")
.addValidator(new SpringContextConfigValidator())
.required(true)
.build();
public static final PropertyDescriptor SEND_TIMEOUT = new PropertyDescriptor.Builder()
.name("Send Timeout")
.description("Timeout for sending data to Spring Application Context. Defaults to 0.")
.required(false)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
public static final PropertyDescriptor RECEIVE_TIMEOUT = new PropertyDescriptor.Builder()
.name("Receive Timeout")
.description("Timeout for receiving date from Spring context. Defaults to 0.")
.required(false)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
// ====
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description(
"All FlowFiles that are successfully received from Spring Application Context are routed to this relationship")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description(
"All FlowFiles that cannot be sent to Spring Application Context are routed to this relationship")
.build();
private final static Set<Relationship> relationships;
private final static List<PropertyDescriptor> propertyDescriptors;
// =======
private volatile String applicationContextConfigFileName;
private volatile String applicationContextLibPath;
private volatile long sendTimeout;
private volatile long receiveTimeout;
private volatile SpringDataExchanger exchanger;
static {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.add(CTX_CONFIG_PATH);
_propertyDescriptors.add(CTX_LIB_PATH);
_propertyDescriptors.add(SEND_TIMEOUT);
_propertyDescriptors.add(RECEIVE_TIMEOUT);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
Set<Relationship> _relationships = new HashSet<>();
_relationships.add(REL_SUCCESS);
_relationships.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(_relationships);
}
/**
*
*/
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
/**
*
*/
@OnScheduled
public void initializeSpringContext(ProcessContext processContext) {
this.applicationContextConfigFileName = processContext.getProperty(CTX_CONFIG_PATH).getValue();
this.applicationContextLibPath = processContext.getProperty(CTX_LIB_PATH).getValue();
String stStr = processContext.getProperty(SEND_TIMEOUT).getValue();
this.sendTimeout = stStr == null ? 0 : FormatUtils.getTimeDuration(stStr, TimeUnit.MILLISECONDS);
String rtStr = processContext.getProperty(RECEIVE_TIMEOUT).getValue();
this.receiveTimeout = rtStr == null ? 0 : FormatUtils.getTimeDuration(rtStr, TimeUnit.MILLISECONDS);
try {
if (logger.isDebugEnabled()) {
logger.debug(
"Initializing Spring Application Context defined in " + this.applicationContextConfigFileName);
}
this.exchanger = SpringContextFactory.createSpringContextDelegate(this.applicationContextLibPath,
this.applicationContextConfigFileName);
} catch (Exception e) {
throw new IllegalStateException("Failed while initializing Spring Application Context", e);
}
if (logger.isInfoEnabled()) {
logger.info("Successfully initialized Spring Application Context defined in "
+ this.applicationContextConfigFileName);
}
}
/**
* Will close the 'exchanger' which in turn will close both Spring
* Application Context and the ClassLoader that loaded it allowing new
* instance of Spring Application Context to be created upon the next start
* (which may have an updated classpath and functionality) without
* restarting NiFi.
*/
@OnStopped
public void closeSpringContext(ProcessContext processContext) {
if (this.exchanger != null) {
try {
if (logger.isDebugEnabled()) {
logger.debug("Closing Spring Application Context defined in " + this.applicationContextConfigFileName);
}
this.exchanger.close();
if (logger.isInfoEnabled()) {
logger.info("Successfully closed Spring Application Context defined in "
+ this.applicationContextConfigFileName);
}
} catch (IOException e) {
getLogger().warn("Failed while closing Spring Application Context", e);
}
}
}
/**
*
*/
@Override
public void onTrigger(ProcessContext context, ProcessSession processSession) throws ProcessException {
FlowFile flowFileToProcess = processSession.get();
if (flowFileToProcess != null) {
this.sendToSpring(flowFileToProcess, context, processSession);
}
this.receiveFromSpring(processSession);
}
/**
*
*/
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
/**
*
*/
private void sendToSpring(FlowFile flowFileToProcess, ProcessContext context, ProcessSession processSession) {
byte[] payload = this.extractMessage(flowFileToProcess, processSession);
boolean sent = false;
try {
sent = this.exchanger.send(payload, flowFileToProcess.getAttributes(), this.sendTimeout);
if (sent) {
processSession.getProvenanceReporter().send(flowFileToProcess, this.applicationContextConfigFileName);
processSession.remove(flowFileToProcess);
} else {
processSession.transfer(processSession.penalize(flowFileToProcess), REL_FAILURE);
this.getLogger().error("Timed out while sending FlowFile to Spring Application Context "
+ this.applicationContextConfigFileName);
context.yield();
}
} catch (Exception e) {
processSession.transfer(flowFileToProcess, REL_FAILURE);
this.getLogger().error("Failed while sending FlowFile to Spring Application Context "
+ this.applicationContextConfigFileName + "; " + e.getMessage(), e);
context.yield();
}
}
/**
*
*/
private void receiveFromSpring(ProcessSession processSession) {
final SpringResponse<?> msgFromSpring = this.exchanger.receive(this.receiveTimeout);
if (msgFromSpring != null) {
FlowFile flowFileToProcess = processSession.create();
flowFileToProcess = processSession.write(flowFileToProcess, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
Object payload = msgFromSpring.getPayload();
byte[] payloadBytes = payload instanceof String ? ((String) payload).getBytes() : (byte[]) payload;
out.write(payloadBytes);
}
});
flowFileToProcess = processSession.putAllAttributes(flowFileToProcess,
this.extractFlowFileAttributesFromMessageHeaders(msgFromSpring.getHeaders()));
processSession.transfer(flowFileToProcess, REL_SUCCESS);
processSession.getProvenanceReporter().receive(flowFileToProcess, this.applicationContextConfigFileName);
}
}
/**
*
*/
private Map<String, String> extractFlowFileAttributesFromMessageHeaders(Map<String, Object> messageHeaders) {
Map<String, String> attributes = new HashMap<>();
for (Entry<String, Object> entry : messageHeaders.entrySet()) {
if (entry.getValue() instanceof String) {
attributes.put(entry.getKey(), (String) entry.getValue());
}
}
return attributes;
}
/**
* Extracts contents of the {@link FlowFile} to byte array.
*/
private byte[] extractMessage(FlowFile flowFile, ProcessSession processSession) {
final byte[] messageContent = new byte[(int) flowFile.getSize()];
processSession.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
StreamUtils.fillBuffer(in, messageContent, true);
}
});
return messageContent;
}
/**
*
*/
static class SpringContextConfigValidator implements Validator {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
String configPath = context.getProperty(CTX_CONFIG_PATH).getValue();
String libDirPath = context.getProperty(CTX_LIB_PATH).getValue();
StringBuilder invalidationMessageBuilder = new StringBuilder();
if (configPath != null && libDirPath != null) {
validateClassPath(libDirPath, invalidationMessageBuilder);
if (invalidationMessageBuilder.length() == 0 && !isConfigResolvable(configPath, new File(libDirPath))) {
invalidationMessageBuilder.append("'Application Context config path' can not be located "
+ "in the provided classpath.");
}
} else if (StringUtils.isEmpty(configPath)) {
invalidationMessageBuilder.append("'Application Context config path' must not be empty.");
} else {
if (StringUtils.isEmpty(libDirPath)) {
invalidationMessageBuilder.append("'Application Context class path' must not be empty.");
} else {
validateClassPath(libDirPath, invalidationMessageBuilder);
}
}
String invalidationMessage = invalidationMessageBuilder.toString();
ValidationResult vResult = invalidationMessage.length() == 0
? new ValidationResult.Builder().subject(subject).input(input)
.explanation("Spring configuration '" + configPath + "' is resolvable "
+ "against provided classpath '" + libDirPath + "'.").valid(true).build()
: new ValidationResult.Builder().subject(subject).input(input)
.explanation("Spring configuration '" + configPath + "' is NOT resolvable "
+ "against provided classpath '" + libDirPath + "'. Validation message: " + invalidationMessage).valid(false).build();
return vResult;
}
}
/**
*
*/
private static void validateClassPath(String libDirPath, StringBuilder invalidationMessageBuilder) {
File libDirPathFile = new File(libDirPath);
if (!libDirPathFile.exists()) {
invalidationMessageBuilder.append(
"'Application Context class path' does not exist. Was '" + libDirPathFile.getAbsolutePath() + "'.");
} else if (!libDirPathFile.isDirectory()) {
invalidationMessageBuilder.append("'Application Context class path' must point to a directory. Was '"
+ libDirPathFile.getAbsolutePath() + "'.");
}
}
/**
*
*/
private static boolean isConfigResolvable(String configPath, File libDirPathFile) {
List<URL> urls = new ArrayList<>();
URLClassLoader parentLoader = (URLClassLoader) SpringContextProcessor.class.getClassLoader();
urls.addAll(Arrays.asList(parentLoader.getURLs()));
String[] resourceNames = libDirPathFile.list();
try {
for (String resourceName : resourceNames) {
File r = new File(libDirPathFile, resourceName);
if (!r.isDirectory() && !r.getName().startsWith(".")) {
URL url = new File(libDirPathFile, resourceName).toURI().toURL();
urls.add(url);
}
}
} catch (MalformedURLException e) {
throw new IllegalStateException(e);
}
boolean resolvable = false;
try (URLClassLoader throwawayCl = new URLClassLoader(urls.toArray(new URL[] {}), null)) {
resolvable = throwawayCl.findResource(configPath) != null;
} catch (IOException e) {
// ignore since it can only happen on CL.close()
}
return resolvable;
}
}

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.spring;
import java.io.Closeable;
import java.util.Map;
import org.apache.nifi.flowfile.FlowFile;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.PollableChannel;
/**
* Strategy to support type-safe data exchange between NiFi and Spring. It
* exposes send/receive operations with arguments that are independent of both
* frameworks simplifying conversion from {@link Message} to {@link FlowFile}
* and vice versa.
*/
public interface SpringDataExchanger extends Closeable {
/**
* Sends data to Spring
*
* @param payload
* data that will be used as a payload or {@link Message}
* @param headers
* map that will be used to construct {@link MessageHeaders}
* @param timeout
* value to pass to the
* {@link MessageChannel#send(Message, long)} operation.
* @return 'true' if message was sent and 'false'otherwise.
*/
<T> boolean send(T payload, Map<String, ?> headers, long timeout);
/**
* Receives data from Spring
*
* @param timeout
* value to pass to {@link PollableChannel#receive(long)}
* operation
* @return {@link SpringResponse} representing <i>content</i> (key) and
* <i>attributes</i> of the FlowFile to be constructed.
*/
<T> SpringResponse<T> receive(long timeout);
public static class SpringResponse<T> {
private final T payload;
private final Map<String, Object> headers;
public SpringResponse(T payload, Map<String, Object> headers) {
this.payload = payload;
this.headers = headers;
}
public T getPayload() {
return payload;
}
public Map<String, Object> getHeaders() {
return headers;
}
}
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.spring;
/**
* Constants used by this module
*/
public interface SpringNiFiConstants {
final String FROM_NIFI = "fromNiFi";
final String TO_NIFI = "toNiFi";
}

View File

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.spring.bootstrap;
import java.io.Closeable;
import java.io.IOException;
import java.net.URLClassLoader;
import java.util.Map;
import org.apache.nifi.spring.SpringDataExchanger;
import org.apache.nifi.spring.SpringNiFiConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.MessageBuilder;
/**
* Scopes instance of itself to a dedicated {@link ClassLoader}, thus allowing
* Spring Application Context and its class path to be modified and refreshed by
* simply re-starting SpringContextProcessor. Also ensures that there are no
* class path collisions between multiple instances of Spring Context Processor
* which are loaded by the same NAR Class Loader.
*/
/*
* This class is for internal use only and must never be instantiated by the NAR
* Class Loader (hence in a isolated package with nothing referencing it). It is
* loaded by a dedicated CL via byte array that represents it ensuring that this
* class can be loaded multiple times by multiple Class Loaders within a single
* instance of NAR.
*/
final class SpringContextDelegate implements Closeable, SpringDataExchanger {
private final Logger logger = LoggerFactory.getLogger(SpringContextDelegate.class);
private final ClassPathXmlApplicationContext applicationContext;
private final MessageChannel toSpringChannel;
private final PollableChannel fromSpringChannel;
private final String configName;
/**
*
*/
private SpringContextDelegate(String configName) {
this.configName = configName;
ClassLoader orig = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
if (logger.isDebugEnabled()) {
logger.debug("Using " + Thread.currentThread().getContextClassLoader()
+ " as context class loader while loading Spring Context '" + configName + "'.");
}
try {
this.applicationContext = new ClassPathXmlApplicationContext(configName);
if (this.applicationContext.containsBean(SpringNiFiConstants.FROM_NIFI)){
this.toSpringChannel = this.applicationContext.getBean(SpringNiFiConstants.FROM_NIFI, MessageChannel.class);
if (logger.isDebugEnabled()) {
logger.debug("Spring Application Context defined in '" + configName
+ "' is capable of receiving messages from NiFi since 'fromNiFi' channel was discovered.");
}
} else {
this.toSpringChannel = null;
}
if (this.applicationContext.containsBean(SpringNiFiConstants.TO_NIFI)){
this.fromSpringChannel = this.applicationContext.getBean(SpringNiFiConstants.TO_NIFI, PollableChannel.class);
if (logger.isDebugEnabled()) {
logger.debug("Spring Application Context defined in '" + configName
+ "' is capable of sending messages to " + "NiFi since 'toNiFi' channel was discovered.");
}
} else {
this.fromSpringChannel = null;
}
if (logger.isInfoEnabled() && this.toSpringChannel == null && this.fromSpringChannel == null){
logger.info("Spring Application Context is headless since neither 'fromNiFi' nor 'toNiFi' channels were defined. "
+ "No data will be exchanged.");
}
} finally {
Thread.currentThread().setContextClassLoader(orig);
}
}
/**
*
*/
@Override
public <T> boolean send(T payload, Map<String, ?> messageHeaders, long timeout) {
if (this.toSpringChannel != null){
return this.toSpringChannel.send(MessageBuilder.withPayload(payload).copyHeaders(messageHeaders).build(), timeout);
} else {
throw new IllegalStateException("Failed to send message to '" + this.configName
+ "'. There are no 'fromNiFi' channels configured which means the Application Conetxt is not set up to receive messages from NiFi");
}
}
/**
*
*/
@SuppressWarnings("unchecked")
@Override
public <T> SpringResponse<T> receive(long timeout) {
if (this.fromSpringChannel != null) {
final Message<T> message = (Message<T>) this.fromSpringChannel.receive(timeout);
if (message != null) {
if (!(message.getPayload() instanceof byte[]) && !(message.getPayload() instanceof String)) {
throw new IllegalStateException("Failed while receiving message from Spring due to the "
+ "payload type being other then byte[] or String which are the only types that are supported. Please "
+ "apply transformation/conversion on Spring side when sending message back to NiFi");
}
return new SpringResponse<T>(message.getPayload(), message.getHeaders());
}
}
return null;
}
/**
*
*/
@Override
public void close() throws IOException {
logger.info("Closing Spring Application Context");
this.applicationContext.close();
if (logger.isInfoEnabled()) {
logger.info("Closing " + this.getClass().getClassLoader());
}
((URLClassLoader) this.getClass().getClassLoader()).close();
logger.info("Successfully closed Spring Application Context and its ClassLoader.");
}
}

View File

@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.spring.SpringContextProcessor

View File

@ -0,0 +1,94 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>SpringContextProcessor</title>
<link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
</head>
<body>
<!-- Processor Documentation ================================================== -->
<h2>Description:</h2>
<p>
<b>SpringContextProcessor</b> allows integration of processes encapsulated in Spring Application Context to run as NiFi
processor by becoming a runtime host for an instance of Spring Application Context.
</p>
<p>
Communication between NiFi and process encapsulated within Spring Application Context is accomplished via Spring Messaging
(one of the core modules of Spring Framework) and supports 3 usage modes:
<ul>
<li><i>Headless</i> - no interaction with NiFi, meaning nothing is sent to it and nothing is received from it (i.e., some monitoring app).
In this case NiFi simply plays the role of the runtime host.</li>
<li><i>One way (NiFi -&gt; Spring or Spring -&gt; NiFi). </i> - This depends on existence of pre-defined message channel in Spring
Application Context. The name of the channel should be “fromNiFi” and the type <i>org.springframework.messaging.MessageChannel.</i></li>
<li><i>By-directional (NiFi -&gt; Spring -&gt; Nifi or Spring -&gt; NiFi -&gt; Spring)</i> - This depends on existence of two channels
in Spring Application Context. One channel receives messages from NiFi with name “fromNiFi” and type <i>org.springframework.messaging.MessageChannel</i>i>
and another is to receive messages from Spring with name “toNiFi” and type <i>org.springframework.messaging.PollableChannel.</i></li>
</ul>
The example below demonstrates template configuration for bi-directional Spring Application Context configuration:
<code>
<pre>
&lt;int:channel id=”fromNiFi”/&gt;
&lt;!—
your custom app configuration to receive messages from fromNiFi channel and optionally send back to NiFi via toNiFi channel.
It could contain any Spring-based application (i.e., Spring Integration, Apache Camel and/or custom code). All you need to do is inject
channels into your beans and send/receive messages from it.
--&gt;
&lt;int:channel id="toNiFi"&gt;
&lt;int:queue/&gt;
&lt;/int:channel&gt;
</pre>
</code>
</p>
<p>
The component is based on assumption that user has an existing Spring Application encapsulated in Spring Context that exposes optional in/out
MessagingChannels to allow data to flow to/from ApplicationContext and into/out-of. NiFi.
Such application is realized by having a directory on the file system, which contains contains all required resources for such application to run.
Such resources usually are JAR files to satisfy application's class-path as well as JAR representing the application and its configuration.
Below is the example of what such directory may contain. In this case the 'SI_DEMO-0.0.1-SNAPSHOT.jar' represents the actual application and the rest
of the JARs represent class-path dependency required by an application.
<pre>
deps
├── SI_DEMO-0.0.1-SNAPSHOT.jar
├── aopalliance-1.0.jar
├── commons-logging-1.2.jar
├── spring-aop-4.2.4.RELEASE.jar
├── spring-beans-4.2.4.RELEASE.jar
├── spring-context-4.2.4.RELEASE.jar
├── spring-core-4.2.4.RELEASE.jar
├── spring-expression-4.2.4.RELEASE.jar
├── spring-integration-core-4.2.5.RELEASE.jar
├── spring-messaging-4.2.4.RELEASE.jar
</pre>
</p>
<p>
You introduce the processor the usual way and then configure its properties:
<ul>
<li><i><b>Application Context config path</b></i> [REQUIRED] - a path to the Application Context configuration.
The path is relative to the class-path of the application defined by the <i>Application Context class path</i> property </li>
<li><i><b>Application Context class path</b></i> [REQUIRED] - a path to a directory on the file system where application dependencies are. </li>
<li><i>Send Timeout</i> [OPTIONAL] - the wait time for sending messages to Spring Application Context. Only required if NiFi plans to send data to Spring.
Defaults to 0 (don't wait). FlowFiles that were successfully sent to Spring will be removed from session while FlowFiles that could not be sent to
Spring will be routed to <i>failure</i> relationship.</li>
<li><i>Receive Timeout</i> - [OPTIONAL] - the wait time for receiving messages from Spring Application Context. Only required if NiFi plans to receive data from Spring.
Defaults to 0 (don't wait). FlowFile is created if and only if a message is successfully received from Spring. It is then transfered to <i>success</i> relationship.</li>
</ul>
</p>
</body>
</html>

View File

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.spring;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.HashMap;
import org.apache.nifi.spring.SpringDataExchanger.SpringResponse;
import org.junit.Test;
public class SpringContextFactoryTests {
@Test(expected = IllegalStateException.class)
public void validateACFailureDueToNotFound() {
SpringContextFactory.createSpringContextDelegate(".", "foo.xml");
}
@Test(expected = IllegalStateException.class)
public void validateMessageNotSentNoFromNiFiChannel() throws Exception {
SpringDataExchanger delegate = SpringContextFactory.createSpringContextDelegate(".", "context.xml");
try {
delegate.send("hello", new HashMap<String, Object>(), 1000L);
} finally {
delegate.close();
}
}
@Test
public void validateMessageNotReceivedNoToNiFiChannel() throws Exception {
SpringDataExchanger delegate = SpringContextFactory.createSpringContextDelegate(".", "context.xml");
SpringResponse<?> fromSpring = delegate.receive(1000L);
assertNull(fromSpring);
delegate.close();
}
@Test
public void validateOneWaySent() throws Exception {
SpringDataExchanger delegate = SpringContextFactory.createSpringContextDelegate(".", "toSpringOnly.xml");
boolean sent = delegate.send("hello", new HashMap<String, Object>(), 1000L);
assertTrue(sent);
SpringResponse<?> fromSpring = delegate.receive(1000L);
assertNull(fromSpring);
delegate.close();
}
@Test
public void validateOneWayReceive() throws Exception {
SpringDataExchanger delegate = SpringContextFactory.createSpringContextDelegate(".", "fromSpringOnly.xml");
try {
delegate.send("hello", new HashMap<String, Object>(), 1000L);
} catch (IllegalStateException e) {
// ignore since its expected
}
SpringResponse<?> fromSpring = delegate.receive(1000L);
assertNotNull(fromSpring);
delegate.close();
}
@Test
public void validateRequestReply() throws Exception {
SpringDataExchanger delegate = SpringContextFactory.createSpringContextDelegate(".", "requestReply.xml");
boolean sent = delegate.send("hello", new HashMap<String, Object>(), 1000L);
assertTrue(sent);
SpringResponse<?> fromSpring = delegate.receive(1000L);
assertNotNull(fromSpring);
assertEquals("hello-hello", fromSpring.getPayload());
assertEquals("foo", fromSpring.getHeaders().get("foo"));
delegate.close();
}
@Test
public void validateMultipleSendsWithAggregatedReply() throws Exception {
SpringDataExchanger delegate = SpringContextFactory.createSpringContextDelegate(".", "aggregated.xml");
// 1
boolean sent = delegate.send("hello", new HashMap<String, Object>(), 1000L);
assertTrue(sent);
SpringResponse<?> fromSpring = delegate.receive(100L);
assertNull(fromSpring);
// 2
sent = delegate.send("hello", new HashMap<String, Object>(), 1000L);
assertTrue(sent);
fromSpring = delegate.receive(100L);
assertNull(fromSpring);
// 3
sent = delegate.send("hello", new HashMap<String, Object>(), 1000L);
assertTrue(sent);
fromSpring = delegate.receive(100L);
assertNull(fromSpring);
// 4
sent = delegate.send("hello", new HashMap<String, Object>(), 1000L);
assertTrue(sent);
fromSpring = delegate.receive(100L);
assertNotNull(fromSpring);
assertEquals("4", fromSpring.getPayload());
delegate.close();
}
}

View File

@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.spring;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.spring.SpringDataExchanger.SpringResponse;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import org.mockito.Mockito;
public class SpringContextProcessorTest {
@Test
public void notValid() {
TestRunner runner = TestRunners.newTestRunner(SpringContextProcessor.class);
runner.assertNotValid();
runner = TestRunners.newTestRunner(SpringContextProcessor.class);
runner.setProperty(SpringContextProcessor.CTX_CONFIG_PATH, "context.xml");
runner.assertNotValid();
runner = TestRunners.newTestRunner(SpringContextProcessor.class);
runner.setProperty(SpringContextProcessor.CTX_LIB_PATH, "fool");
runner.assertNotValid();
}
@SuppressWarnings("unchecked")
@Test
public void validateOneWayFromNiFi() throws Exception {
TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
runner.setProperty(SpringContextProcessor.CTX_CONFIG_PATH, "toSpringOnly.xml");
runner.setProperty(SpringContextProcessor.CTX_LIB_PATH, ".");
runner.setProperty(SpringContextProcessor.RECEIVE_TIMEOUT, "100 millis");
runner.assertValid();
runner.enqueue("Hello".getBytes());
TestProcessor processor = (TestProcessor) runner.getProcessor();
SpringDataExchanger delegate = processor.getMockedDelegate();
when(delegate.receive(Mockito.anyLong())).thenReturn(null);
when(delegate.send(Mockito.any(), Mockito.any(Map.class), Mockito.anyLong())).thenReturn(true);
runner.run(1, false);
verify(delegate, times(1)).send(Mockito.any(), Mockito.any(Map.class), Mockito.anyLong());
verify(delegate, times(1)).receive(100);
assertTrue(runner.getFlowFilesForRelationship(TestProcessor.REL_SUCCESS).isEmpty());
runner.shutdown();
}
@SuppressWarnings("unchecked")
@Test
public void validateOneWayFromSpring() throws Exception {
TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
runner.setProperty(SpringContextProcessor.CTX_CONFIG_PATH, "fromSpringOnly.xml");
runner.setProperty(SpringContextProcessor.CTX_LIB_PATH, ".");
runner.assertValid();
TestProcessor processor = (TestProcessor) runner.getProcessor();
SpringDataExchanger delegate = processor.getMockedDelegate();
SpringResponse<Object> r = new SpringResponse<Object>("hello".getBytes(),
Collections.<String, Object> emptyMap());
when(delegate.receive(Mockito.anyLong())).thenReturn(r);
when(delegate.send(Mockito.any(), Mockito.any(Map.class), Mockito.anyLong())).thenReturn(true);
runner.run(1, false);
verify(delegate, never()).send(Mockito.any(), Mockito.any(Map.class), Mockito.anyLong());
verify(delegate, times(1)).receive(0);
assertTrue(runner.getFlowFilesForRelationship(TestProcessor.REL_SUCCESS).size() == 1);
runner.shutdown();
}
@SuppressWarnings("unchecked")
@Test
public void validateBiDirectional() throws Exception {
TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
runner.setProperty(SpringContextProcessor.CTX_CONFIG_PATH, "requestReply.xml");
runner.setProperty(SpringContextProcessor.CTX_LIB_PATH, ".");
runner.setProperty(SpringContextProcessor.RECEIVE_TIMEOUT, "100 millis");
runner.assertValid();
runner.enqueue("Hello".getBytes());
TestProcessor processor = (TestProcessor) runner.getProcessor();
SpringDataExchanger delegate = processor.getMockedDelegate();
Map<String, Object> headers = new HashMap<>();
headers.put("foo", "foo");
headers.put("bar", new Object());
SpringResponse<Object> r = new SpringResponse<Object>("hello".getBytes(), headers);
when(delegate.receive(Mockito.anyLong())).thenReturn(r);
when(delegate.send(Mockito.any(), Mockito.any(Map.class), Mockito.anyLong())).thenReturn(true);
runner.run(1, false);
verify(delegate, times(1)).send(Mockito.any(), Mockito.any(Map.class), Mockito.anyLong());
verify(delegate, times(1)).receive(100);
List<MockFlowFile> ffList = runner.getFlowFilesForRelationship(TestProcessor.REL_SUCCESS);
assertTrue(ffList.size() == 1);
assertEquals("foo", ffList.get(0).getAttribute("foo"));
assertNull(ffList.get(0).getAttribute("bar"));
runner.shutdown();
}
public static class TestProcessor extends SpringContextProcessor {
private final SpringDataExchanger mockedDelegate = mock(SpringDataExchanger.class);
public SpringDataExchanger getMockedDelegate() {
return mockedDelegate;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession processSession) throws ProcessException {
try {
Field ef = SpringContextProcessor.class.getDeclaredField("exchanger");
ef.setAccessible(true);
ef.set(this, this.mockedDelegate);
super.onTrigger(context, processSession);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
}
}

View File

@ -0,0 +1,26 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You may obtain
a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
required by applicable law or agreed to in writing, software distributed
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
OR CONDITIONS OF ANY KIND, either express or implied. See the License for
the specific language governing permissions and limitations under the License.
See accompanying LICENSE file. -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.2.xsd">
<int:transformer input-channel="fromNiFi"
output-channel="aggregatorChannel" expression="new java.lang.String(payload) + '-hello'" />
<int:aggregator input-channel="aggregatorChannel"
output-channel="toNiFi" correlation-strategy-expression="payload.length()"
expire-groups-upon-completion="true" release-strategy-expression="size() == 4"
expression="size().toString()" />
<int:channel id="toNiFi">
<int:queue />
</int:channel>
</beans>

View File

@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You may obtain
a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
required by applicable law or agreed to in writing, software distributed
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
OR CONDITIONS OF ANY KIND, either express or implied. See the License for
the specific language governing permissions and limitations under the License.
See accompanying LICENSE file. -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
</beans>

View File

@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You may obtain
a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
required by applicable law or agreed to in writing, software distributed
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
OR CONDITIONS OF ANY KIND, either express or implied. See the License for
the specific language governing permissions and limitations under the License.
See accompanying LICENSE file. -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-event="http://www.springframework.org/schema/integration/event"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration/event http://www.springframework.org/schema/integration/event/spring-integration-event-4.2.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.2.xsd">
<int-event:inbound-channel-adapter
channel="toTransform" />
<int:transformer input-channel="toTransform"
output-channel="toNiFi" expression="payload.toString().getBytes()"/>
<int:channel id="toNiFi">
<int:queue />
</int:channel>
</beans>

View File

@ -0,0 +1,21 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
log4j.rootCategory=WARN, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %t %c{2}:%L - %m%n
#log4j.category.org.apache.nifi.startup=INFO

View File

@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You may obtain
a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
required by applicable law or agreed to in writing, software distributed
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
OR CONDITIONS OF ANY KIND, either express or implied. See the License for
the specific language governing permissions and limitations under the License.
See accompanying LICENSE file. -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.2.xsd">
<int:transformer input-channel="fromNiFi"
output-channel="headerEnrich" expression="new java.lang.String(payload) + '-hello'" />
<int:header-enricher input-channel="headerEnrich"
output-channel="toNiFi">
<int:header name="foo" value="foo" />
</int:header-enricher>
<int:channel id="toNiFi">
<int:queue />
</int:channel>
</beans>

View File

@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You may obtain
a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
required by applicable law or agreed to in writing, software distributed
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
OR CONDITIONS OF ANY KIND, either express or implied. See the License for
the specific language governing permissions and limitations under the License.
See accompanying LICENSE file. -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.2.xsd">
<int:service-activator input-channel="fromNiFi"
expression="T(System).out.println(payload)" />
</beans>

View File

@ -0,0 +1,37 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>0.6.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-spring-bundle</artifactId>
<packaging>pom</packaging>
<modules>
<module>nifi-spring-processors</module>
<module>nifi-spring-nar</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-spring-processors</artifactId>
<version>0.6.0-SNAPSHOT</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>

View File

@ -58,6 +58,7 @@
<module>nifi-splunk-bundle</module> <module>nifi-splunk-bundle</module>
<module>nifi-jms-bundle</module> <module>nifi-jms-bundle</module>
<module>nifi-cassandra-bundle</module> <module>nifi-cassandra-bundle</module>
<module>nifi-spring-bundle</module>
</modules> </modules>
<dependencyManagement> <dependencyManagement>
<dependencies> <dependencies>

View File

@ -1047,6 +1047,12 @@ language governing permissions and limitations under the License. -->
<version>0.6.0-SNAPSHOT</version> <version>0.6.0-SNAPSHOT</version>
<type>nar</type> <type>nar</type>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-spring-nar</artifactId>
<version>0.6.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-hbase_1_1_2-client-service-nar</artifactId> <artifactId>nifi-hbase_1_1_2-client-service-nar</artifactId>